Repository: incubator-beam Updated Branches: refs/heads/master bf78e9667 -> 70e6a1310
[BEAM-196] make use of SerializedPipelineOptions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/43b5ec74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/43b5ec74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/43b5ec74 Branch: refs/heads/master Commit: 43b5ec743718e63c2d9d9532e3ca55bc87370290 Parents: 81577b3 Author: Maximilian Michels <[email protected]> Authored: Mon Apr 18 17:40:50 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Apr 18 18:10:05 2016 +0200 ---------------------------------------------------------------------- .../functions/FlinkDoFnFunction.java | 25 ++------- .../functions/FlinkMultiOutputDoFnFunction.java | 27 ++-------- .../utils/SerializedPipelineOptions.java | 21 +++++--- .../translation/wrappers/SinkOutputFormat.java | 28 +++-------- .../translation/wrappers/SourceInputFormat.java | 24 ++------- .../FlinkGroupAlsoByWindowWrapper.java | 53 ++------------------ .../streaming/io/UnboundedSourceWrapper.java | 28 ++--------- 7 files changed, 41 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 9ed5c7c..3566f7e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.translation.functions; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -35,15 +36,11 @@ import org.apache.beam.sdk.values.TupleTag; import com.google.common.collect.ImmutableList; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.util.Collector; import org.joda.time.Instant; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -55,25 +52,11 @@ import java.util.List; public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, OUT> { private final DoFn<IN, OUT> doFn; - private transient PipelineOptions options; + private final SerializedPipelineOptions serializedOptions; public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) { this.doFn = doFn; - this.options = options; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); + this.serializedOptions = new SerializedPipelineOptions(options); } @Override @@ -160,7 +143,7 @@ public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, OUT @Override public PipelineOptions getPipelineOptions() { - return options; + return serializedOptions.getPipelineOptions(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index b1c4be6..476dc5e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.translation.functions; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -33,15 +34,10 @@ import org.apache.beam.sdk.values.TupleTag; import com.google.common.collect.ImmutableList; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.util.Collector; import org.joda.time.Instant; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -57,30 +53,15 @@ import java.util.Map; public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> { private final DoFn<IN, OUT> doFn; - private transient PipelineOptions options; + private final SerializedPipelineOptions serializedPipelineOptions; private final Map<TupleTag<?>, Integer> outputMap; public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) { this.doFn = doFn; - this.options = options; + this.serializedPipelineOptions = new SerializedPipelineOptions(options); this.outputMap = outputMap; } - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); - - } - @Override public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception { ProcessContext context = new ProcessContext(doFn, out); @@ -129,7 +110,7 @@ public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunct @Override public PipelineOptions getPipelineOptions() { - return options; + return serializedPipelineOptions.getPipelineOptions(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 7439e02..2b35c31 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.utils; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import org.apache.beam.sdk.options.PipelineOptions; import java.io.ByteArrayOutputStream; @@ -30,9 +31,13 @@ import java.io.Serializable; */ public class SerializedPipelineOptions implements Serializable { - private byte[] serializedOptions; + private final byte[] serializedOptions; + + /** Lazily initialized copy of deserialized options */ + private transient PipelineOptions pipelineOptions; public SerializedPipelineOptions(PipelineOptions options) { + Preconditions.checkNotNull(options, "PipelineOptions must not be null."); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { new ObjectMapper().writeValue(baos, options); @@ -43,12 +48,16 @@ public class SerializedPipelineOptions implements Serializable { } - public PipelineOptions deserializeOptions() { - try { - return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); + public PipelineOptions getPipelineOptions() { + if (pipelineOptions == null) { + try { + pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + } catch (IOException e) { + throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); + } } + + return pipelineOptions; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java index c6a4160..2766a87 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java @@ -18,21 +18,16 @@ package org.apache.beam.runners.flink.translation.wrappers; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; -import com.google.common.base.Preconditions; - -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.AbstractID; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.lang.reflect.Field; /** @@ -43,7 +38,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { private final Sink<T> sink; - private transient PipelineOptions pipelineOptions; + private final SerializedPipelineOptions serializedOptions; private Sink.WriteOperation<T, ?> writeOperation; private Sink.Writer<T, ?> writer; @@ -52,7 +47,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) { this.sink = extractSink(transform); - this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions); + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); } private Sink<T> extractSink(Write.Bound<T> transform) { @@ -70,9 +65,9 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { @Override public void configure(Configuration configuration) { - writeOperation = sink.createWriteOperation(pipelineOptions); + writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions()); try { - writeOperation.initialize(pipelineOptions); + writeOperation.initialize(serializedOptions.getPipelineOptions()); } catch (Exception e) { throw new RuntimeException("Failed to initialize the write operation.", e); } @@ -81,7 +76,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { @Override public void open(int taskNumber, int numTasks) throws IOException { try { - writer = writeOperation.createWriter(pipelineOptions); + writer = writeOperation.createWriter(serializedOptions.getPipelineOptions()); } catch (Exception e) { throw new IOException("Couldn't create writer.", e); } @@ -110,15 +105,4 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { } } - private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, pipelineOptions); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - pipelineOptions = mapper.readValue(in, PipelineOptions.class); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 4b11abc..dc11c77 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -17,12 +17,11 @@ */ package org.apache.beam.runners.flink.translation.wrappers; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; @@ -31,9 +30,7 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.List; @@ -47,32 +44,19 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> private final BoundedSource<T> initialSource; private transient PipelineOptions options; - private final byte[] serializedOptions; + private final SerializedPipelineOptions serializedOptions; private transient BoundedSource.BoundedReader<T> reader = null; private boolean inputAvailable = true; public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { this.initialSource = initialSource; - this.options = options; - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - new ObjectMapper().writeValue(baos, options); - serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } - + this.serializedOptions = new SerializedPipelineOptions(options); } @Override public void configure(Configuration configuration) { - try { - options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } + options = serializedOptions.getPipelineOptions(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 8e7493e..8d9744f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.AbstractFlinkTimerInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; @@ -29,7 +30,6 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; @@ -104,7 +104,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> private static final long serialVersionUID = 1L; - private transient PipelineOptions options; + private SerializedPipelineOptions serializedOptions; private transient CoderRegistry coderRegistry; @@ -236,7 +236,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { Preconditions.checkNotNull(options); - this.options = Preconditions.checkNotNull(options); + this.serializedOptions = new SerializedPipelineOptions(Preconditions.checkNotNull(options)); this.coderRegistry = Preconditions.checkNotNull(registry); this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder(); this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy(); @@ -477,52 +477,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> @Override public PipelineOptions getPipelineOptions() { - // TODO: PipelineOptions need to be available on the workers. - // Ideally they are captured as part of the pipeline. - // For now, construct empty options so that StateContexts.createFromComponents - // will yield a valid StateContext, which is needed to support the StateContext.window(). - if (options == null) { - options = new PipelineOptions() { - @Override - public <T extends PipelineOptions> T as(Class<T> kls) { - return null; - } - - @Override - public <T extends PipelineOptions> T cloneAs(Class<T> kls) { - return null; - } - - @Override - public Class<? extends PipelineRunner<?>> getRunner() { - return null; - } - - @Override - public void setRunner(Class<? extends PipelineRunner<?>> kls) { - - } - - @Override - public CheckEnabled getStableUniqueNames() { - return null; - } - - @Override - public void setStableUniqueNames(CheckEnabled enabled) { - } - - @Override - public String getTempLocation() { - return null; - } - - @Override - public void setTempLocation(String tempLocation) { - } - }; - } - return options; + return serializedOptions.getPipelineOptions(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 5be34e6..9d15a33 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -25,8 +26,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -34,10 +33,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.joda.time.Instant; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - /** * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the * {@link org.apache.beam.sdk.io.Read.Unbounded} interface. @@ -54,15 +49,14 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< private volatile boolean isRunning = false; - /** Serialized using custom Java serialization via Jackson */ - private transient PipelineOptions pipelineOptions; + private final SerializedPipelineOptions serializedOptions; /** Instantiated during runtime **/ private transient UnboundedSource.UnboundedReader<T> reader; public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) { this.name = transform.getName(); - this.pipelineOptions = pipelineOptions; + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); this.source = transform.getSource(); } @@ -91,7 +85,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< isRunning = true; - reader = source.createReader(pipelineOptions, null); + reader = source.createReader(serializedOptions.getPipelineOptions(), null); boolean inputAvailable = reader.start(); @@ -156,18 +150,4 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< return System.currentTimeMillis() + watermarkInterval; } - - // Special serialization of the PipelineOptions necessary to instantiate the reader. - private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, pipelineOptions); - } - - // Special deserialization of the PipelineOptions necessary to instantiate the reader. - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - pipelineOptions = mapper.readValue(in, PipelineOptions.class); - } }
