Updated Branches: refs/heads/master f7b36dd4f -> 588a251bb
Initial version of a Jackson-serializable ExecutionQueue. Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/588a251b Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/588a251b Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/588a251b Branch: refs/heads/master Commit: 588a251bba336c98eecad30e4c2b900623e3388d Parents: f7b36dd Author: Tammo van Lessen <[email protected]> Authored: Thu Feb 21 20:17:46 2013 +0100 Committer: Tammo van Lessen <[email protected]> Committed: Thu Feb 21 20:17:46 2013 +0100 ---------------------------------------------------------------------- pom.xml | 10 + .../soup/jackson/ChannelProxyDeserializer.java | 79 +++++++ .../jacob/soup/jackson/ChannelProxySerializer.java | 68 ++++++ .../soup/jackson/ContinuationDeserializer.java | 75 +++++++ .../jacob/soup/jackson/ContinuationSerializer.java | 64 ++++++ .../jackson/ContinuationValueInstantiator.java | 52 +++++ .../soup/jackson/JacksonExecutionQueueImpl.java | 165 +++++++++++++++ .../soup/jackson/JacobTypeResolverBuilder.java | 142 +++++++++++++ .../apache/ode/jacob/vpu/ExecutionQueueImpl.java | 20 +- .../ode/jacob/examples/helloworld/HelloWorld.java | 145 +++++++++---- 10 files changed, 770 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 48f7644..7e2acc9 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,7 @@ <properties> <slf4j.version>1.7.2</slf4j.version> <junit.version>4.11</junit.version> + <jackson.version>2.1.3</jackson.version> </properties> <dependencyManagement> @@ -48,6 +49,11 @@ <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -56,6 +62,10 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> <!-- test --> <dependency> http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java new file mode 100644 index 0000000..cab8380 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.io.IOException; + +import org.apache.ode.jacob.Channel; +import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.vpu.ChannelFactory; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +public class ChannelProxyDeserializer extends StdDeserializer<Channel> { + + private static final long serialVersionUID = 1L; + + public ChannelProxyDeserializer() { + super(Channel.class); + } + + @Override + public Channel deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + + String type = null; + int id = -1; + while (jp.nextToken() != JsonToken.END_OBJECT) { + String fieldname = jp.getCurrentName(); + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + // if we're not already on the field, advance by one. + jp.nextToken(); + } + if ("channelType".equals(fieldname)) { + type = jp.getText(); + } else if ("channelId".equals(fieldname)) { + id = jp.getIntValue(); + } + } + + if (type == null) { + throw ctxt.mappingException(Channel.class); + } + + if (id < 0) { + throw ctxt.mappingException(Channel.class); + } + + + try { + CommChannel channel = new CommChannel(ctxt.findClass(type)); + channel.setId(id); + return (Channel)ChannelFactory.createChannel(channel, channel.getType()); + + } catch (ClassNotFoundException e) { + throw ctxt.instantiationException(Channel.class, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java new file mode 100644 index 0000000..eadb813 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.io.IOException; + +import org.apache.ode.jacob.Channel; +import org.apache.ode.jacob.ChannelProxy; +import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.vpu.ChannelFactory; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +public class ChannelProxySerializer extends StdSerializer<ChannelProxy>{ + + protected ChannelProxySerializer() { + super(ChannelProxy.class); + } + + @Override + public void serialize(ChannelProxy value, JsonGenerator jgen, + SerializerProvider provider) throws IOException, + JsonGenerationException { + jgen.writeStartObject(); + serializeContents(value, jgen, provider); + jgen.writeEndObject(); + } + + @Override + public void serializeWithType(ChannelProxy value, JsonGenerator jgen, SerializerProvider provider, + TypeSerializer typeSer) + throws IOException, JsonGenerationException + { + typeSer.writeTypePrefixForObject(value, jgen); + serializeContents(value, jgen, provider); + typeSer.writeTypeSuffixForObject(value, jgen); + } + + private void serializeContents(ChannelProxy value, JsonGenerator jgen, + SerializerProvider provider) throws JsonGenerationException, IOException { + CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)value); + ClassNameIdResolver idResolver = new ClassNameIdResolver(provider.constructType(commChannel.getType()), provider.getTypeFactory()); + jgen.writeStringField("channelType", idResolver.idFromBaseType()); + jgen.writeNumberField("channelId", (Integer)commChannel.getId()); + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java new file mode 100644 index 0000000..75b7fba --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.io.IOException; + +import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.soup.Continuation; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +public class ContinuationDeserializer extends StdDeserializer<Continuation> { + + private static final long serialVersionUID = 1L; + + protected ContinuationDeserializer() { + super(Continuation.class); + } + + @Override + public Continuation deserialize(JsonParser jp, + DeserializationContext ctxt) throws IOException, + JsonProcessingException { + + JacobObject target = null; + String methodName = null; + Object[] args = null; + + while (jp.nextToken() != JsonToken.END_OBJECT) { + String fieldname = jp.getCurrentName(); + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + // if we're not already on the field, advance by one. + jp.nextToken(); + } + + if ("target".equals(fieldname)) { + target = jp.readValueAs(JacobObject.class); + } else if ("method".equals(fieldname)) { + methodName = jp.getText(); + } if ("args".equals(fieldname)) { + args = jp.readValueAs(Object[].class); + } + } + + if (target == null) { + throw ctxt.mappingException(Continuation.class); + } + + if (methodName == null) { + throw ctxt.mappingException(Continuation.class); + } + + return new Continuation(target, target.getMethod(methodName), args); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java new file mode 100644 index 0000000..242b602 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.io.IOException; + +import org.apache.ode.jacob.soup.Continuation; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +public class ContinuationSerializer extends StdSerializer<Continuation> { + + public ContinuationSerializer() { + super(Continuation.class); + } + + @Override + public void serialize(Continuation value, JsonGenerator jgen, + SerializerProvider provider) throws IOException, + JsonGenerationException { + jgen.writeStartObject(); + serializeContents(value, jgen, provider); + jgen.writeEndObject(); + } + + + @Override + public void serializeWithType(Continuation value, JsonGenerator jgen, + SerializerProvider provider, TypeSerializer typeSer) + throws IOException, JsonProcessingException { + typeSer.writeTypePrefixForObject(value, jgen); + serializeContents(value, jgen, provider); + typeSer.writeTypeSuffixForObject(value, jgen); + } + + private void serializeContents(Continuation value, JsonGenerator jgen, + SerializerProvider provider) throws JsonGenerationException, IOException { + + jgen.writeObjectField("target", value.getClosure()); + jgen.writeStringField("method", value.getMethod().getName()); + jgen.writeObjectField("args", value.getArgs()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java new file mode 100644 index 0000000..d17fcfa --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.lang.reflect.Method; + +import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.soup.Continuation; + +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.deser.CreatorProperty; +import com.fasterxml.jackson.databind.deser.SettableBeanProperty; +import com.fasterxml.jackson.databind.deser.ValueInstantiator; + +public class ContinuationValueInstantiator extends ValueInstantiator { + + @Override + public String getValueTypeDesc() { + return Continuation.class.getName(); + } + + @Override + public boolean canCreateFromObjectWith() { + return true; + } + + @Override + public SettableBeanProperty[] getFromObjectArguments( + DeserializationConfig config) { + return new CreatorProperty[] { + new CreatorProperty("_closure", config.constructType(JacobObject.class), null, null, null, 0, null), + new CreatorProperty("_method", config.constructType(Method.class), null, null, null, 1, null), + new CreatorProperty("_args", config.constructType(Object[].class), null, null, null, 2, null)}; + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java new file mode 100644 index 0000000..eeb1333 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; + +import org.apache.ode.jacob.Channel; +import org.apache.ode.jacob.ChannelProxy; +import org.apache.ode.jacob.soup.Continuation; +import org.apache.ode.jacob.vpu.ExecutionQueueImpl; + +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +/** + * Variant of {@link org.apache.ode.jacob.vpu.ExecutionQueueImpl} that can be + * serialized and deserialized with Jackson. + */ +public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { + + public JacksonExecutionQueueImpl() { + super(null); + } + + public static ObjectMapper configureMapper() { + + SimpleModule sm = new SimpleModule("jacobmodule"); + sm.addSerializer(ChannelProxy.class, new ChannelProxySerializer()); + sm.addSerializer(Continuation.class, new ContinuationSerializer()); + sm.addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer()); + sm.addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer()); + sm.addDeserializer(Continuation.class, new ContinuationDeserializer()); + sm.addDeserializer(Channel.class, new ChannelProxyDeserializer()); + + ObjectMapper om = new ObjectMapper(); + om.registerModule(sm); + om.disable(MapperFeature.AUTO_DETECT_CREATORS); + om.disable(MapperFeature.AUTO_DETECT_GETTERS); + om.disable(MapperFeature.AUTO_DETECT_IS_GETTERS); + om.setVisibility(PropertyAccessor.FIELD, Visibility.ANY); + + om.setDefaultTyping(new JacobTypeResolverBuilder()); + + om.enable(SerializationFeature.WRITE_ENUMS_USING_INDEX); + om.enable(SerializationFeature.INDENT_OUTPUT); + + return om; + } + + + public static class ExecutionQueueImplSerializer extends StdSerializer<JacksonExecutionQueueImpl> { + + public ExecutionQueueImplSerializer() { + super(JacksonExecutionQueueImpl.class); + } + + @Override + public void serialize(JacksonExecutionQueueImpl value, JsonGenerator jgen, + SerializerProvider provider) throws IOException, + JsonGenerationException { + jgen.writeStartObject(); + serializeContents(value, jgen, provider); + jgen.writeEndObject(); + } + + + @Override + public void serializeWithType(JacksonExecutionQueueImpl value, JsonGenerator jgen, + SerializerProvider provider, TypeSerializer typeSer) + throws IOException, JsonProcessingException { + typeSer.writeTypePrefixForObject(value, jgen); + serializeContents(value, jgen, provider); + typeSer.writeTypeSuffixForObject(value, jgen); + } + + private void serializeContents(JacksonExecutionQueueImpl value, JsonGenerator jgen, + SerializerProvider provider) throws JsonGenerationException, IOException { + + jgen.writeNumberField("objIdCounter", value._objIdCounter); + jgen.writeNumberField("currentCycle", value._currentCycle); + + jgen.writeObjectField("continuations", value._reactions.toArray(new Continuation[] {})); + jgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[] {})); + jgen.writeObjectField("global", value._gdata); + } + } + + public static class ExecutionQueueImplDeserializer extends StdDeserializer<JacksonExecutionQueueImpl> { + + private static final long serialVersionUID = 1L; + + public ExecutionQueueImplDeserializer() { + super(JacksonExecutionQueueImpl.class); + } + + @Override + public JacksonExecutionQueueImpl deserialize(JsonParser jp, + DeserializationContext ctxt) throws IOException, + JsonProcessingException { + + JacksonExecutionQueueImpl soup = new JacksonExecutionQueueImpl(); + + while (jp.nextToken() != JsonToken.END_OBJECT) { + String fieldname = jp.getCurrentName(); + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + // if we're not already on the field, advance by one. + jp.nextToken(); + } + + if ("objIdCounter".equals(fieldname)) { + soup._objIdCounter = jp.getIntValue(); + } else if ("currentCycle".equals(fieldname)) { + soup._currentCycle = jp.getIntValue(); + } else if ("continuations".equals(fieldname)) { + Continuation[] cs = (Continuation[])jp.readValueAs(Continuation[].class); + soup._reactions = new HashSet<Continuation>(Arrays.asList(cs)); + } else if ("channels".equals(fieldname)) { + soup._channels = new HashMap<Integer, ChannelFrame>(); + ChannelFrame[] frames = jp.readValueAs(ChannelFrame[].class); + for (ChannelFrame f : frames) { + soup._channels.put(f.getId(), f); + } + } + + } + return soup; + } + + } + + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java new file mode 100644 index 0000000..8e6a982 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.soup.jackson; + +import java.util.Collection; + +import org.apache.ode.jacob.Channel; +import org.apache.ode.jacob.ChannelProxy; +import org.apache.ode.jacob.JacobObject; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.SerializationConfig; +import com.fasterxml.jackson.databind.cfg.MapperConfig; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.jsontype.TypeDeserializer; +import com.fasterxml.jackson.databind.jsontype.TypeIdResolver; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer; +import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver; +import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import com.fasterxml.jackson.databind.type.TypeFactory; + +public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { + + public JacobTypeResolverBuilder() { + init(JsonTypeInfo.Id.CLASS, null); + inclusion(JsonTypeInfo.As.PROPERTY); + typeProperty("@class"); + } + + + @Override + protected TypeIdResolver idResolver(MapperConfig<?> config, + JavaType baseType, Collection<NamedType> subtypes, boolean forSer, + boolean forDeser) { + return new ChannelAwareTypeIdResolver(baseType, config.getTypeFactory()); + } + + + @Override + public TypeSerializer buildTypeSerializer(SerializationConfig config, + JavaType baseType, Collection<NamedType> subtypes) { + + return useForType(baseType) ? super.buildTypeSerializer(config, baseType, subtypes) : null; + } + + private boolean useForType(JavaType t) { + if (JacobObject.class.isAssignableFrom(t.getRawClass())) { + //System.err.println("XXX: JO " + t); + return true; + } + + if (Channel.class.isAssignableFrom(t.getRawClass())) { + //System.err.println("XXX: CH " + t); + return true; + } + + //if (!t.isConcrete()) { + if (t.getRawClass() == Object.class) { + //System.err.println("XXX: CON " + t + "- " + t.isConcrete()); + return true; + } + + return false; + } + + @Override + public TypeDeserializer buildTypeDeserializer(DeserializationConfig config, + JavaType baseType, Collection<NamedType> subtypes) { + + if (useForType(baseType)) { + if (baseType.isInterface() && Channel.class.isAssignableFrom(baseType.getRawClass())) { + TypeIdResolver idRes = idResolver(config, baseType, subtypes, false, true); + return new AsPropertyTypeDeserializer(baseType, idRes, + _typeProperty, _typeIdVisible, Channel.class); + } else { + return super.buildTypeDeserializer(config, baseType, subtypes); + } + } + + return null; + } + + public static class ChannelAwareTypeIdResolver extends TypeIdResolverBase { + + private ClassNameIdResolver delegate; + + protected ChannelAwareTypeIdResolver(JavaType baseType, + TypeFactory typeFactory) { + super(baseType, typeFactory); + delegate = new ClassNameIdResolver(baseType, typeFactory); + } + + @Override + public String idFromValue(Object value) { + if (value instanceof ChannelProxy) { + return "<<channelproxy>>"; + } + return delegate.idFromValue(value); + } + + @Override + public String idFromValueAndType(Object value, Class<?> suggestedType) { + return delegate.idFromValueAndType(value, suggestedType); + } + + @Override + public JavaType typeFromId(String id) { + if ("<<channelproxy>>".equals(id)) { + return null; // force jackson to use default impl + } + return delegate.typeFromId(id); + } + + @Override + public Id getMechanism() { + return Id.CUSTOM; + } + + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index 2581b57..362be0c 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -80,21 +80,21 @@ public class ExecutionQueueImpl implements ExecutionQueue { * forward progress; this scenario would occur if a maximum processign * time-per-instance policy were in effect. */ - private Set<Continuation> _reactions = new HashSet<Continuation>(); + protected Set<Continuation> _reactions = new HashSet<Continuation>(); - private Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>(); + protected Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>(); /** * The "expected" cycle counter, use to detect database serialization * issues. */ - private int _currentCycle; + protected int _currentCycle; - private int _objIdCounter; + protected int _objIdCounter; private ReplacementMap _replacementMap; - private Serializable _gdata; + protected Serializable _gdata; private Map<Object, LinkedList<IndexedObject>> _index = new HashMap<Object, LinkedList<IndexedObject>>(); @@ -442,7 +442,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { return _gdata; } - private static class ChannelFrame implements Externalizable { + protected static class ChannelFrame implements Externalizable { Class<?> type; int id; @@ -534,7 +534,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { } @SuppressWarnings("serial") - private static class CommGroupFrame implements Serializable { + protected static class CommGroupFrame implements Serializable { boolean replicated; public Set<CommFrame> commFrames = new HashSet<CommFrame>(); @@ -543,7 +543,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { } } - private static class CommFrame implements Externalizable { + protected static class CommFrame implements Externalizable { CommGroupFrame commGroupFrame; ChannelFrame channelFrame; @@ -566,7 +566,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { } } - private static class ObjectFrame extends CommFrame implements Externalizable { + protected static class ObjectFrame extends CommFrame implements Externalizable { private static final long serialVersionUID = -7212430608484116919L; ChannelListener _continuation; @@ -743,7 +743,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { } } - private static final class ChannelRef implements Externalizable { + protected static final class ChannelRef implements Externalizable { private Class<?> _type; private Integer _id; http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/588a251b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index c0f5d05..d755789 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -24,9 +24,13 @@ import org.apache.ode.jacob.ReceiveProcess; import org.apache.ode.jacob.Synch; import org.apache.ode.jacob.Val; import org.apache.ode.jacob.examples.sequence.Sequence; -import org.apache.ode.jacob.vpu.ExecutionQueueImpl; +import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl; import org.apache.ode.jacob.vpu.JacobVPU; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * Simple Hello World example to showcase different * features and approaches of the Jacob API. @@ -37,20 +41,22 @@ import org.apache.ode.jacob.vpu.JacobVPU; @SuppressWarnings("serial") public class HelloWorld extends JacobRunnable { - public interface Callback<T, R extends Channel> extends Channel { + public static interface Callback<T, R extends Channel> extends Channel { public void invoke(T value, R callback); } static class ReliablePrinterProcess extends JacobRunnable { - private Callback<String, Synch> _in; - public ReliablePrinterProcess(Callback<String, Synch> in) { - _in = in; + private Callback<String, Synch> in; + + @JsonCreator + public ReliablePrinterProcess(@JsonProperty("in") Callback<String, Synch> in) { + this.in = in; } public void run() { object(true, new ReceiveProcess() { private static final long serialVersionUID = 1L; - }.setChannel(_in).setReceiver(new Callback<String, Synch>(){ + }.setChannel(in).setReceiver(new Callback<String, Synch>(){ @Override public void invoke(String value, Synch callback) { System.out.println(value); @@ -63,8 +69,9 @@ public class HelloWorld extends JacobRunnable { static class ReliableStringEmitterProcess extends JacobRunnable { private String str; private Callback<String, Synch> to; - - public ReliableStringEmitterProcess(String str, Callback<String, Synch> to) { + + @JsonCreator + public ReliableStringEmitterProcess(@JsonProperty("str")String str, @JsonProperty("to") Callback<String, Synch> to) { this.str = str; this.to = to; } @@ -85,26 +92,30 @@ public class HelloWorld extends JacobRunnable { static class PrinterProcess extends JacobRunnable { private Val _in; - public PrinterProcess(Val in) { + + @JsonCreator + public PrinterProcess(@JsonProperty("in") Val in) { _in = in; } public void run() { - object(true, new ReceiveProcess() { - private static final long serialVersionUID = 1L; - }.setChannel(_in).setReceiver(new Val(){ - public void val(Object o) { - System.out.println(o); - } - })); + object(true, new PrinterProcessReceiveProcess().setChannel(_in).setReceiver(new PrinterProcessVal())); + } + + static class PrinterProcessReceiveProcess extends ReceiveProcess {} + static class PrinterProcessVal implements Val { + public void val(Object o) { + System.out.println(o); + } } } static class StringEmitterProcess extends JacobRunnable { private String str; private Val to; - - public StringEmitterProcess(String str, Val to) { + + @JsonCreator + public StringEmitterProcess(@JsonProperty("str") String str, @JsonProperty("to") Val to) { this.str = str; this.to = to; } @@ -117,20 +128,29 @@ public class HelloWorld extends JacobRunnable { static class ForwarderProcess extends JacobRunnable { private Val in; private Val out; - public ForwarderProcess(Val in, Val out) { + + @JsonCreator + public ForwarderProcess(@JsonProperty("in") Val in, @JsonProperty("out") Val out) { this.in = in; this.out = out; } public void run() { - object(true, new ReceiveProcess() { - private static final long serialVersionUID = 1L; - }.setChannel(in).setReceiver(new Val(){ - public void val(Object o) { - out.val(o); - } - })); + object(true, new ForwarderProcessReceiveProcess().setChannel(in).setReceiver(new ForwarderProcessVal(out))); + } + + static class ForwarderProcessReceiveProcess extends ReceiveProcess {} + static class ForwarderProcessVal implements Val { + private Val out; + @JsonCreator + public ForwarderProcessVal(@JsonProperty("out")Val out) { + this.out = out; + } + public void val(Object o) { + out.val(o); + } } + } private void simpleHelloWorld() { @@ -153,6 +173,7 @@ public class HelloWorld extends JacobRunnable { // (new(callback).!out(hello).?callback) | (new(callback).!out(world).?callback) // new(rout) + @SuppressWarnings("unchecked") Callback<String, Synch> rout = newChannel(Callback.class, "reliableHelloWorld-rout"); // *(?rout(str).!sysout(str)) instance(new ReliablePrinterProcess(rout)); @@ -170,21 +191,49 @@ public class HelloWorld extends JacobRunnable { // new(out) final Val out = newChannel(Val.class, "sequencedHelloWorld-out"); + // *(?out(str).!sysout(str)) + instance(new PrinterProcess(out)); + final String[] greeting = {"Hello", "World"}; - instance(new Sequence(greeting.length, null) { - @Override - protected JacobRunnable doStep(final int step, final Synch done) { - return new JacobRunnable() { - @Override - public void run() { - instance(new StringEmitterProcess(greeting[step], out)); - done.ret(); - } - }; - } - }); + + instance(new HWSequence(greeting, out, null)); + } + + static class HWSequence extends Sequence { + + private final String[] greetings; + private final Val out; + + @JsonCreator + public HWSequence(@JsonProperty("greetings") String[] greetings, @JsonProperty("out") Val out, @JsonProperty("done") Synch done) { + super(greetings.length, done); + this.greetings = greetings; + this.out = out; + } + + @Override + protected JacobRunnable doStep(int step, Synch done) { + return new SequenceItemEmitter(greetings[step], done); + } + + class SequenceItemEmitter extends JacobRunnable { + private final String string; + private final Synch done; + + public SequenceItemEmitter(String string, Synch done) { + this.string = string; + this.done = done; + } + + @Override + public void run() { + instance(new StringEmitterProcess(string, out)); + done.ret(); + } + } } + @Override public void run() { simpleHelloWorld(); @@ -192,15 +241,31 @@ public class HelloWorld extends JacobRunnable { sequencedHelloWorld(); } - public static void main(String args[]) { + public static void main(String args[]) throws Exception { + long start = System.currentTimeMillis(); + ObjectMapper mapper = JacksonExecutionQueueImpl.configureMapper(); JacobVPU vpu = new JacobVPU(); - vpu.setContext(new ExecutionQueueImpl(null)); + JacksonExecutionQueueImpl queue = new JacksonExecutionQueueImpl(); + vpu.setContext(queue); vpu.inject(new HelloWorld()); while (vpu.execute()) { + queue = loadAndRestoreQueue(mapper, queue); System.out.println(vpu.isComplete() ? "<0>" : "."); //vpu.dumpState(); } vpu.dumpState(); + System.out.println("Runtime in ms: " + (System.currentTimeMillis() - start)); + } + + public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception { + String json = mapper.writeValueAsString(in); + // System.out.println(json); + JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class); + //String json2 = mapper.writeValueAsString(q2); + + // System.out.println("----"); + // System.out.println(json2); + return q2; } }
