VPU and soup now only use messages instead of objects and proxies.
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/bac3ef34 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/bac3ef34 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/bac3ef34 Branch: refs/heads/master Commit: bac3ef3493d4b980dfd40e56b504e173885180d0 Parents: 6437f96 Author: Tammo van Lessen <[email protected]> Authored: Wed Aug 14 22:51:46 2013 +0200 Committer: Tammo van Lessen <[email protected]> Committed: Wed Aug 14 22:51:46 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/ode/jacob/ChannelRef.java | 6 +++--- .../java/org/apache/ode/jacob/oo/ClassUtil.java | 4 +++- .../java/org/apache/ode/jacob/soup/Comm.java | 3 +-- .../org/apache/ode/jacob/soup/CommChannel.java | 4 ++++ .../org/apache/ode/jacob/soup/CommRecv.java | 2 +- .../org/apache/ode/jacob/soup/CommSend.java | 4 ++-- .../soup/jackson/ChannelRefSerializer.java | 4 ++-- .../soup/jackson/JacobTypeResolverBuilder.java | 5 +++++ .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 21 +++++++++++++------- 9 files changed, 35 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/ChannelRef.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java index 57adcda..d741b47 100644 --- a/src/main/java/org/apache/ode/jacob/ChannelRef.java +++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java @@ -20,7 +20,7 @@ package org.apache.ode.jacob; import java.io.Serializable; -import org.apache.ode.jacob.oo.Channel; +import org.apache.ode.jacob.soup.CommChannel; @@ -37,7 +37,7 @@ public class ChannelRef implements Serializable { private final Object target; public ChannelRef(Object target) { - type = target instanceof Channel ? Type.CHANNEL : Type.JACOB_OBJECT; + type = target instanceof CommChannel ? Type.CHANNEL : Type.JACOB_OBJECT; this.target = target; } @@ -49,7 +49,7 @@ public class ChannelRef implements Serializable { public <T> T getEndpoint(Class<T> clazz) { if (type.equals(Type.JACOB_OBJECT) && JacobObject.class.isAssignableFrom(clazz)) { return (T)target; - } else if (type.equals(Type.CHANNEL) && Channel.class.isAssignableFrom(clazz)) { + } else if (type.equals(Type.CHANNEL) && CommChannel.class.isAssignableFrom(clazz)) { return (T)target; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java index bad88fa..e1336ee 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java @@ -26,6 +26,7 @@ import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.Expression; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.Message; +import org.apache.ode.jacob.soup.CommChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ public final class ClassUtil { public static final Method RUN_METHOD; public static final String RUN_METHOD_NAME = "run"; public static final String RUN_METHOD_ACTION = "java.lang.Runnable#run"; + public static final String SYNCH_RET_METHOD_ACTION = "org.apache.ode.jacob.oo.Synch#ret"; private static final Set<Method> RUN_METHOD_SET = new HashSet<Method>(); private static final Logger LOG = LoggerFactory.getLogger(ClassUtil.class); @@ -64,7 +66,7 @@ public final class ClassUtil { return message; } - public static Message createMessage(Channel target, String action, Object[] args, Channel replyTo) { + public static Message createMessage(CommChannel target, String action, Object[] args, CommChannel replyTo) { Message message = new Message(); message.setTo(new ChannelRef(target)); message.setReplyTo(replyTo == null ? null : new ChannelRef(replyTo)); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/soup/Comm.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/Comm.java b/src/main/java/org/apache/ode/jacob/soup/Comm.java index d2b4f43..ce842cb 100644 --- a/src/main/java/org/apache/ode/jacob/soup/Comm.java +++ b/src/main/java/org/apache/ode/jacob/soup/Comm.java @@ -34,8 +34,7 @@ public abstract class Comm extends ExecutionQueueObject { protected Comm() { } - protected Comm(CommGroup group, CommChannel chnl) { - _group = group; + protected Comm(CommChannel chnl) { _channel = chnl; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/soup/CommChannel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/CommChannel.java b/src/main/java/org/apache/ode/jacob/soup/CommChannel.java index 151eb97..1d8b98b 100644 --- a/src/main/java/org/apache/ode/jacob/soup/CommChannel.java +++ b/src/main/java/org/apache/ode/jacob/soup/CommChannel.java @@ -29,6 +29,10 @@ public class CommChannel extends ExecutionQueueObject { private Class<?> _type; + // used for deserialization + @SuppressWarnings("unused") + private CommChannel() {} + public CommChannel(Class<?> type) { _type = type; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/soup/CommRecv.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/CommRecv.java b/src/main/java/org/apache/ode/jacob/soup/CommRecv.java index 830f376..2fc5757 100644 --- a/src/main/java/org/apache/ode/jacob/soup/CommRecv.java +++ b/src/main/java/org/apache/ode/jacob/soup/CommRecv.java @@ -35,7 +35,7 @@ public class CommRecv extends Comm { } public CommRecv(CommChannel chnl, ChannelListener listener) { - super(null, chnl); + super(chnl); this.listener = listener; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/soup/CommSend.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/CommSend.java b/src/main/java/org/apache/ode/jacob/soup/CommSend.java index cf3aa86..3b69526 100644 --- a/src/main/java/org/apache/ode/jacob/soup/CommSend.java +++ b/src/main/java/org/apache/ode/jacob/soup/CommSend.java @@ -36,8 +36,8 @@ public class CommSend extends Comm { protected CommSend() { } - public CommSend(CommChannel chnl, Message msg) { - super(null, chnl); + public CommSend(Message msg) { + super(msg.getTo().getEndpoint(CommChannel.class)); this.msg = msg; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java index bdc4432..c50bb00 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.Message; -import org.apache.ode.jacob.oo.Channel; +import org.apache.ode.jacob.soup.CommChannel; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.core.JsonGenerator; @@ -67,6 +67,6 @@ public class ChannelRefSerializer extends StdSerializer<ChannelRef> { SerializerProvider provider) throws JsonGenerationException, IOException { jgen.writeObjectField("target", value.getEndpoint(value.getType() == ChannelRef.Type.CHANNEL - ? Channel.class : JacobObject.class)); + ? CommChannel.class : JacobObject.class)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java index 73b77c3..5bd3d22 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java @@ -86,10 +86,15 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { return true; } + //TODO: check if still needed. if (Channel.class.isAssignableFrom(t.getRawClass())) { return true; } + if (CommChannel.class.isAssignableFrom(t.getRawClass())) { + return true; + } + if (ChannelRef.class.isAssignableFrom(t.getRawClass())) { return true; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/bac3ef34/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 38e7dd8..462846f 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -257,6 +257,7 @@ public final class JacobVPU { _statistics.messagesSent++; Synch replyChannel = null; + CommChannel replyCommChannel = null; // Check for synchronous methods; create a synchronization channel if (method.getReturnType() != void.class) { if (method.getReturnType() != Synch.class) { @@ -264,16 +265,22 @@ public final class JacobVPU { "Channel method '" + method + "' must only return void or Synch"); } replyChannel = (Synch)newChannel(Synch.class, "", "Reply Channel"); + replyCommChannel = (CommChannel) ChannelFactory.getBackend((Channel)replyChannel); } + CommChannel chnl = (CommChannel) ChannelFactory.getBackend((Channel)channel); + Message msg = ClassUtil.createMessage(chnl, ClassUtil.getActionForMethod(method), args, replyCommChannel); + + sendMessage(msg); + return replyChannel; + } + + //XXX: add to super interface as message-oriented API + public void sendMessage(Message msg) { CommGroup grp = new CommGroup(false); - - Message msg = ClassUtil.createMessage(channel, ClassUtil.getActionForMethod(method), args, replyChannel); - - CommSend send = new CommSend(chnl, msg); + CommSend send = new CommSend(msg); grp.add(send); _executionQueue.add(grp); - return replyChannel; } public Channel newChannel(Class<?> channelType, String creator, String description) { @@ -390,7 +397,7 @@ public final class JacobVPU { LOG.trace(">> [{}] : {}", _cycle, _source); stackThread(); - Channel replyTo = message.getReplyTo() != null ? message.getReplyTo().getEndpoint(Channel.class) : null; + CommChannel replyTo = message.getReplyTo() != null ? message.getReplyTo().getEndpoint(CommChannel.class) : null; long ctime = System.currentTimeMillis(); try { @@ -401,7 +408,7 @@ public final class JacobVPU { ((Runnable)target).run(); } if (replyTo != null) { - ((Synch)replyTo).ret(); + sendMessage(ClassUtil.createMessage(replyTo, ClassUtil.SYNCH_RET_METHOD_ACTION, null, null)); } } finally { ctime = System.currentTimeMillis() - ctime;
