Repository: beam Updated Branches: refs/heads/master 02b72d664 -> 749b33f0b
[BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules This also updates the runner harness and existing tests to use a properly constructed ObjectMapper for PipelineOptions. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5729b58 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5729b58 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5729b58 Branch: refs/heads/master Commit: e5729b58330a05e7be510710d0027c004704946b Parents: 02b72d6 Author: Luke Cwik <[email protected]> Authored: Wed May 3 17:19:00 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu May 4 07:20:33 2017 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 13 ++- .../DataflowPipelineTranslatorTest.java | 83 ++++++++++++++++++++ .../options/DataflowProfilingOptionsTest.java | 4 +- .../DataflowWorkerLoggingOptionsTest.java | 4 +- .../apache/beam/sdk/options/ValueProviders.java | 4 +- .../apache/beam/sdk/testing/TestPipeline.java | 4 +- .../sdk/options/PipelineOptionsFactoryTest.java | 2 +- .../sdk/options/ProxyInvocationHandlerTest.java | 4 +- .../beam/sdk/options/ValueProviderTest.java | 19 +++-- .../beam/sdk/options/ValueProvidersTest.java | 19 ++--- .../beam/sdk/testing/TestPipelineTest.java | 6 +- .../gcp/options/GoogleApiDebugOptionsTest.java | 8 +- .../org/apache/beam/fn/harness/FnHarness.java | 5 +- .../beam/runners/core/BeamFnDataReadRunner.java | 2 - .../runners/core/BeamFnDataWriteRunner.java | 2 - .../control/ProcessBundleHandlerTest.java | 2 - .../runners/core/BeamFnDataReadRunnerTest.java | 2 - .../runners/core/BeamFnDataWriteRunnerTest.java | 2 - 18 files changed, 143 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 69b4ecd..e727433 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -32,6 +32,7 @@ import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.services.dataflow.model.AutoscalingSettings; import com.google.api.services.dataflow.model.DataflowPackage; @@ -72,6 +73,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -90,6 +92,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -111,6 +114,14 @@ public class DataflowPipelineTranslator { private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class); private static final ObjectMapper MAPPER = new ObjectMapper(); + /** + * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing + * for user specified configuration injection into the ObjectMapper. This supports user custom + * types on {@link PipelineOptions}. + */ + private static final ObjectMapper MAPPER_WITH_MODULES = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) { try { return WindowingStrategies.toProto(windowingStrategy).toByteArray(); @@ -303,7 +314,7 @@ public class DataflowPipelineTranslator { try { environment.setSdkPipelineOptions( - MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class)); + MAPPER.readValue(MAPPER_WITH_MODULES.writeValueAsBytes(options), Map.class)); } catch (IOException e) { throw new IllegalArgumentException( "PipelineOptions specified failed to serialize to JSON.", e); http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 41f3c92..a6ad8c5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -37,11 +37,23 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; +import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -70,6 +82,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Count; @@ -217,6 +230,76 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0)); } + /** PipelineOptions used to test auto registration of Jackson modules. */ + public interface JacksonIncompatibleOptions extends PipelineOptions { + JacksonIncompatible getJacksonIncompatible(); + void setJacksonIncompatible(JacksonIncompatible value); + } + + /** A Jackson {@link Module} to test auto-registration of modules. */ + @AutoService(Module.class) + public static class RegisteredTestModule extends SimpleModule { + public RegisteredTestModule() { + super("RegisteredTestModule"); + setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); + } + } + + /** A class which Jackson does not know how to serialize/deserialize. */ + public static class JacksonIncompatible { + private final String value; + public JacksonIncompatible(String value) { + this.value = value; + } + } + + /** A Jackson mixin used to add annotations to other classes. */ + @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) + @JsonSerialize(using = JacksonIncompatibleSerializer.class) + public static final class JacksonIncompatibleMixin {} + + /** A Jackson deserializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleDeserializer extends + JsonDeserializer<JacksonIncompatible> { + + @Override + public JacksonIncompatible deserialize(JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + return new JacksonIncompatible(jsonParser.readValueAs(String.class)); + } + } + + /** A Jackson serializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> { + + @Override + public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(jacksonIncompatible.value); + } + } + + @Test + public void testSettingOfPipelineOptionsWithCustomUserType() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setRunner(DataflowRunner.class); + options.as(JacksonIncompatibleOptions.class).setJacksonIncompatible( + new JacksonIncompatible("userCustomTypeTest")); + + Pipeline p = Pipeline.create(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList()) + .getJob(); + + Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions(); + assertThat(sdkPipelineOptions, hasKey("options")); + Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options"); + assertThat(optionsMap, hasEntry("jacksonIncompatible", (Object) "userCustomTypeTest")); + } + @Test public void testNetworkConfig() throws IOException { final String testNetwork = "test-network"; http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java index 4018cbb..68118a4 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,7 +34,8 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class DataflowProfilingOptionsTest { - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); @Test public void testOptionsObject() throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java index b463dcb..b1a5258 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -32,7 +33,8 @@ import org.junit.runners.JUnit4; /** Tests for {@link DataflowWorkerLoggingOptions}. */ @RunWith(JUnit4.class) public class DataflowWorkerLoggingOptionsTest { - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); @Rule public ExpectedException expectedException = ExpectedException.none(); @Test http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java index d034b81..e2355ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.util.Map; +import org.apache.beam.sdk.util.common.ReflectHelpers; /** * Utilities for working with the {@link ValueProvider} interface. @@ -37,7 +38,8 @@ class ValueProviders { */ public static String updateSerializedOptions( String serializedOptions, Map<String, String> runtimeValues) { - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); ObjectNode root, options; try { root = mapper.readValue(serializedOptions, ObjectNode.class); http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index ab8772b..4d0cc2b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.junit.runner.Description; @@ -240,7 +241,8 @@ public class TestPipeline extends Pipeline implements TestRule { static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner"; - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private Optional<? extends PipelineRunEnforcement> enforcement = Optional.absent(); http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 11bd7b9..76a5f18 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -1689,7 +1689,7 @@ public class PipelineOptionsFactoryTest { } /** PipelineOptions used to test auto registration of Jackson modules. */ - public interface JacksonIncompatibleOptions extends PipelineOptions { + interface JacksonIncompatibleOptions extends PipelineOptions { JacksonIncompatible getJacksonIncompatible(); void setJacksonIncompatible(JacksonIncompatible value); } http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 323c24c..2c43f57 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Rule; @@ -77,7 +78,8 @@ public class ProxyInvocationHandlerTest { } }; - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); /** A test interface with some primitives and objects. */ public interface Simple extends PipelineOptions { http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java index 9369ae6..e596cc1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -41,6 +42,8 @@ import org.junit.runners.JUnit4; /** Tests for {@link ValueProvider}. */ @RunWith(JUnit4.class) public class ValueProviderTest { + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); @Rule public ExpectedException expectedException = ExpectedException.none(); /** A test interface. */ @@ -118,8 +121,7 @@ public class ValueProviderTest { @Test public void testNoDefaultRuntimeProviderWithOverride() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - TestOptions runtime = mapper.readValue( + TestOptions runtime = MAPPER.readValue( "{ \"options\": { \"foo\": \"quux\" }}", PipelineOptions.class) .as(TestOptions.class); @@ -134,8 +136,7 @@ public class ValueProviderTest { @Test public void testDefaultRuntimeProviderWithOverride() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - TestOptions runtime = mapper.readValue( + TestOptions runtime = MAPPER.readValue( "{ \"options\": { \"bar\": \"quux\" }}", PipelineOptions.class) .as(TestOptions.class); @@ -196,12 +197,11 @@ public class ValueProviderTest { public void testSerializeDeserializeNoArg() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); assertFalse(submitOptions.getFoo().isAccessible()); - ObjectMapper mapper = new ObjectMapper(); - String serializedOptions = mapper.writeValueAsString(submitOptions); + String serializedOptions = MAPPER.writeValueAsString(submitOptions); String runnerString = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("foo", "quux")); - TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class) + TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class) .as(TestOptions.class); ValueProvider<String> vp = runtime.getFoo(); @@ -215,12 +215,11 @@ public class ValueProviderTest { TestOptions submitOptions = PipelineOptionsFactory.fromArgs("--foo=baz").as(TestOptions.class); assertEquals("baz", submitOptions.getFoo().get()); assertTrue(submitOptions.getFoo().isAccessible()); - ObjectMapper mapper = new ObjectMapper(); - String serializedOptions = mapper.writeValueAsString(submitOptions); + String serializedOptions = MAPPER.writeValueAsString(submitOptions); String runnerString = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("foo", "quux")); - TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class) + TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class) .as(TestOptions.class); ValueProvider<String> vp = runtime.getFoo(); http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java index 14f86bc..dd4d55b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -29,6 +30,9 @@ import org.junit.runners.JUnit4; /** Tests for {@link ValueProviders}. */ @RunWith(JUnit4.class) public class ValueProvidersTest { + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + /** A test interface. */ public interface TestOptions extends PipelineOptions { String getString(); @@ -41,11 +45,10 @@ public class ValueProvidersTest { @Test public void testUpdateSerialize() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); - ObjectMapper mapper = new ObjectMapper(); - String serializedOptions = mapper.writeValueAsString(submitOptions); + String serializedOptions = MAPPER.writeValueAsString(submitOptions); String updatedOptions = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("string", "bar")); - TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class) + TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class) .as(TestOptions.class); assertEquals("bar", runtime.getString()); } @@ -54,11 +57,10 @@ public class ValueProvidersTest { public void testUpdateSerializeExistingValue() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.fromArgs( "--string=baz", "--otherString=quux").as(TestOptions.class); - ObjectMapper mapper = new ObjectMapper(); - String serializedOptions = mapper.writeValueAsString(submitOptions); + String serializedOptions = MAPPER.writeValueAsString(submitOptions); String updatedOptions = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("string", "bar")); - TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class) + TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class) .as(TestOptions.class); assertEquals("bar", runtime.getString()); assertEquals("quux", runtime.getOtherString()); @@ -67,11 +69,10 @@ public class ValueProvidersTest { @Test public void testUpdateSerializeEmptyUpdate() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); - ObjectMapper mapper = new ObjectMapper(); - String serializedOptions = mapper.writeValueAsString(submitOptions); + String serializedOptions = MAPPER.writeValueAsString(submitOptions); String updatedOptions = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.<String, String>of()); - TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class) + TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class) .as(TestOptions.class); assertNull(runtime.getString()); } http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 04005c5..05abb59 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -61,6 +62,8 @@ import org.junit.runners.Suite; TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class }) public class TestPipelineTest implements Serializable { + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); /** Tests related to the creation of a {@link TestPipeline}. */ @RunWith(JUnit4.class) @@ -85,9 +88,8 @@ public class TestPipelineTest implements Serializable { @Test public void testCreationOfPipelineOptions() throws Exception { - ObjectMapper mapper = new ObjectMapper(); String stringOptions = - mapper.writeValueAsString( + MAPPER.writeValueAsString( new String[] { "--runner=org.apache.beam.sdk.testing.CrashingRunner" }); http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java index 67d5880..68a29e6 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -35,6 +36,8 @@ import org.junit.runners.JUnit4; /** Tests for {@link GoogleApiDebugOptions}. */ @RunWith(JUnit4.class) public class GoogleApiDebugOptionsTest { + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); private static final String STORAGE_GET_TRACE = "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; private static final String STORAGE_GET_AND_LIST_TRACE = @@ -139,9 +142,8 @@ public class GoogleApiDebugOptionsTest { @Test public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception { String serializedValue = "{\"Api\":\"Token\"}"; - ObjectMapper objectMapper = new ObjectMapper(); assertEquals(serializedValue, - objectMapper.writeValueAsString( - objectMapper.readValue(serializedValue, GoogleApiTracer.class))); + MAPPER.writeValueAsString( + MAPPER.readValue(serializedValue, GoogleApiTracer.class))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index d587986..24f826c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -34,6 +34,7 @@ import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,9 @@ public class FnHarness { System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR)); System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS)); - PipelineOptions options = new ObjectMapper().readValue( + ObjectMapper objectMapper = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + PipelineOptions options = objectMapper.readValue( System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor = http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java index 805d480..7c4a5e8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -47,7 +46,6 @@ import org.slf4j.LoggerFactory; */ public class BeamFnDataReadRunner<OutputT> { private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java index 0ba09e3..3a11def 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; @@ -38,7 +37,6 @@ import org.apache.beam.sdk.values.KV; * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish. */ public class BeamFnDataWriteRunner<InputT> { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; private final BeamFnApi.Target outputTarget; private final Coder<WindowedValue<InputT>> coder; http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 5987267..654f989 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -33,7 +33,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -92,7 +91,6 @@ import org.mockito.MockitoAnnotations; /** Tests for {@link ProcessBundleHandler}. */ @RunWith(JUnit4.class) public class ProcessBundleHandlerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final Coder<WindowedValue<String>> STRING_CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final String LONG_CODER_SPEC_ID = "998L"; http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java index 0d036fe..04a3615 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; @@ -65,7 +64,6 @@ import org.mockito.MockitoAnnotations; /** Tests for {@link BeamFnDataReadRunner}. */ @RunWith(JUnit4.class) public class BeamFnDataReadRunnerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder() http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java index 50fee7a..9e50cd0 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Any; import java.io.IOException; import java.util.ArrayList; @@ -53,7 +52,6 @@ import org.mockito.MockitoAnnotations; /** Tests for {@link BeamFnDataWriteRunner}. */ @RunWith(JUnit4.class) public class BeamFnDataWriteRunnerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
