Updated Branches:
  refs/heads/master 1559bfe07 -> ba539a12b

Use messages instead of reflection for passing values accross Channel(s)


Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/a8339de5
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/a8339de5
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/a8339de5

Branch: refs/heads/master
Commit: a8339de5ea820e38430e8d32935a7b0db5777cba
Parents: 1559bfe
Author: Hadrian Zbarcea <[email protected]>
Authored: Tue Jul 30 13:56:15 2013 -0400
Committer: Hadrian Zbarcea <[email protected]>
Committed: Tue Jul 30 13:56:15 2013 -0400

----------------------------------------------------------------------
 src/main/java/org/apache/ode/jacob/Message.java | 74 ++++++++++++++++++++
 .../org/apache/ode/jacob/MessageChannel.java    | 28 ++++++++
 .../java/org/apache/ode/jacob/MessageType.java  | 27 +++++++
 .../apache/ode/jacob/oo/ChannelListener.java    | 29 +++++++-
 .../java/org/apache/ode/jacob/oo/ClassUtil.java |  6 ++
 .../org/apache/ode/jacob/oo/MessageHandler.java | 42 +++++++++++
 .../java/org/apache/ode/jacob/oo/Synch.java     |  6 +-
 src/main/java/org/apache/ode/jacob/oo/Val.java  |  5 +-
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 17 ++++-
 .../apache/ode/jacob/examples/cell/Cell.java    |  8 ++-
 .../eratosthenes/NaturalNumberStream.java       |  5 +-
 .../jacob/examples/helloworld/HelloWorld.java   |  7 +-
 .../ode/jacob/examples/synch/SynchPrint.java    |  5 +-
 .../jacob/vpu/ProxyConstructorTimingTest.java   |  5 +-
 14 files changed, 251 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/Message.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/Message.java 
b/src/main/java/org/apache/ode/jacob/Message.java
new file mode 100644
index 0000000..0eb9024
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/Message.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * TODO: Document...
+ * TODO: should anything be final here? the class itself?
+ */
+
+public final class Message {
+       private Class<? extends MessageType> type;
+       private MessageChannel reply;
+       private Map<String, Object> headers;
+       private Object body;
+
+       public Message() {
+               // TODO: do we always need headers?
+               headers = new ConcurrentHashMap<String, Object>();
+       }
+       public Message(Class<? extends MessageType> type) {
+               this();
+               this.type = type;
+       }
+
+       public Class<? extends MessageType> getType() {
+               return type;
+       }
+       public void setType(Class<? extends MessageType> type) {
+               this.type = type;
+       }
+       public MessageChannel getReply() {
+               return reply;
+       }
+       public void setReply(MessageChannel reply) {
+               this.reply = reply;
+       }
+       public Map<String, Object> getHeaders() {
+               return headers;
+       }
+       public void setHeaders(Map<String, Object> headers) {
+               this.headers = headers;
+       }
+       public Object getBody() {
+               return body;
+       }
+       public void setBody(Object body) {
+               this.body = body;
+       }
+
+       public boolean containsHeader(String header) {
+               return headers.containsKey(header);
+       }
+       // TODO: add any other convenience methods like addHeader, 
removeHeader? 
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/MessageChannel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/MessageChannel.java 
b/src/main/java/org/apache/ode/jacob/MessageChannel.java
new file mode 100644
index 0000000..bf7a80d
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/MessageChannel.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+
+/**
+ * TODO: Document...
+ */
+
+public interface MessageChannel {
+       void onMessage(Message msg);
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/MessageType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/MessageType.java 
b/src/main/java/org/apache/ode/jacob/MessageType.java
new file mode 100644
index 0000000..a1e4fe5
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/MessageType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+
+/**
+ * TODO: Document...
+ */
+
+public interface MessageType {
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java 
b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
index a8df5f9..0c607c2 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
@@ -19,7 +19,14 @@
 package org.apache.ode.jacob.oo;
 
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Set;
+
 import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.MessageChannel;
+import org.apache.ode.jacob.MessageType;
 
 
 /**
@@ -27,5 +34,25 @@ import org.apache.ode.jacob.JacobObject;
  * class <em>and</em> implement one <code>Channel</code> interface.
  */
 @SuppressWarnings("serial")
-public abstract class ChannelListener extends JacobObject {
+public abstract class ChannelListener extends JacobObject implements 
MessageChannel {
+
+       public void onMessage(Message msg) {
+               Class<? extends MessageType> type = msg.getType();
+
+               Set<Method> methods = this.getImplementedMethods();
+               for (Method m : methods) {
+                       if (type != null && 
type.equals(ClassUtil.getMessageType(m))) {
+                               if (this instanceof ReceiveProcess) {
+                                       try {
+                                               
m.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody());
+                                       } catch (Exception e) {
+                                               // TODO Auto-generated catch 
block
+                                               e.printStackTrace();
+                                       }
+                               }
+                               break;
+                       }
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java 
b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
index a7bbd03..4e5c59c 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.ode.jacob.MessageType;
+
 
 public final class ClassUtil {
     public static final Method RUN_METHOD;
@@ -41,6 +43,10 @@ public final class ClassUtil {
         // Utility class
     }
 
+    public static Class<? extends MessageType> getMessageType(Method 
channelMethod) {
+       MessageHandler handler = 
channelMethod.getAnnotation(MessageHandler.class);
+       return handler == null ? null : handler.value();
+    }
     public static Set<Method> runMethodSet() {
        return RUN_METHOD_SET;
     }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java 
b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
new file mode 100644
index 0000000..b2bed2d
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.ode.jacob.MessageType;
+
+
+/**
+ * Marks a {@link Channel} method as handling a {@link Message}
+ *  of a certaing {@link MessageType}
+ *  
+ *  @see Message#getType()
+ */
+
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ElementType.METHOD})
+public @interface MessageHandler {
+       Class<? extends MessageType> value();
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/Synch.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/Synch.java 
b/src/main/java/org/apache/ode/jacob/oo/Synch.java
index 71a04ab..057cc9a 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Synch.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Synch.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ode.jacob.oo;
 
+import org.apache.ode.jacob.MessageType;
+
 
 
 /**
@@ -25,11 +27,11 @@ package org.apache.ode.jacob.oo;
  * <p>
  * It is the only allowable return type (other than "void") for JACOB objects.
  *
- * @author Maciej Szefler <a href="mailto:[email protected]";>mbs</a>
  */
 
 public interface Synch extends Channel {
+       public interface RetMessage extends MessageType {}
 
-       public void ret();
+       @MessageHandler(RetMessage.class) public void ret();
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/Val.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/Val.java 
b/src/main/java/org/apache/ode/jacob/oo/Val.java
index 13fb0ec..caff565 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Val.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Val.java
@@ -18,13 +18,16 @@
  */
 package org.apache.ode.jacob.oo;
 
+import org.apache.ode.jacob.MessageType;
+
 
 
 /**
  * Generic return-value channel type.
  */
 public interface Val extends Channel {
+       public interface ValMessage extends MessageType {}
 
-       public void val(Object retVal);
+       @MessageHandler(ValMessage.class) public void val(Object retVal);
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java 
b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index f9eddf8..7857f47 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -26,6 +26,8 @@ import java.util.Stack;
 
 import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.JacobThread;
+import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.MessageType;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
 import org.apache.ode.jacob.oo.ClassUtil;
@@ -425,11 +427,19 @@ public final class JacobVPU {
             stackThread();
             long ctime = System.currentTimeMillis();
             try {
-                _method.invoke(_methodBody instanceof ReceiveProcess ? 
-                    ((ReceiveProcess)_methodBody).getReceiver() : _methodBody, 
args);
+               if (_methodBody instanceof ReceiveProcess) {
+                       Message msg = new 
Message(ClassUtil.getMessageType(_method));
+                       msg.setBody(args);
+
+                       ((ReceiveProcess)_methodBody).onMessage(msg);
+                       // 
_method.invoke(((ReceiveProcess)_methodBody).getReceiver(), args);
+               } else {
+                       ((Runnable)_methodBody).run();
+               }
                 if (synchChannel != null) {
                     synchChannel.ret();
                 }
+/*
             } catch (IllegalAccessException iae) {
                 throw new RuntimeException("MethodNotAccessible: " + 
_method.getName() + " in " + _method.getDeclaringClass().getName(), iae);
             } catch (InvocationTargetException e) {
@@ -437,7 +447,8 @@ public final class JacobVPU {
                 throw (target instanceof RuntimeException)
                     ? (RuntimeException) target
                     : new RuntimeException("ClientMethodException: " + 
_method.getName() + " in " + _methodBody.getClass().getName(), target);
-            } finally {
+*/
+                       } finally {
                 ctime = System.currentTimeMillis() - ctime;
                 _statistics.totalClientTimeMs += ctime;
                 unstackThread();

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java 
b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
index 0bffb85..6fb53fc 100644
--- a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
+++ b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
@@ -19,24 +19,28 @@
 package org.apache.ode.jacob.examples.cell;
 
 
+import org.apache.ode.jacob.MessageType;
 import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
 import org.apache.ode.jacob.oo.Val;
 
 /**
  * Channel type for a cell. The channel allows reading of and setting the 
values of a cell.
  */
 public interface Cell extends Channel {
+       public interface ReadMessage extends MessageType {}
+       public interface WriteMessage extends MessageType {}
 
     /**
      * Read the value of the cell.
      * @param replyTo channel to which the value of the cell is sent
      */
-    public void read(Val replyTo);
+       @MessageHandler(ReadMessage.class) public void read(Val replyTo);
 
     /**
      * Write the value of the cell.
      * @param newVal new value of the cell
      */
-    public void write(Object newVal);
+       @MessageHandler(WriteMessage.class) public void write(Object newVal);
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
 
b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
index 051a451..0ab7844 100644
--- 
a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
+++ 
b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
@@ -19,7 +19,9 @@
 package org.apache.ode.jacob.examples.eratosthenes;
 
 
+import org.apache.ode.jacob.MessageType;
 import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
 import org.apache.ode.jacob.oo.Synch;
 
 /**
@@ -30,7 +32,8 @@ import org.apache.ode.jacob.oo.Synch;
  * @author Maciej Szefler <a href="mailto:[email protected]";>mbs</a>
  */
 public interface NaturalNumberStream extends Channel {
+       public interface ValMessage extends MessageType {}
 
-       public void val(int n, Synch ret);
+       @MessageHandler(ValMessage.class) public void val(int n, Synch ret);
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java 
b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 92dc03c..37a0fec 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -19,8 +19,10 @@
 package org.apache.ode.jacob.examples.helloworld;
 
 import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.MessageType;
 import org.apache.ode.jacob.examples.sequence.Sequence;
 import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
 import org.apache.ode.jacob.oo.ReceiveProcess;
 import org.apache.ode.jacob.oo.Synch;
 import org.apache.ode.jacob.oo.Val;
@@ -47,7 +49,10 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 public class HelloWorld extends JacobObject implements Runnable {
 
     public static interface Callback<T, R extends Channel> extends Channel {
-        public void invoke(T value, R callback);
+       public interface CallbackMessage extends MessageType {}
+
+        @MessageHandler(CallbackMessage.class) public void invoke(T value, R 
callback);
+
     }
 
     static class ReliablePrinterProcess extends JacobObject implements 
Runnable {

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java 
b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
index 54eda1f..4da9319 100644
--- a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
+++ b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
@@ -18,7 +18,9 @@
  */
 package org.apache.ode.jacob.examples.synch;
 
+import org.apache.ode.jacob.MessageType;
 import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
 import org.apache.ode.jacob.oo.Synch;
 
 /**
@@ -28,7 +30,8 @@ import org.apache.ode.jacob.oo.Synch;
  * @author Maciej Szefler <a href="mailto:[email protected]";>mbs</a>
  */
 public interface SynchPrint extends Channel {
+       public interface SynchPrintMessage extends MessageType {}
 
-       public Synch print(String msg);
+       @MessageHandler(SynchPrintMessage.class) public Synch print(String msg);
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java 
b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
index d1284dc..cb75def 100644
--- a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
+++ b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
@@ -24,8 +24,10 @@ import java.lang.reflect.Proxy;
 
 import junit.framework.TestCase;
 
+import org.apache.ode.jacob.MessageType;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelProxy;
+import org.apache.ode.jacob.oo.MessageHandler;
 
 
 public class ProxyConstructorTimingTest extends TestCase {
@@ -102,7 +104,8 @@ public class ProxyConstructorTimingTest extends TestCase {
     }
     
     public interface Greeter extends Channel {
-        String hello(String name);
+       public interface HelloMessage extends MessageType {}
+       @MessageHandler(HelloMessage.class) String hello(String name);
     }
     
     @SuppressWarnings("serial")

Reply via email to