calculus API added.

Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/93755e41
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/93755e41
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/93755e41

Branch: refs/heads/master
Commit: 93755e413d09eb34321be4818087350a7d2ea96c
Parents: 2f863b2
Author: Tammo van Lessen <[email protected]>
Authored: Thu Aug 15 00:51:34 2013 +0200
Committer: Tammo van Lessen <[email protected]>
Committed: Thu Aug 15 00:51:34 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/ode/jacob/JacobThread.java  | 69 +++++++++++++++++++-
 .../org/apache/ode/jacob/MessageChannel.java    | 28 --------
 .../org/apache/ode/jacob/MessageListener.java   | 28 ++++++++
 .../apache/ode/jacob/oo/ChannelListener.java    |  4 +-
 .../org/apache/ode/jacob/soup/CommRecv.java     |  7 +-
 .../soup/jackson/JacobTypeResolverBuilder.java  |  5 ++
 .../ode/jacob/vpu/ExecutionQueueImpl.java       |  5 +-
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 69 +++++++++++++++++++-
 8 files changed, 177 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/JacobThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java 
b/src/main/java/org/apache/ode/jacob/JacobThread.java
index 0c1d1c2..0b71422 100644
--- a/src/main/java/org/apache/ode/jacob/JacobThread.java
+++ b/src/main/java/org/apache/ode/jacob/JacobThread.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
 
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
+import org.apache.ode.jacob.soup.CommChannel;
 
 /**
  * Class exposing the JACOB operations.
@@ -33,8 +34,48 @@ import org.apache.ode.jacob.oo.ChannelListener;
  * scoping rules for channel names are simply the Java object visibility rules.
  */
 public interface JacobThread {
+
     public Object getExtension(Class<?> extensionClass);
+    
+    // calculus oriented API
+    
+    /**
+     * Create a new calculus channel.
+     * 
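+     * @param channelType type of the channel to create
+     * @param creator creator of the channel (TODO confirm: JacobVPU does not evaluate it)
+     * @param description description of the channel
+     * @return the newly created channel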
+     */
+    public CommChannel newCommChannel(Class<?> channelType, String creator, 
String description);
+    
+    /**
+     * Export a calculus channel.
+     * @param channel the channel to export
+     * @return the identifier under which the channel was exported
+     */
+    public String exportCommChannel(CommChannel channel);
 
+    /**
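+     * Import a previously exported calculus channel.
+     * @param channelId identifier of the exported channel
+     * @param channelType type of the channel
+     * @return the imported channel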
+     */
+    public CommChannel importCommChannel(String channelId, Class<?> 
channelType);
+    
+    /**
+     * Send a message. 
+     *
+     * @param message
+     *            self-contained message
+     */
+    public void sendMessage(Message message);
+
+    public void subscribe(boolean replicate, CommChannel channel, 
MessageListener methodList) throws IllegalArgumentException;
+    public void subscribe(boolean replicate, CommChannel channel, 
MessageListener[] methodList) throws IllegalArgumentException;
+
+    // OO oriented API
+    
     public String exportChannel(Channel channel);
 
     public Channel importChannel(String channelId, Class<?> channelClass);
@@ -66,7 +107,7 @@ public interface JacobThread {
      * <p>
      * Receive a message on a channel, allowing for possible replication. The
      * effect of this method is to register a listener (the method list) for a
-     * message on the channel to consume either one or an infinate number of
+     * message on the channel to consume either one or an infinite number of
      * messages on the channel (depending on the value of the
      * <code>replicate</code> argument.
      * </p>
@@ -88,6 +129,32 @@ public interface JacobThread {
      *             if the method list does not match the channel kind
      */
     public void object(boolean replicate, ChannelListener methodList) throws 
IllegalArgumentException;
+
+    /**
+     * <p>
+     * Receive a message on a channel, allowing for possible replication. The
+     * effect of this method is to register a list of listeners (the method
+     * lists) for a message on the channel to consume either one or an infinite
+     * number of messages on the channel (depending on the value of the
+     * <code>replicate</code> argument).
+     * </p>
+     *
+     * <p>
+     * With respect to process terms, the Java expression
+     * <code>object(false, ChannelListener[])</code>
+     * corresponds to the process term <code> x ? { ChannelListener[] }</code>;
+     * if in the same expression the initial <code>replicate</code> parameter
+     * were instead set to <code>true</code>, the corresponding term would be
+     * <code> ! x ? { ChannelListener[] }</code>.
+     * </p>
+     *
+     * @param replicate
+     *            if set, a replication operator is present
+     * @param methodLists
+     *            object representation of the method lists
+     * @throws IllegalArgumentException
+     *             if the method list does not match the channel kind
+     */
     public void object(boolean reaplicate, ChannelListener[] methodLists) 
throws IllegalArgumentException;
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/MessageChannel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/MessageChannel.java 
b/src/main/java/org/apache/ode/jacob/MessageChannel.java
deleted file mode 100644
index bf7a80d..0000000
--- a/src/main/java/org/apache/ode/jacob/MessageChannel.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.jacob;
-
-
-/**
- * TODO: Document...
- */
-
-public interface MessageChannel {
-       void onMessage(Message msg);
-}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/MessageListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/MessageListener.java 
b/src/main/java/org/apache/ode/jacob/MessageListener.java
new file mode 100644
index 0000000..5e5282a
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/MessageListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+
+/**
+ * Listener that is handed {@link Message}s through {@link #onMessage(Message)}.
+ * <p>
+ * Instances are registered on a channel with the <code>subscribe</code>
+ * methods of {@link JacobThread}.
+ * </p>
+ */
+
+public interface MessageListener {
+       /**
+        * Invoked with a message for this listener.
+        *
+        * @param msg
+        *            the message to handle
+        */
+       void onMessage(Message msg);
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java 
b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
index b8998d8..d863f31 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
@@ -23,7 +23,7 @@ import java.lang.reflect.Method;
 
 import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.Message;
-import org.apache.ode.jacob.MessageChannel;
+import org.apache.ode.jacob.MessageListener;
 
 
 /**
@@ -31,7 +31,7 @@ import org.apache.ode.jacob.MessageChannel;
  * class <em>and</em> implement one <code>Channel</code> interface.
  */
 @SuppressWarnings("serial")
-public abstract class ChannelListener extends JacobObject implements 
MessageChannel {
+public abstract class ChannelListener extends JacobObject implements 
MessageListener {
 
        public void onMessage(Message msg) {
                Method action = 
ClassUtil.findActionMethod(getImplementedMethods()).evaluate(msg, Method.class);

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/soup/CommRecv.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/CommRecv.java 
b/src/main/java/org/apache/ode/jacob/soup/CommRecv.java
index 2fc5757..c555f61 100644
--- a/src/main/java/org/apache/ode/jacob/soup/CommRecv.java
+++ b/src/main/java/org/apache/ode/jacob/soup/CommRecv.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ode.jacob.soup;
 
+import org.apache.ode.jacob.MessageListener;
 import org.apache.ode.jacob.oo.ChannelListener;
 
 /**
@@ -29,12 +30,12 @@ import org.apache.ode.jacob.oo.ChannelListener;
  * replicated variety.
  */
 public class CommRecv extends Comm {
-    private ChannelListener listener;
+    private MessageListener listener;
 
     protected CommRecv() {
     }
 
-    public CommRecv(CommChannel chnl, ChannelListener listener) {
+    public CommRecv(CommChannel chnl, MessageListener listener) {
         super(chnl);
         this.listener = listener;
     }
@@ -48,7 +49,7 @@ public class CommRecv extends Comm {
      *
      * @return byte array representing the serialized form of the continuation
      */
-    public ChannelListener getListener() {
+    public MessageListener getListener() {
         return listener;
     }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java 
b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
index 5bd3d22..c2e36ee 100644
--- 
a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
+++ 
b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 
 import org.apache.ode.jacob.ChannelRef;
 import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.MessageListener;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelProxy;
 import org.apache.ode.jacob.soup.CommChannel;
@@ -99,6 +100,10 @@ public class JacobTypeResolverBuilder extends 
StdTypeResolverBuilder {
             return true;
         }
 
+        if (MessageListener.class.isAssignableFrom(t.getRawClass()))  {
+            return true;
+        }
+
         if (t.getRawClass() == Object.class || t.isArrayType()) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java 
b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index 3384f71..c5a9e10 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -42,6 +42,7 @@ import java.util.zip.GZIPOutputStream;
 
 import org.apache.ode.jacob.IndexedObject;
 import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.MessageListener;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
 import org.apache.ode.jacob.soup.Comm;
@@ -566,13 +567,13 @@ public class ExecutionQueueImpl implements ExecutionQueue 
{
     protected static class ObjectFrame extends CommFrame implements 
Externalizable {
         private static final long serialVersionUID = -7212430608484116919L;
 
-        ChannelListener _continuation;
+        MessageListener _continuation;
 
         // Used for deserialization
         public ObjectFrame() {
         }
 
-        public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame 
channelFrame, ChannelListener continuation) {
+        public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame 
channelFrame, MessageListener continuation) {
             super(commGroupFrame, channelFrame);
             this._continuation = continuation;
         }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/93755e41/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java 
b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 10f3631..cf47558 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -26,12 +26,12 @@ import java.util.Stack;
 import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.JacobThread;
 import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.MessageListener;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
 import org.apache.ode.jacob.oo.ClassUtil;
 import org.apache.ode.jacob.oo.CompositeProcess;
 import org.apache.ode.jacob.oo.ReceiveProcess;
-import org.apache.ode.jacob.oo.Synch;
 import org.apache.ode.jacob.soup.CommChannel;
 import org.apache.ode.jacob.soup.CommGroup;
 import org.apache.ode.jacob.soup.CommRecv;
@@ -275,7 +275,6 @@ public final class JacobVPU {
             return replyChannel;
         }
         
-        //XXX: add to super interface as message-oriented API
         public void sendMessage(Message msg) {
             CommGroup grp = new CommGroup(false);
             CommSend send = new CommSend(msg);
@@ -295,6 +294,17 @@ public final class JacobVPU {
             return ret;
         }
 
+        /**
+         * Creates a calculus channel and adds it to the execution queue.
+         *
+         * @param channelType type the new channel is created for
+         * @param creator not evaluated by this implementation
+         * @param description description that is set on the new channel
+         * @return the channel after it has been added to the execution queue
+         */
+        public CommChannel newCommChannel(Class<?> channelType, String creator,
+                String description) {
+            final CommChannel created = new CommChannel(channelType);
+            created.setDescription(description);
+            // The channel is traced only after it has been added to the queue.
+            _executionQueue.add(created);
+
+            LOG.trace(">> [{}] : new {}", _cycle, created);
+
+            _statistics.channelsCreated++;
+            return created;
+        }
+        
         public String exportChannel(Channel channel) {
             LOG.trace(">> [{}] : export<{}>", _cycle, channel);
 
@@ -302,11 +312,22 @@ public final class JacobVPU {
             return _executionQueue.createExport(chnl);
         }
 
+        /**
+         * Exports a calculus channel through the execution queue.
+         *
+         * @param channel channel to export
+         * @return the value produced by the execution queue's <code>createExport</code>
+         */
+        public String exportCommChannel(CommChannel channel) {
+            LOG.trace(">> [{}] : export<{}>", _cycle, channel);
+
+            final String exportId = _executionQueue.createExport(channel);
+            return exportId;
+        }
+
+        //XXX: check if channelType is really needed, it could be obtained from cframe.getType()
         public Channel importChannel(String channelId, Class<?> channelType) {
             CommChannel cframe = _executionQueue.consumeExport(channelId);
             return ChannelFactory.createChannel(cframe, channelType);
         }
 
+        /**
+         * Imports a calculus channel by consuming its export from the execution queue.
+         *
+         * @param channelId identifier of the export to consume
+         * @param channelType not evaluated by this implementation
+         * @return the channel handed back by the execution queue
+         */
+        public CommChannel importCommChannel(String channelId, Class<?> channelType) {
+            final CommChannel imported = _executionQueue.consumeExport(channelId);
+            return imported;
+        }
+        
         public void object(boolean replicate, ChannelListener[] ml) {
             if (LOG.isTraceEnabled()) {
                 StringBuffer msg = new StringBuffer();
@@ -353,6 +374,50 @@ public final class JacobVPU {
             addCommChannel(grp, ml);
             _executionQueue.add(grp);
         }
+        
+        /**
+         * Registers a single listener on a calculus channel.
+         *
+         * @param replicate flag handed to the {@link CommGroup} created for the receive
+         * @param channel channel the receive is created for
+         * @param listener listener wrapped into the {@link CommRecv}
+         */
+        public void subscribe(boolean replicate, CommChannel channel,
+                MessageListener listener) {
+            if (LOG.isTraceEnabled()) {
+                // Rendered as "<cycle>: <channel> ? <listener>".
+                StringBuilder trace = new StringBuilder()
+                        .append(_cycle)
+                        .append(": ")
+                        .append(channel)
+                        .append(" ? ")
+                        .append(listener.toString());
+                LOG.trace(trace.toString());
+            }
+
+            _statistics.numContinuations++;
+
+            CommGroup group = new CommGroup(replicate);
+            group.add(new CommRecv(channel, listener));
+
+            _executionQueue.add(group);
+        }
+
+        /**
+         * Registers several listeners on one calculus channel as a single
+         * {@link CommGroup}; one {@link CommRecv} is created per listener.
+         *
+         * @param replicate flag handed to the {@link CommGroup}
+         * @param channel channel every receive is created for
+         * @param listeners listeners to register; each is wrapped into its own {@link CommRecv}
+         */
+        public void subscribe(boolean replicate, CommChannel channel,
+                MessageListener[] listeners) {
+            if (LOG.isTraceEnabled()) {
+                // Rendered as "<cycle>: <channel> ? <l0> + <channel> ? <l1> + ...".
+                StringBuilder msg = new StringBuilder();
+                msg.append(_cycle);
+                msg.append(": ");
+                String separator = "";
+                for (MessageListener listener : listeners) {
+                    msg.append(separator);
+                    msg.append(channel);
+                    msg.append(" ? ");
+                    msg.append(listener.toString());
+                    separator = " + ";
+                }
+                // Log at the level checked by the guard, as the single-listener
+                // overload does (this was LOG.debug before).
+                LOG.trace(msg.toString());
+            }
+
+            // Counted once per call, not once per listener.
+            _statistics.numContinuations++;
+
+            CommGroup grp = new CommGroup(replicate);
+            for (MessageListener listener : listeners) {
+                grp.add(new CommRecv(channel, listener));
+            }
+            _executionQueue.add(grp);
+        }
 
         private void addCommChannel(CommGroup group, ChannelListener receiver) 
{
             if (receiver instanceof CompositeProcess) {

Reply via email to