Repository: beam Updated Branches: refs/heads/master f43b61af4 -> 02b72d664
[BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f1c8972 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f1c8972 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f1c8972 Branch: refs/heads/master Commit: 1f1c897264ea7ab050c8644344f6e2648af9ae4a Parents: f43b61a Author: Luke Cwik <[email protected]> Authored: Wed May 3 17:17:11 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu May 4 07:16:04 2017 -0700 ---------------------------------------------------------------------- runners/apex/pom.xml | 4 + .../utils/SerializablePipelineOptions.java | 15 ++- .../translation/utils/PipelineOptionsTest.java | 98 ++++++++++++++++---- 3 files changed, 99 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 36252e8..aa4bddf 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -63,6 +63,10 @@ <version>${apex.malhar.version}</version> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java index 1a47ed5..14476b5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.apex.translation.utils; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.Externalizable; import java.io.IOException; @@ -27,6 +28,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; /** * A wrapper to enable serialization of {@link PipelineOptions}. @@ -51,13 +53,13 @@ public class SerializablePipelineOptions implements Externalizable { @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions)); + out.writeUTF(createMapper().writeValueAsString(pipelineOptions)); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { String s = in.readUTF(); - this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class) + this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class) .as(ApexPipelineOptions.class); if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) { @@ -66,4 +68,13 @@ public class SerializablePipelineOptions implements Externalizable { } } + /** + * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing + * for user specified configuration injection into the ObjectMapper. This supports user custom + * types on {@link PipelineOptions}. + */ + private static ObjectMapper createMapper() { + return new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java index d5eb9a9..118ff99 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java @@ -23,15 +23,25 @@ import static org.junit.Assert.assertNotNull; import com.datatorrent.common.util.FSStorageAgent; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; import com.esotericsoftware.kryo.serializers.JavaSerializer; - +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.auto.service.AutoService; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; - +import java.io.IOException; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.BeforeClass; import org.junit.Test; /** @@ -49,36 +59,92 @@ public class PipelineOptionsTest { void setTestOption(String value); } - private static class MyOptionsWrapper { - private MyOptionsWrapper() { + private static class OptionsWrapper { + private OptionsWrapper() { this(null); // required for Kryo } - private MyOptionsWrapper(ApexPipelineOptions options) { + private OptionsWrapper(ApexPipelineOptions options) { this.options = new SerializablePipelineOptions(options); } @Bind(JavaSerializer.class) private final SerializablePipelineOptions options; } - private static MyOptions options; - - private static final String[] args = new String[]{"--testOption=nothing"}; + @Test + public void testSerialization() { + OptionsWrapper wrapper = new OptionsWrapper( + PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FSStorageAgent.store(bos, wrapper); - @BeforeClass - public static void beforeTest() { - options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis); + assertNotNull(wrapperCopy.options); + assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); } @Test - public void testSerialization() { - MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options); + public void testSerializationWithUserCustomType() { + OptionsWrapper wrapper = new OptionsWrapper( + PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"") + .as(JacksonIncompatibleOptions.class)); ByteArrayOutputStream bos = new ByteArrayOutputStream(); FSStorageAgent.store(bos, wrapper); ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis); + OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis); assertNotNull(wrapperCopy.options); - assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); + assertEquals("testValue", + wrapperCopy.options.get().as(JacksonIncompatibleOptions.class) + .getJacksonIncompatible().value); + } + + /** PipelineOptions used to test auto registration of Jackson modules. */ + public interface JacksonIncompatibleOptions extends ApexPipelineOptions { + JacksonIncompatible getJacksonIncompatible(); + void setJacksonIncompatible(JacksonIncompatible value); } + /** A Jackson {@link Module} to test auto-registration of modules. */ + @AutoService(Module.class) + public static class RegisteredTestModule extends SimpleModule { + public RegisteredTestModule() { + super("RegisteredTestModule"); + setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); + } + } + + /** A class which Jackson does not know how to serialize/deserialize. */ + public static class JacksonIncompatible { + private final String value; + public JacksonIncompatible(String value) { + this.value = value; + } + } + + /** A Jackson mixin used to add annotations to other classes. */ + @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) + @JsonSerialize(using = JacksonIncompatibleSerializer.class) + public static final class JacksonIncompatibleMixin {} + + /** A Jackson deserializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleDeserializer extends + JsonDeserializer<JacksonIncompatible> { + + @Override + public JacksonIncompatible deserialize(JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + return new JacksonIncompatible(jsonParser.readValueAs(String.class)); + } + } + + /** A Jackson serializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> { + + @Override + public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(jacksonIncompatible.value); + } + } }
