Repository: beam Updated Branches: refs/heads/master 749b33f0b -> 3c5891b31
[BEAM-2165] Update Flink to support serializing/deserializing custom user types configured via Jackson modules Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f53e5d43 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f53e5d43 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f53e5d43 Branch: refs/heads/master Commit: f53e5d43d58c79ab9f3d04e112e6f05ad9dfe42f Parents: 749b33f Author: Luke Cwik <[email protected]> Authored: Wed May 3 17:12:20 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu May 4 07:28:43 2017 -0700 ---------------------------------------------------------------------- runners/flink/pom.xml | 5 ++ .../utils/SerializedPipelineOptions.java | 16 +++- .../beam/runners/flink/PipelineOptionsTest.java | 87 ++++++++++++++++++++ 3 files changed, 106 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f53e5d43/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index eb2b005..4122454 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -257,6 +257,11 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/f53e5d43/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 2256bb1..f717fd7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation.utils; import static com.google.common.base.Preconditions.checkNotNull; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -27,6 +28,7 @@ import java.io.Serializable; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; /** * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. @@ -42,7 +44,7 @@ public class SerializedPipelineOptions implements Serializable { checkNotNull(options, "PipelineOptions must not be null."); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - new ObjectMapper().writeValue(baos, options); + createMapper().writeValue(baos, options); this.serializedOptions = baos.toByteArray(); } catch (Exception e) { throw new RuntimeException("Couldn't serialize PipelineOptions.", e); @@ -53,7 +55,7 @@ public class SerializedPipelineOptions implements Serializable { public PipelineOptions getPipelineOptions() { if (pipelineOptions == null) { try { - pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); FileSystems.setDefaultConfigInWorkers(pipelineOptions); @@ -64,4 +66,14 @@ public class SerializedPipelineOptions implements Serializable { return pipelineOptions; } + + /** + * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing + * for user specified configuration injection into the ObjectMapper. This supports user custom + * types on {@link PipelineOptions}. + */ + private static ObjectMapper createMapper() { + return new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/f53e5d43/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 23740a1..7519dbf 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -23,6 +23,23 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.Collections; import java.util.HashMap; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -200,4 +217,74 @@ public class PipelineOptionsTest { c.getPipelineOptions().as(MyOptions.class).getTestOption()); } } + + /** PipelineOptions used to test auto registration of Jackson modules. */ + public interface JacksonIncompatibleOptions extends PipelineOptions { + JacksonIncompatible getJacksonIncompatible(); + void setJacksonIncompatible(JacksonIncompatible value); + } + + /** A Jackson {@link Module} to test auto-registration of modules. */ + @AutoService(Module.class) + public static class RegisteredTestModule extends SimpleModule { + public RegisteredTestModule() { + super("RegisteredTestModule"); + setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); + } + } + + /** A class which Jackson does not know how to serialize/deserialize. */ + public static class JacksonIncompatible { + private final String value; + public JacksonIncompatible(String value) { + this.value = value; + } + } + + /** A Jackson mixin used to add annotations to other classes. */ + @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) + @JsonSerialize(using = JacksonIncompatibleSerializer.class) + public static final class JacksonIncompatibleMixin {} + + /** A Jackson deserializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleDeserializer extends + JsonDeserializer<JacksonIncompatible> { + + @Override + public JacksonIncompatible deserialize(JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + return new JacksonIncompatible(jsonParser.readValueAs(String.class)); + } + } + + /** A Jackson serializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> { + + @Override + public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(jacksonIncompatible.value); + } + } + + @Test + public void testSerializingPipelineOptionsWithCustomUserType() throws Exception { + String expectedValue = "testValue"; + PipelineOptions options = PipelineOptionsFactory + .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"") + .as(JacksonIncompatibleOptions.class); + SerializedPipelineOptions context = new SerializedPipelineOptions(options); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) { + outputStream.writeObject(context); + } + try (ObjectInputStream inputStream = + new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) { + SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject(); + assertEquals(expectedValue, + copy.getPipelineOptions().as(JacksonIncompatibleOptions.class) + .getJacksonIncompatible().value); + } + } }
