calculus API added.
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/93755e41 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/93755e41 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/93755e41 Branch: refs/heads/master Commit: 93755e413d09eb34321be4818087350a7d2ea96c Parents: 2f863b2 Author: Tammo van Lessen <[email protected]> Authored: Thu Aug 15 00:51:34 2013 +0200 Committer: Tammo van Lessen <[email protected]> Committed: Thu Aug 15 00:51:34 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/ode/jacob/JacobThread.java | 69 +++++++++++++++++++- .../org/apache/ode/jacob/MessageChannel.java | 28 -------- .../org/apache/ode/jacob/MessageListener.java | 28 ++++++++ .../apache/ode/jacob/oo/ChannelListener.java | 4 +- .../org/apache/ode/jacob/soup/CommRecv.java | 7 +- .../soup/jackson/JacobTypeResolverBuilder.java | 5 ++ .../ode/jacob/vpu/ExecutionQueueImpl.java | 5 +- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 69 +++++++++++++++++++- 8 files changed, 177 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/JacobThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java b/src/main/java/org/apache/ode/jacob/JacobThread.java index 0c1d1c2..0b71422 100644 --- a/src/main/java/org/apache/ode/jacob/JacobThread.java +++ b/src/main/java/org/apache/ode/jacob/JacobThread.java @@ -22,6 +22,7 @@ import java.lang.reflect.Method; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; +import org.apache.ode.jacob.soup.CommChannel; /** * Class exposing the JACOB operations. @@ -33,8 +34,48 @@ import org.apache.ode.jacob.oo.ChannelListener; * scoping rules for channel names are simply the Java object visibility rules. */ public interface JacobThread { + public Object getExtension(Class<?> extensionClass); + + // calculus oriented API + + /** + * Create a new calculus channel. + * + * @param channelType + * @param creator + * @param description + * @return + */ + public CommChannel newCommChannel(Class<?> channelType, String creator, String description); + + /** + * DOCUMENT ME + * @param channel + * @return + */ + public String exportCommChannel(CommChannel channel); + /** + * DOCUMENT ME + * @param channel + * @return + */ + public CommChannel importCommChannel(String channelId, Class<?> channelType); + + /** + * Send a message. + * + * @param message + * self-contained message + */ + public void sendMessage(Message message); + + public void subscribe(boolean replicate, CommChannel channel, MessageListener methodList) throws IllegalArgumentException; + public void subscribe(boolean replicate, CommChannel channel, MessageListener[] methodList) throws IllegalArgumentException; + + // OO oriented API + public String exportChannel(Channel channel); public Channel importChannel(String channelId, Class<?> channelClass); @@ -66,7 +107,7 @@ public interface JacobThread { * <p> * Receive a message on a channel, allowing for possible replication. The * effect of this method is to register a listener (the method list) for a - * message on the channel to consume either one or an infinate number of + * message on the channel to consume either one or an infinite number of * messages on the channel (depending on the value of the * <code>replicate</code> argument. * </p> @@ -88,6 +129,32 @@ public interface JacobThread { * if the method list does not match the channel kind */ public void object(boolean replicate, ChannelListener methodList) throws IllegalArgumentException; + + /** + * <p> + * Receive a message on a channel, allowing for possible replication. The + * effect of this method is to register a list of listeners (the method list) for a + * message on the channel to consume either one or an infinite number of + * messages on the channel (depending on the value of the + * <code>replicate</code> argument. + * </p> + * + * <p> + * With respect to process terms, the Java expression <code>object(false, x, + * ChannelListener)</code> + * corresponds to the process term <code> x ? { ChannelListener[] }</code>; + * if in the same expression the initial <code>replicate</code> parameter + * were instead set to <code>true</code>, corresponding term would be + * <code> ! x ? { ChannelListener }</code>. + * </p> + * + * @param replicate + * if set the a replication operator is present + * @param methodList + * object representation of the method list + * @throws IllegalArgumentException + * if the method list does not match the channel kind + */ public void object(boolean reaplicate, ChannelListener[] methodLists) throws IllegalArgumentException; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/MessageChannel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/MessageChannel.java b/src/main/java/org/apache/ode/jacob/MessageChannel.java deleted file mode 100644 index bf7a80d..0000000 --- a/src/main/java/org/apache/ode/jacob/MessageChannel.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ode.jacob; - - -/** - * TODO: Document... - */ - -public interface MessageChannel { - void onMessage(Message msg); -} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/MessageListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/MessageListener.java b/src/main/java/org/apache/ode/jacob/MessageListener.java new file mode 100644 index 0000000..5e5282a --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/MessageListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob; + + +/** + * TODO: Document... + */ + +public interface MessageListener { + void onMessage(Message msg); +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java index b8998d8..d863f31 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java +++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java @@ -23,7 +23,7 @@ import java.lang.reflect.Method; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.Message; -import org.apache.ode.jacob.MessageChannel; +import org.apache.ode.jacob.MessageListener; /** @@ -31,7 +31,7 @@ import org.apache.ode.jacob.MessageChannel; * class <em>and</em> implement one <code>Channel</code> interface. */ @SuppressWarnings("serial") -public abstract class ChannelListener extends JacobObject implements MessageChannel { +public abstract class ChannelListener extends JacobObject implements MessageListener { public void onMessage(Message msg) { Method action = ClassUtil.findActionMethod(getImplementedMethods()).evaluate(msg, Method.class); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/soup/CommRecv.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/CommRecv.java b/src/main/java/org/apache/ode/jacob/soup/CommRecv.java index 2fc5757..c555f61 100644 --- a/src/main/java/org/apache/ode/jacob/soup/CommRecv.java +++ b/src/main/java/org/apache/ode/jacob/soup/CommRecv.java @@ -18,6 +18,7 @@ */ package org.apache.ode.jacob.soup; +import org.apache.ode.jacob.MessageListener; import org.apache.ode.jacob.oo.ChannelListener; /** @@ -29,12 +30,12 @@ import org.apache.ode.jacob.oo.ChannelListener; * replicated variety. */ public class CommRecv extends Comm { - private ChannelListener listener; + private MessageListener listener; protected CommRecv() { } - public CommRecv(CommChannel chnl, ChannelListener listener) { + public CommRecv(CommChannel chnl, MessageListener listener) { super(chnl); this.listener = listener; } @@ -48,7 +49,7 @@ public class CommRecv extends Comm { * * @return byte array representing the serialized form of the continuation */ - public ChannelListener getListener() { + public MessageListener getListener() { return listener; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java index 5bd3d22..c2e36ee 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java @@ -22,6 +22,7 @@ import java.util.Collection; import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.MessageListener; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelProxy; import org.apache.ode.jacob.soup.CommChannel; @@ -99,6 +100,10 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { return true; } + if (MessageListener.class.isAssignableFrom(t.getRawClass())) { + return true; + } + if (t.getRawClass() == Object.class || t.isArrayType()) { return true; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index 3384f71..c5a9e10 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -42,6 +42,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.ode.jacob.IndexedObject; import org.apache.ode.jacob.Message; +import org.apache.ode.jacob.MessageListener; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; import org.apache.ode.jacob.soup.Comm; @@ -566,13 +567,13 @@ public class ExecutionQueueImpl implements ExecutionQueue { protected static class ObjectFrame extends CommFrame implements Externalizable { private static final long serialVersionUID = -7212430608484116919L; - ChannelListener _continuation; + MessageListener _continuation; // Used for deserialization public ObjectFrame() { } - public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, ChannelListener continuation) { + public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, MessageListener continuation) { super(commGroupFrame, channelFrame); this._continuation = continuation; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 10f3631..cf47558 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -26,12 +26,12 @@ import java.util.Stack; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.JacobThread; import org.apache.ode.jacob.Message; +import org.apache.ode.jacob.MessageListener; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; import org.apache.ode.jacob.oo.ClassUtil; import org.apache.ode.jacob.oo.CompositeProcess; import org.apache.ode.jacob.oo.ReceiveProcess; -import org.apache.ode.jacob.oo.Synch; import org.apache.ode.jacob.soup.CommChannel; import org.apache.ode.jacob.soup.CommGroup; import org.apache.ode.jacob.soup.CommRecv; @@ -275,7 +275,6 @@ public final class JacobVPU { return replyChannel; } - //XXX: add to super interface as message-oriented API public void sendMessage(Message msg) { CommGroup grp = new CommGroup(false); CommSend send = new CommSend(msg); @@ -295,6 +294,17 @@ public final class JacobVPU { return ret; } + public CommChannel newCommChannel(Class<?> channelType, String creator, String description) { + CommChannel chnl = new CommChannel(channelType); + chnl.setDescription(description); + _executionQueue.add(chnl); + + LOG.trace(">> [{}] : new {}", _cycle, chnl); + + _statistics.channelsCreated++; + return chnl; + } + public String exportChannel(Channel channel) { LOG.trace(">> [{}] : export<{}>", _cycle, channel); @@ -302,11 +312,22 @@ public final class JacobVPU { return _executionQueue.createExport(chnl); } + public String exportCommChannel(CommChannel channel) { + LOG.trace(">> [{}] : export<{}>", _cycle, channel); + + return _executionQueue.createExport(channel); + } + + //XXX: check if channelType is really needed, could be get from cframe.getType() public Channel importChannel(String channelId, Class<?> channelType) { CommChannel cframe = _executionQueue.consumeExport(channelId); return ChannelFactory.createChannel(cframe, channelType); } + public CommChannel importCommChannel(String channelId, Class<?> channelType) { + return _executionQueue.consumeExport(channelId); + } + public void object(boolean replicate, ChannelListener[] ml) { if (LOG.isTraceEnabled()) { StringBuffer msg = new StringBuffer(); @@ -353,6 +374,50 @@ public final class JacobVPU { addCommChannel(grp, ml); _executionQueue.add(grp); } + + public void subscribe(boolean replicate, CommChannel channel, MessageListener listener) { + if (LOG.isTraceEnabled()) { + StringBuffer msg = new StringBuffer(); + msg.append(_cycle); + msg.append(": "); + msg.append(channel); + msg.append(" ? "); + msg.append(listener.toString()); + LOG.trace(msg.toString()); + } + + _statistics.numContinuations++; + + CommGroup grp = new CommGroup(replicate); + CommRecv recv = new CommRecv(channel, listener); + grp.add(recv); + + _executionQueue.add(grp); + } + + public void subscribe(boolean replicate, CommChannel channel, MessageListener listeners[]) { + if (LOG.isTraceEnabled()) { + StringBuffer msg = new StringBuffer(); + msg.append(_cycle); + msg.append(": "); + for (int i = 0; i < listeners.length; ++i) { + if (i != 0) msg.append(" + "); + msg.append(channel); + msg.append(" ? "); + msg.append(listeners[i].toString()); + } + LOG.debug(msg.toString()); + } + + _statistics.numContinuations++; + + CommGroup grp = new CommGroup(replicate); + for (int i = 0; i < listeners.length; ++i) { + CommRecv recv = new CommRecv(channel, listeners[i]); + grp.add(recv); + } + _executionQueue.add(grp); + } private void addCommChannel(CommGroup group, ChannelListener receiver) { if (receiver instanceof CompositeProcess) {
