Updated Branches: refs/heads/master 1559bfe07 -> ba539a12b
Use messages instead of reflection for passing values accross Channel(s) Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/a8339de5 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/a8339de5 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/a8339de5 Branch: refs/heads/master Commit: a8339de5ea820e38430e8d32935a7b0db5777cba Parents: 1559bfe Author: Hadrian Zbarcea <[email protected]> Authored: Tue Jul 30 13:56:15 2013 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Tue Jul 30 13:56:15 2013 -0400 ---------------------------------------------------------------------- src/main/java/org/apache/ode/jacob/Message.java | 74 ++++++++++++++++++++ .../org/apache/ode/jacob/MessageChannel.java | 28 ++++++++ .../java/org/apache/ode/jacob/MessageType.java | 27 +++++++ .../apache/ode/jacob/oo/ChannelListener.java | 29 +++++++- .../java/org/apache/ode/jacob/oo/ClassUtil.java | 6 ++ .../org/apache/ode/jacob/oo/MessageHandler.java | 42 +++++++++++ .../java/org/apache/ode/jacob/oo/Synch.java | 6 +- src/main/java/org/apache/ode/jacob/oo/Val.java | 5 +- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 17 ++++- .../apache/ode/jacob/examples/cell/Cell.java | 8 ++- .../eratosthenes/NaturalNumberStream.java | 5 +- .../jacob/examples/helloworld/HelloWorld.java | 7 +- .../ode/jacob/examples/synch/SynchPrint.java | 5 +- .../jacob/vpu/ProxyConstructorTimingTest.java | 5 +- 14 files changed, 251 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/Message.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java new file mode 100644 index 0000000..0eb9024 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/Message.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * TODO: Document... + * TODO: should anything be final here? the class itself? + */ + +public final class Message { + private Class<? extends MessageType> type; + private MessageChannel reply; + private Map<String, Object> headers; + private Object body; + + public Message() { + // TODO: do we always need headers? + headers = new ConcurrentHashMap<String, Object>(); + } + public Message(Class<? extends MessageType> type) { + this(); + this.type = type; + } + + public Class<? extends MessageType> getType() { + return type; + } + public void setType(Class<? extends MessageType> type) { + this.type = type; + } + public MessageChannel getReply() { + return reply; + } + public void setReply(MessageChannel reply) { + this.reply = reply; + } + public Map<String, Object> getHeaders() { + return headers; + } + public void setHeaders(Map<String, Object> headers) { + this.headers = headers; + } + public Object getBody() { + return body; + } + public void setBody(Object body) { + this.body = body; + } + + public boolean containsHeader(String header) { + return headers.containsKey(header); + } + // TODO: add any other convenience methods like addHeader, removeHeader? +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/MessageChannel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/MessageChannel.java b/src/main/java/org/apache/ode/jacob/MessageChannel.java new file mode 100644 index 0000000..bf7a80d --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/MessageChannel.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob; + + +/** + * TODO: Document... + */ + +public interface MessageChannel { + void onMessage(Message msg); +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/MessageType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/MessageType.java b/src/main/java/org/apache/ode/jacob/MessageType.java new file mode 100644 index 0000000..a1e4fe5 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/MessageType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob; + + +/** + * TODO: Document... + */ + +public interface MessageType { +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java index a8df5f9..0c607c2 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java +++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java @@ -19,7 +19,14 @@ package org.apache.ode.jacob.oo; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Set; + import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.Message; +import org.apache.ode.jacob.MessageChannel; +import org.apache.ode.jacob.MessageType; /** @@ -27,5 +34,25 @@ import org.apache.ode.jacob.JacobObject; * class <em>and</em> implement one <code>Channel</code> interface. */ @SuppressWarnings("serial") -public abstract class ChannelListener extends JacobObject { +public abstract class ChannelListener extends JacobObject implements MessageChannel { + + public void onMessage(Message msg) { + Class<? extends MessageType> type = msg.getType(); + + Set<Method> methods = this.getImplementedMethods(); + for (Method m : methods) { + if (type != null && type.equals(ClassUtil.getMessageType(m))) { + if (this instanceof ReceiveProcess) { + try { + m.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody()); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + break; + } + } + } + } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java index a7bbd03..4e5c59c 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java @@ -22,6 +22,8 @@ import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; +import org.apache.ode.jacob.MessageType; + public final class ClassUtil { public static final Method RUN_METHOD; @@ -41,6 +43,10 @@ public final class ClassUtil { // Utility class } + public static Class<? extends MessageType> getMessageType(Method channelMethod) { + MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class); + return handler == null ? null : handler.value(); + } public static Set<Method> runMethodSet() { return RUN_METHOD_SET; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java new file mode 100644 index 0000000..b2bed2d --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.oo; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.apache.ode.jacob.MessageType; + + +/** + * Marks a {@link Channel} method as handling a {@link Message} + * of a certaing {@link MessageType} + * + * @see Message#getType() + */ + +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Target({ElementType.METHOD}) +public @interface MessageHandler { + Class<? extends MessageType> value(); +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/Synch.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/Synch.java b/src/main/java/org/apache/ode/jacob/oo/Synch.java index 71a04ab..057cc9a 100644 --- a/src/main/java/org/apache/ode/jacob/oo/Synch.java +++ b/src/main/java/org/apache/ode/jacob/oo/Synch.java @@ -18,6 +18,8 @@ */ package org.apache.ode.jacob.oo; +import org.apache.ode.jacob.MessageType; + /** @@ -25,11 +27,11 @@ package org.apache.ode.jacob.oo; * <p> * It is the only allowable return type (other than "void") for JACOB objects. * - * @author Maciej Szefler <a href="mailto:[email protected]">mbs</a> */ public interface Synch extends Channel { + public interface RetMessage extends MessageType {} - public void ret(); + @MessageHandler(RetMessage.class) public void ret(); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/Val.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/Val.java b/src/main/java/org/apache/ode/jacob/oo/Val.java index 13fb0ec..caff565 100644 --- a/src/main/java/org/apache/ode/jacob/oo/Val.java +++ b/src/main/java/org/apache/ode/jacob/oo/Val.java @@ -18,13 +18,16 @@ */ package org.apache.ode.jacob.oo; +import org.apache.ode.jacob.MessageType; + /** * Generic return-value channel type. */ public interface Val extends Channel { + public interface ValMessage extends MessageType {} - public void val(Object retVal); + @MessageHandler(ValMessage.class) public void val(Object retVal); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index f9eddf8..7857f47 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -26,6 +26,8 @@ import java.util.Stack; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.JacobThread; +import org.apache.ode.jacob.Message; +import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; import org.apache.ode.jacob.oo.ClassUtil; @@ -425,11 +427,19 @@ public final class JacobVPU { stackThread(); long ctime = System.currentTimeMillis(); try { - _method.invoke(_methodBody instanceof ReceiveProcess ? - ((ReceiveProcess)_methodBody).getReceiver() : _methodBody, args); + if (_methodBody instanceof ReceiveProcess) { + Message msg = new Message(ClassUtil.getMessageType(_method)); + msg.setBody(args); + + ((ReceiveProcess)_methodBody).onMessage(msg); + // _method.invoke(((ReceiveProcess)_methodBody).getReceiver(), args); + } else { + ((Runnable)_methodBody).run(); + } if (synchChannel != null) { synchChannel.ret(); } +/* } catch (IllegalAccessException iae) { throw new RuntimeException("MethodNotAccessible: " + _method.getName() + " in " + _method.getDeclaringClass().getName(), iae); } catch (InvocationTargetException e) { @@ -437,7 +447,8 @@ public final class JacobVPU { throw (target instanceof RuntimeException) ? (RuntimeException) target : new RuntimeException("ClientMethodException: " + _method.getName() + " in " + _methodBody.getClass().getName(), target); - } finally { +*/ + } finally { ctime = System.currentTimeMillis() - ctime; _statistics.totalClientTimeMs += ctime; unstackThread(); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java index 0bffb85..6fb53fc 100644 --- a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java +++ b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java @@ -19,24 +19,28 @@ package org.apache.ode.jacob.examples.cell; +import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; +import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.Val; /** * Channel type for a cell. The channel allows reading of and setting the values of a cell. */ public interface Cell extends Channel { + public interface ReadMessage extends MessageType {} + public interface WriteMessage extends MessageType {} /** * Read the value of the cell. * @param replyTo channel to which the value of the cell is sent */ - public void read(Val replyTo); + @MessageHandler(ReadMessage.class) public void read(Val replyTo); /** * Write the value of the cell. * @param newVal new value of the cell */ - public void write(Object newVal); + @MessageHandler(WriteMessage.class) public void write(Object newVal); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java index 051a451..0ab7844 100644 --- a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java +++ b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java @@ -19,7 +19,9 @@ package org.apache.ode.jacob.examples.eratosthenes; +import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; +import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.Synch; /** @@ -30,7 +32,8 @@ import org.apache.ode.jacob.oo.Synch; * @author Maciej Szefler <a href="mailto:[email protected]">mbs</a> */ public interface NaturalNumberStream extends Channel { + public interface ValMessage extends MessageType {} - public void val(int n, Synch ret); + @MessageHandler(ValMessage.class) public void val(int n, Synch ret); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 92dc03c..37a0fec 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -19,8 +19,10 @@ package org.apache.ode.jacob.examples.helloworld; import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.examples.sequence.Sequence; import org.apache.ode.jacob.oo.Channel; +import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.ReceiveProcess; import org.apache.ode.jacob.oo.Synch; import org.apache.ode.jacob.oo.Val; @@ -47,7 +49,10 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; public class HelloWorld extends JacobObject implements Runnable { public static interface Callback<T, R extends Channel> extends Channel { - public void invoke(T value, R callback); + public interface CallbackMessage extends MessageType {} + + @MessageHandler(CallbackMessage.class) public void invoke(T value, R callback); + } static class ReliablePrinterProcess extends JacobObject implements Runnable { http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java index 54eda1f..4da9319 100644 --- a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java +++ b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java @@ -18,7 +18,9 @@ */ package org.apache.ode.jacob.examples.synch; +import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; +import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.Synch; /** @@ -28,7 +30,8 @@ import org.apache.ode.jacob.oo.Synch; * @author Maciej Szefler <a href="mailto:[email protected]">mbs</a> */ public interface SynchPrint extends Channel { + public interface SynchPrintMessage extends MessageType {} - public Synch print(String msg); + @MessageHandler(SynchPrintMessage.class) public Synch print(String msg); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java index d1284dc..cb75def 100644 --- a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java +++ b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java @@ -24,8 +24,10 @@ import java.lang.reflect.Proxy; import junit.framework.TestCase; +import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelProxy; +import org.apache.ode.jacob.oo.MessageHandler; public class ProxyConstructorTimingTest extends TestCase { @@ -102,7 +104,8 @@ public class ProxyConstructorTimingTest extends TestCase { } public interface Greeter extends Channel { - String hello(String name); + public interface HelloMessage extends MessageType {} + @MessageHandler(HelloMessage.class) String hello(String name); } @SuppressWarnings("serial")
