Updated Branches: refs/heads/master 05e07f7bd -> 0f7d9afd5
fixing jackson-based serialization/deserialization. Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/0f7d9afd Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/0f7d9afd Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/0f7d9afd Branch: refs/heads/master Commit: 0f7d9afd5ed440e275d537d33b27424bc6dc15a0 Parents: 05e07f7 Author: Tammo van Lessen <[email protected]> Authored: Wed Apr 10 15:47:34 2013 +0200 Committer: Tammo van Lessen <[email protected]> Committed: Wed Apr 10 15:47:34 2013 +0200 ---------------------------------------------------------------------- pom.xml | 9 +++ .../soup/jackson/ChannelProxyDeserializer.java | 6 +- .../soup/jackson/JacksonExecutionQueueImpl.java | 13 ++-- .../JacobJacksonAnnotationIntrospector.java | 19 +++++ .../soup/jackson/JacobTypeResolverBuilder.java | 36 +++++---- .../apache/ode/jacob/vpu/ExecutionQueueImpl.java | 3 + .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 4 + .../ode/jacob/examples/helloworld/HelloWorld.java | 51 ++++++++----- .../ode/jacob/examples/sequence/Sequence.java | 56 +++++++++------ src/test/resources/log4j.properties | 4 +- 10 files changed, 135 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b3b1f97..e81c28c 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,11 @@ <artifactId>jackson-databind</artifactId> <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-smile</artifactId> + <version>${jackson.version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -67,6 +72,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-smile</artifactId> + </dependency> <!-- test --> <dependency> http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java index 1b85bcc..069882b 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java @@ -67,9 +67,9 @@ public class ChannelProxyDeserializer extends StdDeserializer<Channel> { try { - CommChannel channel = new CommChannel(ctxt.findClass(type)); - channel.setId(id); - return (Channel)ChannelFactory.createChannel(channel, channel.getType()); + CommChannel cchannel = new CommChannel(ctxt.findClass(type)); + cchannel.setId(id); + return (Channel)ChannelFactory.createChannel(cchannel, cchannel.getType()); } catch (ClassNotFoundException e) { throw ctxt.instantiationException(Channel.class, e); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java index ea1376e..e0892df 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java @@ -55,7 +55,7 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { super(null); } - public static ObjectMapper configureMapper() { + public static void configureMapper(ObjectMapper om) { SimpleModule sm = new SimpleModule("jacobmodule"); sm.addSerializer(ChannelProxy.class, new ChannelProxySerializer()); @@ -65,7 +65,6 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { sm.addDeserializer(Continuation.class, new ContinuationDeserializer()); sm.addDeserializer(Channel.class, new ChannelProxyDeserializer()); - ObjectMapper om = new ObjectMapper(); om.registerModule(sm); om.disable(MapperFeature.AUTO_DETECT_CREATORS); om.disable(MapperFeature.AUTO_DETECT_GETTERS); @@ -73,11 +72,11 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { om.setVisibility(PropertyAccessor.FIELD, Visibility.ANY); om.setDefaultTyping(new JacobTypeResolverBuilder()); + om.setAnnotationIntrospector(new JacobJacksonAnnotationIntrospector()); + om.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); om.enable(SerializationFeature.WRITE_ENUMS_USING_INDEX); - om.enable(SerializationFeature.INDENT_OUTPUT); - - return om; + //om.enable(SerializationFeature.INDENT_OUTPUT); } @@ -156,6 +155,10 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { } } + + // Garbage collection + // TODO + return soup; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java new file mode 100644 index 0000000..0cb3a73 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java @@ -0,0 +1,19 @@ +package org.apache.ode.jacob.soup.jackson; + +import com.fasterxml.jackson.annotation.ObjectIdGenerators; +import com.fasterxml.jackson.databind.introspect.Annotated; +import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector; +import com.fasterxml.jackson.databind.introspect.ObjectIdInfo; + +public class JacobJacksonAnnotationIntrospector extends + JacksonAnnotationIntrospector { + + private static final long serialVersionUID = 1L; + + /* enable object ids for all objects. */ + @Override + public ObjectIdInfo findObjectIdInfo(Annotated ann) { + return new ObjectIdInfo("@id", Object.class, ObjectIdGenerators.IntSequenceGenerator.class); + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java index a01d13e..b477fcd 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java @@ -23,6 +23,8 @@ import java.util.Collection; import org.apache.ode.jacob.Channel; import org.apache.ode.jacob.ChannelProxy; import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.vpu.ChannelFactory; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; @@ -34,11 +36,11 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.TypeDeserializer; import com.fasterxml.jackson.databind.jsontype.TypeIdResolver; import com.fasterxml.jackson.databind.jsontype.TypeSerializer; -import com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer; import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver; import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder; import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; import com.fasterxml.jackson.databind.type.TypeFactory; +import com.fasterxml.jackson.databind.util.ClassUtil; public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { @@ -66,18 +68,14 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { private boolean useForType(JavaType t) { if (JacobObject.class.isAssignableFrom(t.getRawClass())) { - //System.err.println("XXX: JO " + t); return true; } if (Channel.class.isAssignableFrom(t.getRawClass())) { - //System.err.println("XXX: CH " + t); return true; } - //if (!t.isConcrete()) { if (t.getRawClass() == Object.class) { - //System.err.println("XXX: CON " + t + "- " + t.isConcrete()); return true; } @@ -89,13 +87,9 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { JavaType baseType, Collection<NamedType> subtypes) { if (useForType(baseType)) { - if (baseType.isInterface() && Channel.class.isAssignableFrom(baseType.getRawClass())) { - TypeIdResolver idRes = idResolver(config, baseType, subtypes, false, true); - return new AsPropertyTypeDeserializer(baseType, idRes, - _typeProperty, _typeIdVisible, Channel.class); - } else { - return super.buildTypeDeserializer(config, baseType, subtypes); - } + // set Channel as the default impl. + defaultImpl(Channel.class); + return super.buildTypeDeserializer(config, baseType, subtypes); } return null; @@ -113,7 +107,9 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { public String idFromValue(Object value) { if (value instanceof ChannelProxy) { - return "<<channelproxy>>"; + CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)value); + return commChannel.getType().getName(); + } return delegate.idFromValue(value); } @@ -123,10 +119,18 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { } public JavaType typeFromId(String id) { - if ("<<channelproxy>>".equals(id)) { - return null; // force jackson to use default impl + try { + Class<?> cls = ClassUtil.findClass(id); + if (Channel.class.isAssignableFrom(cls) && cls.isInterface()) { + // return null to force Jackson to use default deserializer (which is the ChannelProxyDeserializer) + return null; + } + return _typeFactory.constructSpecializedType(_baseType, cls); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Invalid type id '"+id+"' (for id type 'Id.class'): no such class found"); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid type id '"+id+"' (for id type 'Id.class'): "+e.getMessage(), e); } - return delegate.typeFromId(id); } public Id getMechanism() { http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index c95cfb3..fff2ae1 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -537,6 +537,9 @@ public class ExecutionQueueImpl implements ExecutionQueue { boolean replicated; public Set<CommFrame> commFrames = new HashSet<CommFrame>(); + // default constructor for deserialization + public CommGroupFrame() {} + public CommGroupFrame(boolean replicated) { this.replicated = replicated; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 455431f..5d7660c 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -140,6 +140,10 @@ public final class JacobVPU { _executionQueue = executionQueue; _executionQueue.setClassLoader(_classLoader); } + + public ExecutionQueue getContext() { + return _executionQueue; + } public void registerExtension(Class<?> extensionClass, Object obj) { LOG.trace(">> setContext (extensionClass={}, obj={})", extensionClass, obj); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 746f374..5fb0b70 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -78,17 +78,26 @@ public class HelloWorld extends JacobRunnable { public void run() { Synch callback = newChannel(Synch.class, "callback channel to ACK " + str); - object(new ReceiveProcess() { - private static final long serialVersionUID = 1L; - }.setChannel(callback).setReceiver(new Synch() { - public void ret() { - System.out.println(str + " ACKed"); - } - })); + object(new ReliableStringEmitterReceiveProcess().setChannel(callback).setReceiver(new ReliableStringEmitterSynch(str))); to.invoke(str, callback); } + + static class ReliableStringEmitterReceiveProcess extends ReceiveProcess {} + static class ReliableStringEmitterSynch implements Synch { + private String str; + + @JsonCreator + public ReliableStringEmitterSynch(@JsonProperty("str") String str) { + this.str = str; + } + + @Override + public void ret() { + System.out.println(str + " ACKed"); + } + } } - + static class PrinterProcess extends JacobRunnable { private Val _in; @@ -212,16 +221,19 @@ public class HelloWorld extends JacobRunnable { @Override protected JacobRunnable doStep(int step, Synch done) { - return new SequenceItemEmitter(greetings[step], done); + return new SequenceItemEmitter(greetings[step], done, out); } - class SequenceItemEmitter extends JacobRunnable { + static class SequenceItemEmitter extends JacobRunnable { private final String string; private final Synch done; + private final Val out; - public SequenceItemEmitter(String string, Synch done) { + @JsonCreator + public SequenceItemEmitter(@JsonProperty("string") String string, @JsonProperty("done") Synch done, @JsonProperty("out") Val out) { this.string = string; this.done = done; + this.out = out; } @Override @@ -241,14 +253,18 @@ public class HelloWorld extends JacobRunnable { } public static void main(String args[]) throws Exception { + // enable logging + //BasicConfigurator.configure(); long start = System.currentTimeMillis(); - ObjectMapper mapper = JacksonExecutionQueueImpl.configureMapper(); + ObjectMapper mapper = new ObjectMapper(); + JacksonExecutionQueueImpl.configureMapper(mapper); + JacobVPU vpu = new JacobVPU(); JacksonExecutionQueueImpl queue = new JacksonExecutionQueueImpl(); vpu.setContext(queue); vpu.inject(new HelloWorld()); while (vpu.execute()) { - queue = loadAndRestoreQueue(mapper, queue); + queue = loadAndRestoreQueue(mapper, (JacksonExecutionQueueImpl)vpu.getContext()); vpu.setContext(queue); System.out.println(vpu.isComplete() ? "<0>" : "."); //vpu.dumpState(); @@ -258,13 +274,10 @@ public class HelloWorld extends JacobRunnable { } public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception { - String json = mapper.writeValueAsString(in); - // System.out.println(json); + byte[] json = mapper.writeValueAsBytes(in); + // print json + //System.out.println(new String(json)); JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class); - //String json2 = mapper.writeValueAsString(q2); - - // System.out.println("----"); - // System.out.println(json2); return q2; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java index df4a785..9921267 100644 --- a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java +++ b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java @@ -22,14 +22,15 @@ import org.apache.ode.jacob.JacobRunnable; import org.apache.ode.jacob.ReceiveProcess; import org.apache.ode.jacob.Synch; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + /** * Abstract process that executes a number of steps sequentially. */ @SuppressWarnings("serial") public abstract class Sequence extends JacobRunnable { - private int _steps; - private int _current; - private Synch _done; + private final SequenceData data = new SequenceData(); /** * Create a {@link Sequence} with a number of steps. @@ -38,30 +39,20 @@ public abstract class Sequence extends JacobRunnable { * @param done synchronous callback */ public Sequence(int steps, Synch done) { - _steps = steps; - _current = 0; - _done = done; + data._steps = steps; + data._current = 0; + data._done = done; } - /** - * Process execution block - */ public void run() { - if (_current >= _steps) { - if (_done != null) { - _done.ret(); + if (data._current >= data._steps) { + if (data._done != null) { + data._done.ret(); } } else { Synch r = newChannel(Synch.class); - object(new ReceiveProcess() { - private static final long serialVersionUID = -6999108928780639603L; - }.setChannel(r).setReceiver(new Synch() { - public void ret() { - ++_current; - instance(Sequence.this); - } - })); - instance(doStep(_current, r)); + object(new SequenceReceiveProcess().setChannel(r).setReceiver(new SequenceSynch(data, this))); + instance(doStep(data._current, r)); } } @@ -72,4 +63,27 @@ public abstract class Sequence extends JacobRunnable { * @return runnable process */ protected abstract JacobRunnable doStep(int step, Synch done); + + public static class SequenceData { + public int _steps; + public int _current; + public Synch _done; + //public Sequence _seq; + } + + static class SequenceReceiveProcess extends ReceiveProcess {} + static class SequenceSynch implements Synch { + private final SequenceData data; + private final Sequence parent; + + @JsonCreator + public SequenceSynch(@JsonProperty("data") SequenceData data, @JsonProperty("parent") Sequence parent) { + this.data = data; + this.parent = parent; + } + public void ret() { + ++data._current; + instance(parent); + } + } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index 9c6599f..91bd688 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -16,10 +16,10 @@ # # Set root logger level to WARN and its only appender to CONSOLE -log4j.rootLogger=WARN, file +log4j.rootLogger=TRACE, file # log4j properties to work with command line tools. -log4j.category.org.apache.ode=INFO +log4j.category.org.apache.ode=TRACE # Console appender log4j.appender.stdout=org.apache.log4j.ConsoleAppender
