Refactored the Message type, added a simpler solution for defining an action, and removed MessageType
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/77ea8118 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/77ea8118 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/77ea8118 Branch: refs/heads/master Commit: 77ea81180e5456d52434aca2d64378e33435f5d5 Parents: a8339de Author: Hadrian Zbarcea <[email protected]> Authored: Wed Jul 31 14:43:54 2013 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Jul 31 14:43:54 2013 -0400 ---------------------------------------------------------------------- .../java/org/apache/ode/jacob/ChannelRef.java | 27 ++++++++++ .../java/org/apache/ode/jacob/Expression.java | 29 +++++++++++ src/main/java/org/apache/ode/jacob/Message.java | 55 ++++++++++++++------ .../java/org/apache/ode/jacob/MessageType.java | 27 ---------- .../apache/ode/jacob/oo/ChannelListener.java | 24 +++------ .../java/org/apache/ode/jacob/oo/ClassUtil.java | 28 ++++++++-- .../org/apache/ode/jacob/oo/MessageHandler.java | 8 ++- .../org/apache/ode/jacob/oo/ProcessUtil.java | 1 + .../java/org/apache/ode/jacob/oo/Synch.java | 6 +-- src/main/java/org/apache/ode/jacob/oo/Val.java | 6 +-- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 3 +- .../apache/ode/jacob/examples/cell/Cell.java | 8 +-- .../eratosthenes/NaturalNumberStream.java | 6 +-- .../jacob/examples/helloworld/HelloWorld.java | 5 +- .../ode/jacob/examples/synch/SynchPrint.java | 7 ++- .../jacob/vpu/ProxyConstructorTimingTest.java | 4 +- 16 files changed, 141 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/ChannelRef.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java new file mode 100644 index 0000000..ef755cc 
--- /dev/null +++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob; + + +/** + * TODO: Document... + */ + +public interface ChannelRef { +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/Expression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Expression.java b/src/main/java/org/apache/ode/jacob/Expression.java new file mode 100644 index 0000000..e6d6a7e --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/Expression.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob; + + +/** + * TODO: Document... + * TODO: should anything be final here? the class itself? + */ + +public interface Expression { + <T> T evaluate(Message message, Class<T> type); +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/Message.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java index 0eb9024..6eaa356 100644 --- a/src/main/java/org/apache/ode/jacob/Message.java +++ b/src/main/java/org/apache/ode/jacob/Message.java @@ -27,33 +27,57 @@ import java.util.concurrent.ConcurrentHashMap; * TODO: should anything be final here? the class itself? */ -public final class Message { - private Class<? extends MessageType> type; - private MessageChannel reply; - private Map<String, Object> headers; +public class Message { + private long id; + private ChannelRef to; + private ChannelRef replyTo; + private String action; + private Map<String, Object> headers; private Object body; public Message() { - // TODO: do we always need headers? + id = 0; // TODO: generate id + // TODO: always allocating headers may not be a good idea + // checking for non-null headers in the getters below is + // not great either; should look into a better option later + // after finishing pi-calculus refactoring and running some + // perf tests headers = new ConcurrentHashMap<String, Object>(); } - public Message(Class<? 
extends MessageType> type) { + + public Message(ChannelRef to, ChannelRef replyTo, String action) { this(); - this.type = type; + this.to = to; + this.replyTo = replyTo; + this.action = action; } - public Class<? extends MessageType> getType() { - return type; + // TODO: add any other convenience methods like addHeader, removeHeader? + public long getId() { + return id; + } + public void setId(long id) { + this.id = id; + } + public ChannelRef getTo() { + return to; } - public void setType(Class<? extends MessageType> type) { - this.type = type; + public void setTo(ChannelRef to) { + this.to = to; } - public MessageChannel getReply() { - return reply; + public ChannelRef getReplyTo() { + return replyTo; } - public void setReply(MessageChannel reply) { - this.reply = reply; + public void setReplyTo(ChannelRef replyTo) { + this.replyTo = replyTo; } + public String getAction() { + return action; + } + public void setAction(String action) { + this.action = action; + } + public Map<String, Object> getHeaders() { return headers; } @@ -70,5 +94,4 @@ public final class Message { public boolean containsHeader(String header) { return headers.containsKey(header); } - // TODO: add any other convenience methods like addHeader, removeHeader? } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/MessageType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/MessageType.java b/src/main/java/org/apache/ode/jacob/MessageType.java deleted file mode 100644 index a1e4fe5..0000000 --- a/src/main/java/org/apache/ode/jacob/MessageType.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ode.jacob; - - -/** - * TODO: Document... - */ - -public interface MessageType { -} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java index 0c607c2..b8998d8 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java +++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java @@ -19,14 +19,11 @@ package org.apache.ode.jacob.oo; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Set; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.Message; import org.apache.ode.jacob.MessageChannel; -import org.apache.ode.jacob.MessageType; /** @@ -37,21 +34,14 @@ import org.apache.ode.jacob.MessageType; public abstract class ChannelListener extends JacobObject implements MessageChannel { public void onMessage(Message msg) { - Class<? 
extends MessageType> type = msg.getType(); - - Set<Method> methods = this.getImplementedMethods(); - for (Method m : methods) { - if (type != null && type.equals(ClassUtil.getMessageType(m))) { - if (this instanceof ReceiveProcess) { - try { - m.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody()); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - break; + Method action = ClassUtil.findActionMethod(getImplementedMethods()).evaluate(msg, Method.class); + try { + if (action != null && this instanceof ReceiveProcess) { + action.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody()); } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java index 4e5c59c..15c8c4b 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java @@ -22,7 +22,8 @@ import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; -import org.apache.ode.jacob.MessageType; +import org.apache.ode.jacob.Expression; +import org.apache.ode.jacob.Message; public final class ClassUtil { @@ -43,14 +44,31 @@ public final class ClassUtil { // Utility class } - public static Class<? extends MessageType> getMessageType(Method channelMethod) { - MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class); - return handler == null ? null : handler.value(); - } public static Set<Method> runMethodSet() { return RUN_METHOD_SET; } + public static String getMessageType(Method channelMethod) { + MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class); + return handler == null ? 
channelMethod.getClass().getName() + "." + channelMethod.getName() : handler.value(); + } + + public static Expression findActionMethod(final Set<Method> implementedMethods) { + return new Expression() { + @SuppressWarnings("unchecked") + public <T> T evaluate(Message message, Class<T> type) { + String action = message.getAction(); + if (Method.class.equals(type) && action != null) { + for (Method m : implementedMethods) { + if (action.equals(ClassUtil.getMessageType(m))) { + return (T)m; + } + } + } + return null; + } + }; + } public static Set<Method> getImplementedMethods(Set<Method> methods, Class<?> clazz) { // TODO: this can be optimized (some 20 times faster in my tests) by keeping a private // map of interfaces to methods: Map<Class<?>, Method[]> and just do lookups http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java index b2bed2d..eccec5e 100644 --- a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java +++ b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java @@ -24,12 +24,10 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.apache.ode.jacob.MessageType; - /** - * Marks a {@link Channel} method as handling a {@link Message} - * of a certaing {@link MessageType} + * Marks a {@link Channel} method as performing a certain + * {@link Message#getAction()} on a {@link Message} * * @see Message#getType() */ @@ -38,5 +36,5 @@ import org.apache.ode.jacob.MessageType; @Documented @Target({ElementType.METHOD}) public @interface MessageHandler { - Class<? 
extends MessageType> value(); + String value(); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java index 3be8b82..c6dc94f 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java @@ -49,4 +49,5 @@ public final class ProcessUtil { // once we fix serialization, this can be simplified significantly via a dsl return new ReceiveProcess() {}.setChannel(proxy).setReceiver(listener); } + } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/oo/Synch.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/Synch.java b/src/main/java/org/apache/ode/jacob/oo/Synch.java index 057cc9a..80160b2 100644 --- a/src/main/java/org/apache/ode/jacob/oo/Synch.java +++ b/src/main/java/org/apache/ode/jacob/oo/Synch.java @@ -18,9 +18,6 @@ */ package org.apache.ode.jacob.oo; -import org.apache.ode.jacob.MessageType; - - /** * Synch represents a synchronous invocation callback notification. 
@@ -30,8 +27,7 @@ import org.apache.ode.jacob.MessageType; */ public interface Synch extends Channel { - public interface RetMessage extends MessageType {} - @MessageHandler(RetMessage.class) public void ret(); + public void ret(); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/oo/Val.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/Val.java b/src/main/java/org/apache/ode/jacob/oo/Val.java index caff565..edc9b8a 100644 --- a/src/main/java/org/apache/ode/jacob/oo/Val.java +++ b/src/main/java/org/apache/ode/jacob/oo/Val.java @@ -18,16 +18,12 @@ */ package org.apache.ode.jacob.oo; -import org.apache.ode.jacob.MessageType; - - /** * Generic return-value channel type. */ public interface Val extends Channel { - public interface ValMessage extends MessageType {} - @MessageHandler(ValMessage.class) public void val(Object retVal); + public void val(Object retVal); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 7857f47..491fb4b 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -27,7 +27,6 @@ import java.util.Stack; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.JacobThread; import org.apache.ode.jacob.Message; -import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; import org.apache.ode.jacob.oo.ClassUtil; @@ -428,7 +427,7 @@ public final class JacobVPU { long ctime = System.currentTimeMillis(); try { if (_methodBody instanceof ReceiveProcess) { - Message msg = new Message(ClassUtil.getMessageType(_method)); + 
Message msg = new Message(null, null, ClassUtil.getMessageType(_method)); msg.setBody(args); ((ReceiveProcess)_methodBody).onMessage(msg); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java index 6fb53fc..d34524e 100644 --- a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java +++ b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java @@ -19,28 +19,24 @@ package org.apache.ode.jacob.examples.cell; -import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; -import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.Val; /** * Channel type for a cell. The channel allows reading of and setting the values of a cell. */ public interface Cell extends Channel { - public interface ReadMessage extends MessageType {} - public interface WriteMessage extends MessageType {} /** * Read the value of the cell. * @param replyTo channel to which the value of the cell is sent */ - @MessageHandler(ReadMessage.class) public void read(Val replyTo); + public void read(Val replyTo); /** * Write the value of the cell. 
* @param newVal new value of the cell */ - @MessageHandler(WriteMessage.class) public void write(Object newVal); + public void write(Object newVal); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java index 0ab7844..b894c89 100644 --- a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java +++ b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java @@ -19,11 +19,10 @@ package org.apache.ode.jacob.examples.eratosthenes; -import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; -import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.Synch; + /** * DOCUMENTME. 
* <p>Created on Feb 12, 2004 at 6:22:59 PM.</p> @@ -32,8 +31,7 @@ import org.apache.ode.jacob.oo.Synch; * @author Maciej Szefler <a href="mailto:[email protected]">mbs</a> */ public interface NaturalNumberStream extends Channel { - public interface ValMessage extends MessageType {} - @MessageHandler(ValMessage.class) public void val(int n, Synch ret); + public void val(int n, Synch ret); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 37a0fec..afda12a 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -19,10 +19,8 @@ package org.apache.ode.jacob.examples.helloworld; import org.apache.ode.jacob.JacobObject; -import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.examples.sequence.Sequence; import org.apache.ode.jacob.oo.Channel; -import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.ReceiveProcess; import org.apache.ode.jacob.oo.Synch; import org.apache.ode.jacob.oo.Val; @@ -49,9 +47,8 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; public class HelloWorld extends JacobObject implements Runnable { public static interface Callback<T, R extends Channel> extends Channel { - public interface CallbackMessage extends MessageType {} - @MessageHandler(CallbackMessage.class) public void invoke(T value, R callback); + public void invoke(T value, R callback); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java index 4da9319..140daee 100644 --- a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java +++ b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java @@ -18,11 +18,11 @@ */ package org.apache.ode.jacob.examples.synch; -import org.apache.ode.jacob.MessageType; + import org.apache.ode.jacob.oo.Channel; -import org.apache.ode.jacob.oo.MessageHandler; import org.apache.ode.jacob.oo.Synch; + /** * DOCUMENTME. * <p>Created on Mar 4, 2004 at 4:21:03 PM.</p> @@ -30,8 +30,7 @@ import org.apache.ode.jacob.oo.Synch; * @author Maciej Szefler <a href="mailto:[email protected]">mbs</a> */ public interface SynchPrint extends Channel { - public interface SynchPrintMessage extends MessageType {} - @MessageHandler(SynchPrintMessage.class) public Synch print(String msg); + public Synch print(String msg); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/77ea8118/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java index cb75def..92bc9bb 100644 --- a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java +++ b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java @@ -24,7 +24,6 @@ import java.lang.reflect.Proxy; import junit.framework.TestCase; -import org.apache.ode.jacob.MessageType; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelProxy; import org.apache.ode.jacob.oo.MessageHandler; @@ -104,8 +103,7 @@ public class ProxyConstructorTimingTest extends TestCase { } public interface Greeter extends Channel { - public interface HelloMessage extends MessageType {} - @MessageHandler(HelloMessage.class) String hello(String 
name); + String hello(String name); } @SuppressWarnings("serial")
