Updated Branches: refs/heads/master 69d711b99 -> b9dcd8963
Conversion from CommChannel to Channel added. Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/941f6c43 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/941f6c43 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/941f6c43 Branch: refs/heads/master Commit: 941f6c43e02e5b2cc29c53cac7602c046c6742a4 Parents: 69d711b Author: Tammo van Lessen <[email protected]> Authored: Fri Aug 16 00:19:57 2013 +0200 Committer: Tammo van Lessen <[email protected]> Committed: Fri Aug 16 00:19:57 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/ode/jacob/ChannelRef.java | 22 ++++-- src/main/java/org/apache/ode/jacob/Jacob.java | 4 +- .../java/org/apache/ode/jacob/JacobThread.java | 2 +- .../org/apache/ode/jacob/soup/CommChannel.java | 13 ++-- .../apache/ode/jacob/vpu/ChannelFactory.java | 1 + .../ode/jacob/vpu/ExecutionQueueImpl.java | 2 + .../jacob/examples/helloworld/HelloWorld.java | 6 +- .../org/apache/ode/jacob/vpu/ChannelTest.java | 71 ++++++++++++++++++++ 8 files changed, 107 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/ChannelRef.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java index 1209bcb..ea63610 100644 --- a/src/main/java/org/apache/ode/jacob/ChannelRef.java +++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java @@ -20,7 +20,9 @@ package org.apache.ode.jacob; import java.io.Serializable; +import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.vpu.ChannelFactory; import org.apache.ode.jacob.vpu.JacobVPU; @@ -30,19 +32,21 @@ import org.apache.ode.jacob.vpu.JacobVPU; */ public class ChannelRef implements Serializable { - public enum Type { JACOB_OBJECT, CHANNEL, MESSAGE_LISTENER } + public enum Type { RUNNABLE, CHANNEL, MESSAGE_LISTENER } private static final long serialVersionUID = 1L; private final Type type; private final Object target; + private transient Channel cachedChannel; + public ChannelRef(Object target) { assert target != null; if (target instanceof CommChannel) { type = Type.CHANNEL; - } else if (target instanceof JacobObject) { - type = Type.JACOB_OBJECT; + } else if (target instanceof Runnable) { + type = Type.RUNNABLE; } else if (target instanceof MessageListener) { type = Type.MESSAGE_LISTENER; } else { @@ -66,10 +70,20 @@ public class ChannelRef implements Serializable { @SuppressWarnings("unchecked") public <T> T getEndpoint(Class<T> clazz) { - if (type.equals(Type.JACOB_OBJECT) && JacobObject.class.isAssignableFrom(clazz)) { + if (type.equals(Type.RUNNABLE) && Runnable.class.isAssignableFrom(clazz)) { return (T)target; } else if (type.equals(Type.CHANNEL) && CommChannel.class.isAssignableFrom(clazz)) { return (T)target; + } else if (type.equals(Type.CHANNEL) && Channel.class.isAssignableFrom(clazz)) { + if (cachedChannel == null) { + cachedChannel = ChannelFactory.createChannel((CommChannel)target, clazz); + } + + if (!clazz.isAssignableFrom(cachedChannel.getClass())) { + throw new IllegalStateException("ChannelRef is already associated with a channel of a different type"); + } + + return (T)cachedChannel; } else if (type.equals(Type.MESSAGE_LISTENER) && MessageListener.class.isAssignableFrom(clazz)) { return (T)target; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/Jacob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java index eda96fe..b636994 100644 --- a/src/main/java/org/apache/ode/jacob/Jacob.java +++ b/src/main/java/org/apache/ode/jacob/Jacob.java @@ -71,8 +71,8 @@ public class Jacob { * @param channel * @return */ - public static ChannelRef newCommChannel(Class<?> channelType, String description) { - return JacobVPU.activeJacobThread().newCommChannel(channelType, null, description); + public static ChannelRef newCommChannel(String description) { + return JacobVPU.activeJacobThread().newCommChannel(description); } /** http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/JacobThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java b/src/main/java/org/apache/ode/jacob/JacobThread.java index 95a86a7..1d95c13 100644 --- a/src/main/java/org/apache/ode/jacob/JacobThread.java +++ b/src/main/java/org/apache/ode/jacob/JacobThread.java @@ -47,7 +47,7 @@ public interface JacobThread { * @param description * @return */ - public ChannelRef newCommChannel(Class<?> channelType, String creator, String description); + public ChannelRef newCommChannel(String description); /** * DOCUMENT ME http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/soup/CommChannel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/CommChannel.java b/src/main/java/org/apache/ode/jacob/soup/CommChannel.java index bbf1409..5c33417 100644 --- a/src/main/java/org/apache/ode/jacob/soup/CommChannel.java +++ b/src/main/java/org/apache/ode/jacob/soup/CommChannel.java @@ -32,20 +32,25 @@ public class CommChannel extends ExecutionQueueObject implements Serializable { private Class<?> _type; - // used for deserialization - @SuppressWarnings("unused") - private CommChannel() {} + public CommChannel() {} public CommChannel(Class<?> type) { _type = type; } + public void setType(Class<?> type) { + if (_type != null && _type != type) { + throw new IllegalStateException("Type is already set!"); + } + _type = type; + } + public Class<?> getType() { return _type; } public String toString() { - StringBuffer buf = new StringBuffer(_type.getSimpleName()); + StringBuffer buf = new StringBuffer(_type == null ? "<unbound>" : _type.getSimpleName()); if (getDescription() != null) { buf.append(':').append(getDescription()); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java b/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java index 7fa9e00..ab5743d 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java @@ -46,6 +46,7 @@ public class ChannelFactory { InvocationHandler h = new ChannelInvocationHandler(backend); Class<?>[] ifaces = new Class[] { ChannelProxy.class, type }; Object proxy = Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(), ifaces, h); + backend.setType(type); return (Channel) proxy; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index d788afe..da83d31 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -98,6 +98,8 @@ public class ExecutionQueueImpl implements ExecutionQueue { private Map<Object, LinkedList<IndexedObject>> _index = new LinkedHashMap<Object, LinkedList<IndexedObject>>(); + public ExecutionQueueImpl() {} + public ExecutionQueueImpl(ClassLoader classLoader) { _classLoader = classLoader; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 40f4440..1fa9408 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -231,10 +231,10 @@ public class HelloWorld extends JacobObject implements Runnable { protected void calculusHelloWorld() { // new(out) - final ChannelRef out = newCommChannel(Val.class, "calculusHelloWorld-out"); + final ChannelRef out = newCommChannel("calculusHelloWorld-out"); // new(x) - final ChannelRef x = newCommChannel(Val.class, "calculusHelloWorld-x"); - + final ChannelRef x = newCommChannel("calculusHelloWorld-x"); + // *(?out(str).!sysout(str)) subscribe(true, out, new PrinterMessageListener()); // *(?x(str).!out(str)) http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java b/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java new file mode 100644 index 0000000..3361dfe --- /dev/null +++ b/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.vpu; + +import static org.apache.ode.jacob.Jacob.newCommChannel; +import static org.junit.Assert.*; +import org.apache.ode.jacob.ChannelRef; +import org.apache.ode.jacob.oo.Synch; +import org.apache.ode.jacob.oo.Val; +import org.apache.ode.jacob.soup.CommChannel; + +import org.junit.Test; + +public class ChannelTest { + + @Test + public void testConnectWithInterface() { + JacobVPU vpu = new JacobVPU(); + vpu.setContext(new ExecutionQueueImpl()); + + vpu.inject(new Runnable() { + + @Override + public void run() { + ChannelRef cref = newCommChannel("unbound channel"); + CommChannel cchannel = cref.getEndpoint(CommChannel.class); + assertNotNull(cchannel); + assertNull(cchannel.getType()); + + // now connect it to Val.class + Val val = cref.getEndpoint(Val.class); + assertNotNull(val); + assertEquals(Val.class, cchannel.getType()); + + // now try to associate it with a different channel interface + try { + cref.getEndpoint(Synch.class); + fail("we should get an IllegalStateException"); + } catch (IllegalStateException e) { + assertEquals("ChannelRef is already associated with a channel of a different type", e.getMessage()); + } + + // now try to associate with the same channel + Val val2 = cref.getEndpoint(Val.class); + assertNotNull(val2); + assertSame(val, val2); + + } + }); + + assertEquals(true, vpu.getContext().hasReactions()); + vpu.execute(); + assertEquals(false, vpu.getContext().hasReactions()); + } +}
