Repository: beam Updated Branches: refs/heads/master 9b175ccc2 -> f0ce31b9d
[BEAM-2788] Use SerializablePipelineOptions to serde PipelineOptions in Gearpump runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/081df6d3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/081df6d3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/081df6d3 Branch: refs/heads/master Commit: 081df6d331f60926c4bcf3d49a3bef8ab3ad4f2c Parents: 9b175cc Author: huafengw <[email protected]> Authored: Tue Aug 22 15:15:29 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Aug 23 21:58:28 2017 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 13 +++++-------- .../translators/GroupByKeyTranslator.java | 2 +- .../gearpump/translators/io/GearpumpSource.java | 7 ++++--- .../translators/utils/DoFnRunnerFactory.java | 7 ++++--- .../translators/utils/TranslatorUtils.java | 20 -------------------- .../FlattenPCollectionsTranslatorTest.java | 6 ++++++ 6 files changed, 20 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 30bc354..2b460e7 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -154,14 +154,6 @@ </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> <dependency> @@ -183,6 +175,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <classifier>tests</classifier> http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 8409beb..bea5a74 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -94,7 +94,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe } @Override - public <T> Window[] apply(Context<T> context) { + public <T> Window[] apply(Context<T> context) { try { Object element = context.element(); if (element instanceof TranslatorUtils.RawUnionValue) { http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index 2f53139..3766195 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.gearpump.translators.io; import java.io.IOException; import java.time.Instant; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; @@ -39,13 +40,13 @@ import org.apache.gearpump.streaming.task.TaskContext; */ public abstract class GearpumpSource<T> implements DataSource { - private final byte[] serializedOptions; + private final SerializablePipelineOptions serializedOptions; private Source.Reader<T> reader; private boolean available = false; GearpumpSource(PipelineOptions options) { - this.serializedOptions = TranslatorUtils.serializePipelineOptions(options); + this.serializedOptions = new SerializablePipelineOptions(options); } protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException; @@ -53,7 +54,7 @@ public abstract class GearpumpSource<T> implements DataSource { @Override public void open(TaskContext context, Instant startTime) { try { - PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions); + PipelineOptions options = serializedOptions.get(); this.reader = createReader(options); this.available = reader.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index 375b696..6557c8b 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SimpleDoFnRunner; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.StepContext; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -43,7 +44,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { private static final long serialVersionUID = -4109539010014189725L; private final DoFn<InputT, OutputT> fn; - private final byte[] serializedOptions; + private final SerializablePipelineOptions serializedOptions; private final Collection<PCollectionView<?>> sideInputs; private final DoFnRunners.OutputManager outputManager; private final TupleTag<OutputT> mainOutputTag; @@ -61,7 +62,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { StepContext stepContext, WindowingStrategy<?, ?> windowingStrategy) { this.fn = doFn; - this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); this.sideInputs = sideInputs; this.outputManager = outputManager; this.mainOutputTag = mainOutputTag; @@ -72,7 +73,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner( ReadyCheckingSideInputReader sideInputReader) { - PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions); + PipelineOptions options = serializedOptions.get(); DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner( options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index c14298f..2dae955 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -18,11 +18,8 @@ package org.apache.beam.runners.gearpump.translators.utils; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import java.io.IOException; import java.time.Instant; import java.util.Collection; import java.util.HashMap; @@ -30,7 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.gearpump.translators.TranslationContext; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -145,22 +141,6 @@ public class TranslatorUtils { } } - public static byte[] serializePipelineOptions(PipelineOptions options) { - try { - return new ObjectMapper().writeValueAsBytes(options); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) { - try { - return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - /** * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue. */ http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java index 1262177..1115fad 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java @@ -30,7 +30,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; @@ -62,6 +64,8 @@ public class FlattenPCollectionsTranslatorTest { when(translationContext.getInputs()).thenReturn(Collections.EMPTY_MAP); when(translationContext.getOutput()).thenReturn(mockOutput); + when(translationContext.getPipelineOptions()) + .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class)); translator.translate(transform, translationContext); verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher())); @@ -141,6 +145,8 @@ public class FlattenPCollectionsTranslatorTest { when(translationContext.getInputs()).thenReturn(inputs); when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1); + when(translationContext.getPipelineOptions()) + .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class)); translator.translate(transform, translationContext); verify(javaStream1).map(any(MapFunction.class), eq("dummy"));
