More cleanup: add toString / hashCode / equals implementations, remove unneeded duplicate CommGroup references, and tweak some logging
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/69d711b9 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/69d711b9 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/69d711b9 Branch: refs/heads/master Commit: 69d711b995defb644ecdc7856520fd75a277e5a6 Parents: 018d158 Author: Tammo van Lessen <[email protected]> Authored: Thu Aug 15 23:17:30 2013 +0200 Committer: Tammo van Lessen <[email protected]> Committed: Thu Aug 15 23:17:30 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/ode/jacob/ChannelRef.java | 32 ++++++++++++- src/main/java/org/apache/ode/jacob/Message.java | 20 ++++++++ .../java/org/apache/ode/jacob/soup/Comm.java | 17 ------- .../org/apache/ode/jacob/soup/CommGroup.java | 1 - .../ode/jacob/vpu/ExecutionQueueImpl.java | 48 ++++++++++---------- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 31 +++++++------ .../jacob/examples/helloworld/HelloWorld.java | 19 ++++++-- 7 files changed, 109 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/main/java/org/apache/ode/jacob/ChannelRef.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java index 78fff21..1209bcb 100644 --- a/src/main/java/org/apache/ode/jacob/ChannelRef.java +++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java @@ -76,5 +76,35 @@ public class ChannelRef implements Serializable { return null; } - + + @Override + public String toString() { + return "ChannelRef [target=" + target + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((target == null) ? 
0 : target.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ChannelRef other = (ChannelRef) obj; + if (target == null) { + if (other.target != null) + return false; + } else if (!target.equals(other.target)) + return false; + return true; + } + } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/main/java/org/apache/ode/jacob/Message.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java index 5d62135..ea3e901 100644 --- a/src/main/java/org/apache/ode/jacob/Message.java +++ b/src/main/java/org/apache/ode/jacob/Message.java @@ -110,6 +110,26 @@ public class Message implements Serializable { + (headers != null ? "headers=" + headers + ", " : "") + (body != null ? "body=" + body : "") + "]"; } + + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (id ^ (id >>> 32)); + return result; + } + + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Message other = (Message) obj; + if (id != other.id) + return false; + return true; + } public static Message copyFrom(Message message) { Message result = new Message(); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/main/java/org/apache/ode/jacob/soup/Comm.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/Comm.java b/src/main/java/org/apache/ode/jacob/soup/Comm.java index ce842cb..d2a6a98 100644 --- a/src/main/java/org/apache/ode/jacob/soup/Comm.java +++ b/src/main/java/org/apache/ode/jacob/soup/Comm.java @@ -29,7 +29,6 @@ package org.apache.ode.jacob.soup; */ public abstract 
class Comm extends ExecutionQueueObject { private CommChannel _channel; - private CommGroup _group; protected Comm() { } @@ -42,27 +41,11 @@ public abstract class Comm extends ExecutionQueueObject { return _channel; } - public void setChannel(CommChannel channel) { - _channel = channel; - } - - public CommGroup getGroup() { - return _group; - } - - public void setGroup(CommGroup group) { - if (_group != null) { - throw new IllegalStateException("Attempted to call setGroup() twice!"); - } - _group = group; - } - public String toString() { // TODO: maybe find a better way to do a toString and replace ObjectPrinter return new StringBuilder("{") .append(this.getClass().getSimpleName()) .append(" chnl=").append(_channel) - .append(", group=").append(_group) .append(" }").toString(); } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/main/java/org/apache/ode/jacob/soup/CommGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/CommGroup.java b/src/main/java/org/apache/ode/jacob/soup/CommGroup.java index bd6c10a..de5e602 100644 --- a/src/main/java/org/apache/ode/jacob/soup/CommGroup.java +++ b/src/main/java/org/apache/ode/jacob/soup/CommGroup.java @@ -55,7 +55,6 @@ public class CommGroup extends ExecutionQueueObject { } public void add(Comm comm) { - comm.setGroup(this); _comms.add(comm); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index 8e4265b..d788afe 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -118,20 +118,20 @@ public class ExecutionQueueImpl implements ExecutionQueue { 
LOG.trace(">> add (channel={})", channel); verifyNew(channel); - ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter, channel.getType().getName(), channel - .getDescription()); + ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter, + channel.getDescription()); _channels.put(cframe.getId(), cframe); assignId(channel, cframe.getId()); } public void enqueueMessage(Message message) { - LOG.trace(">> enqueueReaction (message={})", message); + LOG.trace(">> enqueueMessage (message={})", message); _messages.add(message); } public Message dequeueMessage() { - LOG.trace(">> dequeueReaction ()"); + LOG.trace(">> dequeueMessage ()"); Message message = null; if (!_messages.isEmpty()) { @@ -156,8 +156,9 @@ public class ExecutionQueueImpl implements ExecutionQueue { throw new IllegalStateException("Send attempted on channel containing replicated send! Channel= " + comm.getChannel()); } - if (group.isReplicated()) + if (group.isReplicated()) { chnlFrame.replicatedSend = true; + } CommSend commSend = (CommSend) comm; MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMessage()); @@ -169,10 +170,11 @@ public class ExecutionQueueImpl implements ExecutionQueue { throw new IllegalStateException( "Receive attempted on channel containing replicated receive! 
Channel= " + comm.getChannel()); } - if (group.isReplicated()) + if (group.isReplicated()) { chnlFrame.replicatedRecv = true; + } CommRecv commRecv = (CommRecv) comm; - ObjectFrame oframe = new ObjectFrame(commGroupFrame, chnlFrame, commRecv.getListener()); + ListenerFrame oframe = new ListenerFrame(commGroupFrame, chnlFrame, commRecv.getListener()); commGroupFrame.commFrames.add(oframe); chnlFrame.objFrames.add(oframe); } @@ -289,7 +291,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) { ChannelFrame cframe = i.next(); sos.writeInt(cframe.objFrames.size()); - for (Iterator<ObjectFrame> j = cframe.objFrames.iterator(); j.hasNext();) { + for (Iterator<ListenerFrame> j = cframe.objFrames.iterator(); j.hasNext();) { sos.writeObject(j.next()); } sos.writeInt(cframe.msgFrames.size()); @@ -367,10 +369,10 @@ public class ExecutionQueueImpl implements ExecutionQueue { ChannelFrame cframe = _channels.get(channel.getId()); while (cframe != null && !cframe.msgFrames.isEmpty() && !cframe.objFrames.isEmpty()) { MessageFrame mframe = cframe.msgFrames.iterator().next(); - ObjectFrame oframe = cframe.objFrames.iterator().next(); + ListenerFrame oframe = cframe.objFrames.iterator().next(); Message msg = Message.copyFrom(mframe.message); - msg.setTo(new org.apache.ode.jacob.ChannelRef(oframe._continuation)); + msg.setTo(new org.apache.ode.jacob.ChannelRef(oframe.listener)); enqueueMessage(msg); if (!mframe.commGroupFrame.replicated) { @@ -407,7 +409,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { // Add all channels reference in the group to the GC candidate set. 
for (Iterator<CommFrame> i = groupFrame.commFrames.iterator(); i.hasNext();) { CommFrame frame = i.next(); - if (frame instanceof ObjectFrame) { + if (frame instanceof ListenerFrame) { assert frame.channelFrame.objFrames.contains(frame); frame.channelFrame.objFrames.remove(frame); } else { @@ -438,7 +440,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { boolean replicatedRecv; - Set<ObjectFrame> objFrames = new LinkedHashSet<ObjectFrame>(); + Set<ListenerFrame> objFrames = new LinkedHashSet<ListenerFrame>(); Set<MessageFrame> msgFrames = new LinkedHashSet<MessageFrame>(); @@ -448,7 +450,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { public ChannelFrame() { } - public ChannelFrame(Class<?> type, int id, String name, String description) { + public ChannelFrame(Class<?> type, int id, String description) { this.type = type; this.id = id; this.description = description; @@ -462,7 +464,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { return refCount; } - public Set<ObjectFrame> getObjFrames() { + public Set<ListenerFrame> getObjFrames() { return objFrames; } @@ -479,7 +481,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { replicatedRecv = in.readBoolean(); int cnt = in.readInt(); for (int i = 0; i < cnt; ++i) { - objFrames.add((ObjectFrame) in.readObject()); + objFrames.add((ListenerFrame) in.readObject()); } cnt = in.readInt(); for (int i = 0; i < cnt; ++i) { @@ -495,7 +497,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { out.writeBoolean(replicatedSend); out.writeBoolean(replicatedRecv); out.writeInt(objFrames.size()); - for (Iterator<ObjectFrame> i = objFrames.iterator(); i.hasNext();) { + for (Iterator<ListenerFrame> i = objFrames.iterator(); i.hasNext();) { out.writeObject(i.next()); } out.writeInt(msgFrames.size()); @@ -564,28 +566,28 @@ public class ExecutionQueueImpl implements ExecutionQueue { } } - protected static class ObjectFrame extends CommFrame implements Externalizable { + 
protected static class ListenerFrame extends CommFrame implements Externalizable { private static final long serialVersionUID = -7212430608484116919L; - MessageListener _continuation; + MessageListener listener; // Used for deserialization - public ObjectFrame() { + public ListenerFrame() { } - public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, MessageListener continuation) { + public ListenerFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, MessageListener listener) { super(commGroupFrame, channelFrame); - this._continuation = continuation; + this.listener = listener; } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - _continuation = (ChannelListener)in.readObject(); + listener = (ChannelListener)in.readObject(); } public void writeExternal(ObjectOutput out) throws IOException { super.writeExternal(out); - out.writeObject(_continuation); + out.writeObject(listener); } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 3b709e4..5b8a07d 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -196,18 +196,23 @@ public final class JacobVPU { return buf.toString(); } - static String stringify(Object[] list) { - if (list == null) { + static String stringify(Object obj) { + if (obj == null) { return ""; } - StringBuffer buf = new StringBuffer(); - for (int i = 0; i < list.length; ++i) { - if (i > 0) { - buf.append(','); + + if (obj instanceof Object[]) { + StringBuffer buf = new StringBuffer(); + for (int i = 0; i < ((Object[])obj).length; ++i) { + if (i > 0) { + buf.append(','); + } + buf.append(((Object[])obj)[i]); } - 
buf.append(list[i]); + return buf.toString(); + } else { + return obj.toString(); } - return buf.toString(); } public void setClassLoader(ClassLoader classLoader) { @@ -252,11 +257,6 @@ public final class JacobVPU { } public Channel message(Channel channel, Method method, Object[] args) { - LOG.trace(">> [{}] : {} ! {} ({})", _cycle, channel, method.getName(), - LOG.isTraceEnabled() ? stringify(args) : null); - - _statistics.messagesSent++; - Channel replyChannel = null; CommChannel replyCommChannel = null; // Check for synchronous methods; create a synchronization channel @@ -277,6 +277,11 @@ public final class JacobVPU { } public void sendMessage(Message msg) { + LOG.trace(">> [{}] : {} ! {} ({})", _cycle, msg.getTo(), msg.getAction(), + LOG.isTraceEnabled() ? stringify(msg.getBody()) : null); + + _statistics.messagesSent++; + CommGroup grp = new CommGroup(false); CommSend send = new CommSend(msg); grp.add(send); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/69d711b9/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 0c6ef73..40f4440 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -25,6 +25,12 @@ import static org.apache.ode.jacob.Jacob.object; import static org.apache.ode.jacob.Jacob.sendMessage; import static org.apache.ode.jacob.Jacob.subscribe; +import java.util.Collections; +import java.util.List; + +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.Message; @@ -321,14 +327,19 @@ public class HelloWorld extends JacobObject 
implements Runnable { @Override public void run() { simpleHelloWorld(); -// reliableHelloWorld(); -// sequencedHelloWorld(); + reliableHelloWorld(); + sequencedHelloWorld(); calculusHelloWorld(); } public static void main(String args[]) throws Exception { // enable logging - //BasicConfigurator.configure(); + // BasicConfigurator.configure(); + List<Logger> loggers = Collections.<Logger>list(LogManager.getCurrentLoggers()); + loggers.add(LogManager.getRootLogger()); + for ( Logger logger : loggers ) { + logger.setLevel(Level.OFF); + } SmileFactory sf = null; // // enable smile: @@ -360,7 +371,7 @@ public class HelloWorld extends JacobObject implements Runnable { public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception { byte[] json = mapper.writeValueAsBytes(in); // print json - System.out.println(new String(json)); + // System.out.println(new String(json)); JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class); return q2; }
