Repository: incubator-beam
Updated Branches:
  refs/heads/master bf78e9667 -> 70e6a1310


[BEAM-196] make use of SerializedPipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/43b5ec74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/43b5ec74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/43b5ec74

Branch: refs/heads/master
Commit: 43b5ec743718e63c2d9d9532e3ca55bc87370290
Parents: 81577b3
Author: Maximilian Michels <[email protected]>
Authored: Mon Apr 18 17:40:50 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon Apr 18 18:10:05 2016 +0200

----------------------------------------------------------------------
 .../functions/FlinkDoFnFunction.java            | 25 ++-------
 .../functions/FlinkMultiOutputDoFnFunction.java | 27 ++--------
 .../utils/SerializedPipelineOptions.java        | 21 +++++---
 .../translation/wrappers/SinkOutputFormat.java  | 28 +++--------
 .../translation/wrappers/SourceInputFormat.java | 24 ++-------
 .../FlinkGroupAlsoByWindowWrapper.java          | 53 ++------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 28 ++---------
 7 files changed, 41 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 9ed5c7c..3566f7e 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -35,15 +36,11 @@ import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.common.collect.ImmutableList;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -55,25 +52,11 @@ import java.util.List;
 public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, 
OUT> {
 
   private final DoFn<IN, OUT> doFn;
-  private transient PipelineOptions options;
+  private final SerializedPipelineOptions serializedOptions;
 
   public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) {
     this.doFn = doFn;
-    this.options = options;
-  }
-
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
+    this.serializedOptions = new SerializedPipelineOptions(options);
   }
 
   @Override
@@ -160,7 +143,7 @@ public class FlinkDoFnFunction<IN, OUT> extends 
RichMapPartitionFunction<IN, OUT
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return options;
+      return serializedOptions.getPipelineOptions();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index b1c4be6..476dc5e 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -33,15 +34,10 @@ import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.common.collect.ImmutableList;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -57,30 +53,15 @@ import java.util.Map;
 public class FlinkMultiOutputDoFnFunction<IN, OUT> extends 
RichMapPartitionFunction<IN, RawUnionValue> {
 
   private final DoFn<IN, OUT> doFn;
-  private transient PipelineOptions options;
+  private final SerializedPipelineOptions serializedPipelineOptions;
   private final Map<TupleTag<?>, Integer> outputMap;
 
   public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions 
options, Map<TupleTag<?>, Integer> outputMap) {
     this.doFn = doFn;
-    this.options = options;
+    this.serializedPipelineOptions = new SerializedPipelineOptions(options);
     this.outputMap = outputMap;
   }
 
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
-
-  }
-
   @Override
   public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) 
throws Exception {
     ProcessContext context = new ProcessContext(doFn, out);
@@ -129,7 +110,7 @@ public class FlinkMultiOutputDoFnFunction<IN, OUT> extends 
RichMapPartitionFunct
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return options;
+      return serializedPipelineOptions.getPipelineOptions();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 7439e02..2b35c31 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.flink.translation.utils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 import java.io.ByteArrayOutputStream;
@@ -30,9 +31,13 @@ import java.io.Serializable;
  */
 public class SerializedPipelineOptions implements Serializable {
 
-  private byte[] serializedOptions;
+  private final byte[] serializedOptions;
+
+  /** Lazily initialized copy of deserialized options */
+  private transient PipelineOptions pipelineOptions;
 
   public SerializedPipelineOptions(PipelineOptions options) {
+    Preconditions.checkNotNull(options, "PipelineOptions must not be null.");
 
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
       new ObjectMapper().writeValue(baos, options);
@@ -43,12 +48,16 @@ public class SerializedPipelineOptions implements 
Serializable {
 
   }
 
-  public PipelineOptions deserializeOptions() {
-    try {
-      return new ObjectMapper().readValue(serializedOptions, 
PipelineOptions.class);
-    } catch (IOException e) {
-      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", 
e);
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = new ObjectMapper().readValue(serializedOptions, 
PipelineOptions.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the 
PipelineOptions.", e);
+      }
     }
+
+    return pipelineOptions;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index c6a4160..2766a87 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -18,21 +18,16 @@
 
 package org.apache.beam.runners.flink.translation.wrappers;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 
 /**
@@ -43,7 +38,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
 
   private final Sink<T> sink;
 
-  private transient PipelineOptions pipelineOptions;
+  private final SerializedPipelineOptions serializedOptions;
 
   private Sink.WriteOperation<T, ?> writeOperation;
   private Sink.Writer<T, ?> writer;
@@ -52,7 +47,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
 
   public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions 
pipelineOptions) {
     this.sink = extractSink(transform);
-    this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
   }
 
   private Sink<T> extractSink(Write.Bound<T> transform) {
@@ -70,9 +65,9 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
 
   @Override
   public void configure(Configuration configuration) {
-    writeOperation = sink.createWriteOperation(pipelineOptions);
+    writeOperation = 
sink.createWriteOperation(serializedOptions.getPipelineOptions());
     try {
-      writeOperation.initialize(pipelineOptions);
+      writeOperation.initialize(serializedOptions.getPipelineOptions());
     } catch (Exception e) {
       throw new RuntimeException("Failed to initialize the write operation.", 
e);
     }
@@ -81,7 +76,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
   @Override
   public void open(int taskNumber, int numTasks) throws IOException {
     try {
-      writer = writeOperation.createWriter(pipelineOptions);
+      writer = 
writeOperation.createWriter(serializedOptions.getPipelineOptions());
     } catch (Exception e) {
       throw new IOException("Couldn't create writer.", e);
     }
@@ -110,15 +105,4 @@ public class SinkOutputFormat<T> implements 
OutputFormat<T> {
     }
   }
 
-  private void writeObject(ObjectOutputStream out) throws IOException, 
ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, pipelineOptions);
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 4b11abc..dc11c77 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -17,12 +17,11 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -31,9 +30,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 
@@ -47,32 +44,19 @@ public class SourceInputFormat<T> implements InputFormat<T, 
SourceInputSplit<T>>
   private final BoundedSource<T> initialSource;
 
   private transient PipelineOptions options;
-  private final byte[] serializedOptions;
+  private final SerializedPipelineOptions serializedOptions;
 
   private transient BoundedSource.BoundedReader<T> reader = null;
   private boolean inputAvailable = true;
 
   public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions 
options) {
     this.initialSource = initialSource;
-    this.options = options;
-
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      new ObjectMapper().writeValue(baos, options);
-      serializedOptions = baos.toByteArray();
-    } catch (Exception e) {
-      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-    }
-
+    this.serializedOptions = new SerializedPipelineOptions(options);
   }
 
   @Override
   public void configure(Configuration configuration) {
-    try {
-      options = new ObjectMapper().readValue(serializedOptions, 
PipelineOptions.class);
-    } catch (IOException e) {
-      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", 
e);
-    }
+    options = serializedOptions.getPipelineOptions();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 8e7493e..8d9744f 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.AbstractFlinkTimerInternals;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
@@ -29,7 +30,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -104,7 +104,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, 
VOUT>
 
   private static final long serialVersionUID = 1L;
 
-  private transient PipelineOptions options;
+  private SerializedPipelineOptions serializedOptions;
 
   private transient CoderRegistry coderRegistry;
 
@@ -236,7 +236,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, 
VOUT>
                                         Combine.KeyedCombineFn<K, VIN, VACC, 
VOUT> combiner) {
     Preconditions.checkNotNull(options);
 
-    this.options = Preconditions.checkNotNull(options);
+    this.serializedOptions = new 
SerializedPipelineOptions(Preconditions.checkNotNull(options));
     this.coderRegistry = Preconditions.checkNotNull(registry);
     this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, 
VIN>) input.getCoder();
     this.windowingStrategy = 
Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
@@ -477,52 +477,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, 
VOUT>
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      // TODO: PipelineOptions need to be available on the workers.
-      // Ideally they are captured as part of the pipeline.
-      // For now, construct empty options so that 
StateContexts.createFromComponents
-      // will yield a valid StateContext, which is needed to support the 
StateContext.window().
-      if (options == null) {
-        options = new PipelineOptions() {
-          @Override
-          public <T extends PipelineOptions> T as(Class<T> kls) {
-            return null;
-          }
-
-          @Override
-          public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
-            return null;
-          }
-
-          @Override
-          public Class<? extends PipelineRunner<?>> getRunner() {
-            return null;
-          }
-
-          @Override
-          public void setRunner(Class<? extends PipelineRunner<?>> kls) {
-
-          }
-
-          @Override
-          public CheckEnabled getStableUniqueNames() {
-            return null;
-          }
-
-          @Override
-          public void setStableUniqueNames(CheckEnabled enabled) {
-          }
-
-          @Override
-          public String getTempLocation() {
-            return null;
-          }
-
-          @Override
-          public void setTempLocation(String tempLocation) {
-          }
-        };
-      }
-      return options;
+      return serializedOptions.getPipelineOptions();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/43b5ec74/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 5be34e6..9d15a33 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -25,8 +26,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -34,10 +33,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.joda.time.Instant;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
 /**
  * A wrapper for Beam's unbounded sources. This class wraps around a source 
implementing the
  * {@link org.apache.beam.sdk.io.Read.Unbounded}  interface.
@@ -54,15 +49,14 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
 
   private volatile boolean isRunning = false;
 
-  /** Serialized using custom Java serialization via Jackson */
-  private transient PipelineOptions pipelineOptions;
+  private final SerializedPipelineOptions serializedOptions;
 
   /** Instantiated during runtime **/
   private transient UnboundedSource.UnboundedReader<T> reader;
 
   public UnboundedSourceWrapper(PipelineOptions pipelineOptions, 
Read.Unbounded<T> transform) {
     this.name = transform.getName();
-    this.pipelineOptions = pipelineOptions;
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
     this.source = transform.getSource();
   }
 
@@ -91,7 +85,7 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
 
     isRunning = true;
 
-    reader = source.createReader(pipelineOptions, null);
+    reader = source.createReader(serializedOptions.getPipelineOptions(), null);
 
     boolean inputAvailable = reader.start();
 
@@ -156,18 +150,4 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
     return System.currentTimeMillis() + watermarkInterval;
   }
 
-
-  // Special serialization of the PipelineOptions necessary to instantiate the 
reader.
-  private void writeObject(ObjectOutputStream out) throws IOException, 
ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, pipelineOptions);
-  }
-
-  // Special deserialization of the PipelineOptions necessary to instantiate 
the reader.
-  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
-  }
 }

Reply via email to