This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4836937  [BEAM-11146] Add fasterCopy option to Flink runner (#13240)
4836937 is described below

commit 4836937762a026bf0ce7a681e7af513d124bc26f
Author: Teodor Spæren <[email protected]>
AuthorDate: Tue Nov 3 14:06:55 2020 +0100

    [BEAM-11146] Add fasterCopy option to Flink runner (#13240)
    
    * [BEAM-11146] Add fasterCopy option to Flink runner
    
    The fasterCopy option removes an unnecessary deep copy in the Flink
    runner between operators. This should lead to improved performance
    in all cases.
    
    * Add defaults() function to FlinkPipelineOptions
---
 CHANGES.md                                         |   1 +
 .../translation/types/CoderTypeSerializer.java     |  31 ++--
 .../translation/types/CoderTypeSerializerTest.java |   6 +-
 .../FlinkBatchPortablePipelineTranslator.java      |  14 +-
 .../flink/FlinkBatchTransformTranslators.java      |  13 +-
 .../flink/FlinkBatchTranslationContext.java        |   2 +-
 .../beam/runners/flink/FlinkPipelineOptions.java   |  12 ++
 .../org/apache/beam/runners/flink/FlinkRunner.java |   5 +
 .../FlinkStreamingPortablePipelineTranslator.java  |  43 ++++--
 .../flink/FlinkStreamingTransformTranslators.java  |  53 +++++--
 .../flink/FlinkStreamingTranslationContext.java    |   2 +-
 .../apache/beam/runners/flink/TestFlinkRunner.java |   3 +-
 .../translation/types/CoderTypeInformation.java    |  11 +-
 .../wrappers/streaming/DoFnOperator.java           |  35 +++--
 .../streaming/ExecutableStageDoFnOperator.java     |  13 +-
 .../streaming/KvToByteBufferKeySelector.java       |   7 +-
 .../streaming/io/UnboundedSourceWrapper.java       |   4 +-
 .../streaming/stableinput/BufferingDoFnRunner.java |  12 +-
 .../state/FlinkBroadcastStateInternals.java        |  45 ++++--
 .../streaming/state/FlinkStateInternals.java       |  95 ++++++++-----
 .../flink/FlinkExecutionEnvironmentsTest.java      |  57 ++++----
 .../FlinkPipelineExecutionEnvironmentTest.java     |  21 ++-
 .../runners/flink/FlinkPipelineOptionsTest.java    |  10 +-
 .../flink/FlinkRequiresStableInputTest.java        |   3 +-
 .../apache/beam/runners/flink/FlinkRunnerTest.java |   3 +-
 .../beam/runners/flink/FlinkSavepointTest.java     |   3 +-
 .../beam/runners/flink/FlinkSubmissionTest.java    |   2 +-
 .../runners/flink/FlinkTransformOverridesTest.java |   3 +-
 .../FlinkBroadcastStateInternalsTest.java          |   7 +-
 .../flink/streaming/FlinkStateInternalsTest.java   |  18 ++-
 .../flink/streaming/GroupByWithNullValuesTest.java |   3 +-
 .../wrappers/streaming/DoFnOperatorTest.java       | 156 ++++++++++++++-------
 .../streaming/ExecutableStageDoFnOperatorTest.java |  67 ++++++---
 .../wrappers/streaming/WindowDoFnOperatorTest.java |   9 +-
 .../streaming/io/UnboundedSourceWrapperTest.java   |  10 +-
 .../stableinput/BufferingDoFnRunnerTest.java       |   5 +-
 .../shortcodes/flink_java_pipeline_options.html    |   5 +
 .../shortcodes/flink_python_pipeline_options.html  |   5 +
 38 files changed, 521 insertions(+), 273 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c288da2..e9b1085 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 * Added support for json payload format in Beam SQL Kafka Table 
([BEAM-10893](https://issues.apache.org/jira/browse/BEAM-10893))
 * Added support for protobuf payload format in Beam SQL Kafka Table 
([BEAM-10892](https://issues.apache.org/jira/browse/BEAM-10892))
 * Added support for avro payload format in Beam SQL Pubsub Table 
([BEAM-10857](https://issues.apache.org/jira/browse/BEAM-10857))
+* Added option to disable unnecessary copying between operators in Flink 
Runner (Java) ([BEAM-11146](https://issues.apache.org/jira/browse/BEAM-11146))
 * X feature added (Java/Python) 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
 ## Breaking Changes
diff --git 
a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
 
b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index 04f4bae..758ccc0 100644
--- 
a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ 
b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation.types;
 import java.io.EOFException;
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -49,20 +50,18 @@ public class CoderTypeSerializer<T> extends 
TypeSerializer<T> {
    * org.apache.beam.sdk.io.FileSystems} registration needed for {@link
    * org.apache.beam.sdk.transforms.Reshuffle} translation.
    */
-  @SuppressWarnings("unused")
-  private final @Nullable SerializablePipelineOptions pipelineOptions;
+  private final SerializablePipelineOptions pipelineOptions;
 
-  public CoderTypeSerializer(Coder<T> coder) {
-    Preconditions.checkNotNull(coder);
-    this.coder = coder;
-    this.pipelineOptions = null;
-  }
+  private final boolean fasterCopy;
 
-  public CoderTypeSerializer(
-      Coder<T> coder, @Nullable SerializablePipelineOptions pipelineOptions) {
+  public CoderTypeSerializer(Coder<T> coder, SerializablePipelineOptions 
pipelineOptions) {
     Preconditions.checkNotNull(coder);
+    Preconditions.checkNotNull(pipelineOptions);
     this.coder = coder;
     this.pipelineOptions = pipelineOptions;
+
+    FlinkPipelineOptions options = 
pipelineOptions.get().as(FlinkPipelineOptions.class);
+    this.fasterCopy = options.getFasterCopy();
   }
 
   @Override
@@ -72,7 +71,7 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> 
{
 
   @Override
   public CoderTypeSerializer<T> duplicate() {
-    return new CoderTypeSerializer<>(coder);
+    return new CoderTypeSerializer<>(coder, pipelineOptions);
   }
 
   @Override
@@ -82,10 +81,14 @@ public class CoderTypeSerializer<T> extends 
TypeSerializer<T> {
 
   @Override
   public T copy(T t) {
-    try {
-      return CoderUtils.clone(coder, t);
-    } catch (CoderException e) {
-      throw new RuntimeException("Could not clone.", e);
+    if (fasterCopy) {
+      return t;
+    } else {
+      try {
+        return CoderUtils.clone(coder, t);
+      } catch (CoderException e) {
+        throw new RuntimeException("Could not clone.", e);
+      }
     }
   }
 
diff --git 
a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
 
b/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
index e88f5cc..286fd01 100644
--- 
a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
+++ 
b/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
@@ -24,9 +24,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.junit.Test;
@@ -59,7 +61,9 @@ public class CoderTypeSerializerTest implements Serializable {
   }
 
   private void testWriteAndReadConfigSnapshot(Coder<String> coder) throws 
IOException {
-    CoderTypeSerializer<String> serializer = new CoderTypeSerializer<>(coder);
+    CoderTypeSerializer<String> serializer =
+        new CoderTypeSerializer<>(
+            coder, new 
SerializablePipelineOptions(PipelineOptionsFactory.create()));
 
     TypeSerializerSnapshot writtenSnapshot = 
serializer.snapshotConfiguration();
     ComparatorTestBase.TestOutputView outView = new 
ComparatorTestBase.TestOutputView();
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index d8989d4..fe1d762 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -312,7 +312,8 @@ public class FlinkBatchPortablePipelineTranslator
       unionCoders.add(coder);
     }
     UnionCoder unionCoder = UnionCoder.of(unionCoders);
-    TypeInformation<RawUnionValue> typeInformation = new 
CoderTypeInformation<>(unionCoder);
+    TypeInformation<RawUnionValue> typeInformation =
+        new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());
 
     RunnerApi.ExecutableStagePayload stagePayload;
     try {
@@ -423,7 +424,8 @@ public class FlinkBatchPortablePipelineTranslator
               .returns(
                   new CoderTypeInformation<>(
                       WindowedValue.getFullCoder(
-                          (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE)));
+                          (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+                      context.getPipelineOptions()));
     } else {
       for (String pCollectionId : allInputs.values()) {
         DataSet<WindowedValue<T>> current = 
context.getDataSetOrThrow(pCollectionId);
@@ -497,7 +499,7 @@ public class FlinkBatchPortablePipelineTranslator
             windowingStrategy.getWindowFn().windowCoder());
 
     TypeInformation<WindowedValue<KV<K, List<V>>>> partialReduceTypeInfo =
-        new CoderTypeInformation<>(outputCoder);
+        new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());
 
     Grouping<WindowedValue<KV<K, V>>> inputGrouping =
         inputDataSet.groupBy(new 
KvKeySelector<>(inputElementCoder.getKeyCoder()));
@@ -538,7 +540,8 @@ public class FlinkBatchPortablePipelineTranslator
       PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
     TypeInformation<WindowedValue<byte[]>> typeInformation =
         new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+            context.getPipelineOptions());
     DataSource<WindowedValue<byte[]>> dataSource =
         new DataSource<>(
                 context.getExecutionEnvironment(),
@@ -613,7 +616,8 @@ public class FlinkBatchPortablePipelineTranslator
       Coder<WindowedValue<?>> outputCoder,
       String transformName,
       String collectionId) {
-    TypeInformation<WindowedValue<?>> outputType = new 
CoderTypeInformation<>(outputCoder);
+    TypeInformation<WindowedValue<?>> outputType =
+        new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());
     FlinkExecutableStagePruningFunction pruningFunction =
         new FlinkExecutableStagePruningFunction(unionTag, 
context.getPipelineOptions());
     FlatMapOperator<RawUnionValue, WindowedValue<?>> pruningOperator =
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index aac9c94..b0464be 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -300,7 +300,8 @@ class FlinkBatchTransformTranslators {
               WindowedValue.getFullCoder(
                   KvCoder.of(
                       inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getValueCoder())),
-                  windowingStrategy.getWindowFn().windowCoder()));
+                  windowingStrategy.getWindowFn().windowCoder()),
+              context.getPipelineOptions());
       final DataSet<WindowedValue<KV<K, Iterable<InputT>>>> outputDataSet =
           new GroupReduceOperator<>(
                   inputGrouping,
@@ -349,7 +350,8 @@ class FlinkBatchTransformTranslators {
           new CoderTypeInformation<>(
               WindowedValue.getFullCoder(
                   KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-                  windowingStrategy.getWindowFn().windowCoder()));
+                  windowingStrategy.getWindowFn().windowCoder()),
+              context.getPipelineOptions());
 
       Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
           inputDataSet.groupBy(new KvKeySelector<>(inputCoder.getKeyCoder()));
@@ -693,8 +695,8 @@ class FlinkBatchTransformTranslators {
 
       TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
           new CoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  unionCoder, windowingStrategy.getWindowFn().windowCoder()));
+              WindowedValue.getFullCoder(unionCoder, 
windowingStrategy.getWindowFn().windowCoder()),
+              context.getPipelineOptions());
 
       List<PCollectionView<?>> sideInputs;
       try {
@@ -842,7 +844,8 @@ class FlinkBatchTransformTranslators {
                 .returns(
                     new CoderTypeInformation<>(
                         WindowedValue.getFullCoder(
-                            (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE)));
+                            (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+                        context.getPipelineOptions()));
       } else {
         for (PValue taggedPc : allInputs.values()) {
           checkArgument(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 75ee568..68114bf 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -143,7 +143,7 @@ class FlinkBatchTranslationContext {
     WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
         WindowedValue.getFullCoder(coder, 
windowingStrategy.getWindowFn().windowCoder());
 
-    return new CoderTypeInformation<>(windowedValueCoder);
+    return new CoderTypeInformation<>(windowedValueCoder, options);
   }
 
   Map<TupleTag<?>, PCollection<?>> getInputs(PTransform<?, ?> transform) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 6c42e2f..9afaa0b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 
 /**
@@ -274,4 +275,15 @@ public interface FlinkPipelineOptions
   Boolean getReIterableGroupByKeyResult();
 
   void setReIterableGroupByKeyResult(Boolean reIterableGroupByKeyResult);
+
+  @Description(
+      "Remove unneeded deep copy between operators. See 
https://issues.apache.org/jira/browse/BEAM-11146")
+  @Default.Boolean(false)
+  Boolean getFasterCopy();
+
+  void setFasterCopy(Boolean fasterCopy);
+
+  static FlinkPipelineOptions defaults() {
+    return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+  }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 3635954..e97930b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -79,6 +79,11 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
 
     LOG.info("Executing pipeline using FlinkRunner.");
 
+    if (!options.getFasterCopy()) {
+      LOG.warn(
+          "For maximum performance you should set the 'fasterCopy' option. See 
more at https://issues.apache.org/jira/browse/BEAM-11146");
+    }
+
     FlinkPipelineExecutionEnvironment env = new 
FlinkPipelineExecutionEnvironment(options);
 
     LOG.info("Translating pipeline to Flink program.");
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 6eabacb..9ebba24 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -299,7 +299,8 @@ public class FlinkStreamingPortablePipelineTranslator
               .returns(
                   new CoderTypeInformation<>(
                       WindowedValue.getFullCoder(
-                          (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE)));
+                          (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+                      context.getPipelineOptions()));
       
context.addDataStream(Iterables.getOnlyElement(transform.getOutputsMap().values()),
 result);
     } else {
       DataStream<T> result = null;
@@ -402,7 +403,7 @@ public class FlinkStreamingPortablePipelineTranslator
         WindowedValue.getFullCoder(workItemCoder, 
windowingStrategy.getWindowFn().windowCoder());
 
     CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, V>>> 
workItemTypeInfo =
-        new CoderTypeInformation<>(windowedWorkItemCoder);
+        new CoderTypeInformation<>(windowedWorkItemCoder, 
context.getPipelineOptions());
 
     DataStream<WindowedValue<SingletonKeyedWorkItem<K, V>>> workItemStream =
         inputDataStream
@@ -429,7 +430,7 @@ public class FlinkStreamingPortablePipelineTranslator
             windowingStrategy.getWindowFn().windowCoder());
 
     TypeInformation<WindowedValue<KV<K, Iterable<V>>>> outputTypeInfo =
-        new CoderTypeInformation<>(outputCoder);
+        new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());
 
     TupleTag<KV<K, Iterable<V>>> mainTag = new TupleTag<>("main output");
 
@@ -440,7 +441,10 @@ public class FlinkStreamingPortablePipelineTranslator
             (Coder) windowedWorkItemCoder,
             mainTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory(mainTag, 
outputCoder),
+            new DoFnOperator.MultiOutputOutputManagerFactory(
+                mainTag,
+                outputCoder,
+                new SerializablePipelineOptions(context.getPipelineOptions())),
             windowingStrategy,
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
@@ -497,7 +501,8 @@ public class FlinkStreamingPortablePipelineTranslator
     Coder<WindowedValue<T>> windowCoder =
         instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo = new 
CoderTypeInformation<>(windowCoder);
+    TypeInformation<WindowedValue<T>> outputTypeInfo =
+        new CoderTypeInformation<>(windowCoder, pipelineOptions);
 
     WindowingStrategy windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
@@ -506,7 +511,8 @@ public class FlinkStreamingPortablePipelineTranslator
             WindowedValue.getFullCoder(
                 ValueWithRecordId.ValueWithRecordIdCoder.of(
                     ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()));
+                windowStrategy.getWindowFn().windowCoder()),
+            pipelineOptions);
 
     UnboundedSource unboundedSource = 
ReadTranslation.unboundedSourceFromProto(payload);
 
@@ -547,7 +553,8 @@ public class FlinkStreamingPortablePipelineTranslator
 
     TypeInformation<WindowedValue<byte[]>> typeInfo =
         new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+            context.getPipelineOptions());
 
     long shutdownAfterIdleSourcesMs = 
context.getPipelineOptions().getShutdownSourcesAfterIdleMs();
     SingleOutputStreamOperator<WindowedValue<byte[]>> source =
@@ -575,7 +582,8 @@ public class FlinkStreamingPortablePipelineTranslator
 
     TypeInformation<WindowedValue<byte[]>> typeInfo =
         new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+            context.getPipelineOptions());
 
     ObjectMapper objectMapper = new ObjectMapper();
     final int intervalMillis;
@@ -641,7 +649,7 @@ public class FlinkStreamingPortablePipelineTranslator
       outputCoders.put(localOutputName, windowCoder);
       TupleTag<?> tupleTag = new TupleTag<>(localOutputName);
       CoderTypeInformation<WindowedValue<?>> typeInformation =
-          new CoderTypeInformation(windowCoder);
+          new CoderTypeInformation(windowCoder, context.getPipelineOptions());
       tagsToOutputTags.put(tupleTag, new OutputTag<>(localOutputName, 
typeInformation));
       tagsToCoders.put(tupleTag, windowCoder);
       tagsToIds.put(tupleTag, outputIndexMap.get(localOutputName));
@@ -654,7 +662,8 @@ public class FlinkStreamingPortablePipelineTranslator
 
     CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
         (!outputs.isEmpty())
-            ? new CoderTypeInformation(outputCoders.get(mainOutputTag.getId()))
+            ? new CoderTypeInformation(
+                outputCoders.get(mainOutputTag.getId()), 
context.getPipelineOptions())
             : null;
 
     ArrayList<TupleTag<?>> additionalOutputTags = Lists.newArrayList();
@@ -684,13 +693,19 @@ public class FlinkStreamingPortablePipelineTranslator
                 valueCoder.getClass().getSimpleName()));
       }
       keyCoder = ((KvCoder) valueCoder).getKeyCoder();
-      keySelector = new KvToByteBufferKeySelector(keyCoder);
+      keySelector =
+          new KvToByteBufferKeySelector(
+              keyCoder, new 
SerializablePipelineOptions(context.getPipelineOptions()));
       inputDataStream = inputDataStream.keyBy(keySelector);
     }
 
     DoFnOperator.MultiOutputOutputManagerFactory<OutputT> outputManagerFactory 
=
         new DoFnOperator.MultiOutputOutputManagerFactory<>(
-            mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds);
+            mainOutputTag,
+            tagsToOutputTags,
+            tagsToCoders,
+            tagsToIds,
+            new SerializablePipelineOptions(context.getPipelineOptions()));
 
     DoFnOperator<InputT, OutputT> doFnOperator =
         new ExecutableStageDoFnOperator<>(
@@ -794,7 +809,7 @@ public class FlinkStreamingPortablePipelineTranslator
             .addSource(
                 new TestStreamSource<>(
                     testStreamDecoder, 
transform.getSpec().getPayload().toByteArray()),
-                new CoderTypeInformation<>(coder));
+                new CoderTypeInformation<>(coder, 
context.getPipelineOptions()));
 
     context.addDataStream(outputPCollectionId, source);
   }
@@ -903,7 +918,7 @@ public class FlinkStreamingPortablePipelineTranslator
     UnionCoder unionCoder = UnionCoder.of(viewCoders);
 
     CoderTypeInformation<RawUnionValue> unionTypeInformation =
-        new CoderTypeInformation<>(unionCoder);
+        new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());
 
     // transform each side input to RawUnionValue and union them
     DataStream<RawUnionValue> sideInputUnion = null;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 68af4fb..dbf5114 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -198,7 +198,8 @@ class FlinkStreamingTransformTranslators {
           new CoderTypeInformation<>(
               WindowedValue.getFullCoder(
                   ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
-                  output.getWindowingStrategy().getWindowFn().windowCoder()));
+                  output.getWindowingStrategy().getWindowFn().windowCoder()),
+              context.getPipelineOptions());
 
       UnboundedSource<T, ?> rawSource;
       try {
@@ -296,7 +297,8 @@ class FlinkStreamingTransformTranslators {
 
       TypeInformation<WindowedValue<byte[]>> typeInfo =
           new CoderTypeInformation<>(
-              WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE));
+              WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+              context.getPipelineOptions());
 
       long shutdownAfterIdleSourcesMs =
           context
@@ -434,7 +436,7 @@ class FlinkStreamingTransformTranslators {
     UnionCoder unionCoder = UnionCoder.of(inputCoders);
 
     CoderTypeInformation<RawUnionValue> unionTypeInformation =
-        new CoderTypeInformation<>(unionCoder);
+        new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());
 
     // transform each side input to RawUnionValue and union them
     DataStream<RawUnionValue> sideInputUnion = null;
@@ -544,7 +546,9 @@ class FlinkStreamingTransformTranslators {
         // Based on the fact that the signature is stateful, DoFnSignatures 
ensures
         // that it is also keyed
         keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
-        keySelector = new KvToByteBufferKeySelector(keyCoder);
+        keySelector =
+            new KvToByteBufferKeySelector(
+                keyCoder, new 
SerializablePipelineOptions(context.getPipelineOptions()));
         inputDataStream = inputDataStream.keyBy(keySelector);
         stateful = true;
       } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
@@ -556,7 +560,8 @@ class FlinkStreamingTransformTranslators {
 
       CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
           new CoderTypeInformation<>(
-              context.getWindowedInputCoder((PCollection<OutputT>) 
outputs.get(mainOutputTag)));
+              context.getWindowedInputCoder((PCollection<OutputT>) 
outputs.get(mainOutputTag)),
+              context.getPipelineOptions());
 
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT> doFnOperator =
@@ -732,7 +737,11 @@ class FlinkStreamingTransformTranslators {
                   mainOutputTag1,
                   additionalOutputTags1,
                   new DoFnOperator.MultiOutputOutputManagerFactory<>(
-                      mainOutputTag1, tagsToOutputTags, tagsToCoders, 
tagsToIds),
+                      mainOutputTag1,
+                      tagsToOutputTags,
+                      tagsToCoders,
+                      tagsToIds,
+                      new 
SerializablePipelineOptions(context.getPipelineOptions())),
                   windowingStrategy,
                   transformedSideInputs,
                   sideInputs1,
@@ -793,7 +802,11 @@ class FlinkStreamingTransformTranslators {
                   mainOutputTag,
                   additionalOutputTags,
                   new DoFnOperator.MultiOutputOutputManagerFactory<>(
-                      mainOutputTag, tagsToOutputTags, tagsToCoders, 
tagsToIds),
+                      mainOutputTag,
+                      tagsToOutputTags,
+                      tagsToCoders,
+                      tagsToIds,
+                      new 
SerializablePipelineOptions(context.getPipelineOptions())),
                   windowingStrategy,
                   transformedSideInputs,
                   sideInputs,
@@ -904,7 +917,7 @@ class FlinkStreamingTransformTranslators {
                   workItemCoder, 
input.getWindowingStrategy().getWindowFn().windowCoder());
 
       CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemTypeInfo =
-          new CoderTypeInformation<>(windowedWorkItemCoder);
+          new CoderTypeInformation<>(windowedWorkItemCoder, 
context.getPipelineOptions());
 
       DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemStream =
           inputDataStream
@@ -936,7 +949,10 @@ class FlinkStreamingTransformTranslators {
               (Coder) windowedWorkItemCoder,
               mainTag,
               Collections.emptyList(),
-              new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
+              new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                  mainTag,
+                  outputCoder,
+                  new 
SerializablePipelineOptions(context.getPipelineOptions())),
               windowingStrategy,
               new HashMap<>(), /* side-input mapping */
               Collections.emptyList(), /* side inputs */
@@ -1004,7 +1020,7 @@ class FlinkStreamingTransformTranslators {
                   workItemCoder, 
input.getWindowingStrategy().getWindowFn().windowCoder());
 
       CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemTypeInfo =
-          new CoderTypeInformation<>(windowedWorkItemCoder);
+          new CoderTypeInformation<>(windowedWorkItemCoder, 
context.getPipelineOptions());
 
       DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemStream =
           inputDataStream
@@ -1039,7 +1055,10 @@ class FlinkStreamingTransformTranslators {
                 (Coder) windowedWorkItemCoder,
                 mainTag,
                 Collections.emptyList(),
-                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                    mainTag,
+                    outputCoder,
+                    new 
SerializablePipelineOptions(context.getPipelineOptions())),
                 windowingStrategy,
                 new HashMap<>(), /* side-input mapping */
                 Collections.emptyList(), /* side inputs */
@@ -1067,7 +1086,10 @@ class FlinkStreamingTransformTranslators {
                 (Coder) windowedWorkItemCoder,
                 mainTag,
                 Collections.emptyList(),
-                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                    mainTag,
+                    outputCoder,
+                    new 
SerializablePipelineOptions(context.getPipelineOptions())),
                 windowingStrategy,
                 transformSideInputs.f0,
                 sideInputs,
@@ -1137,7 +1159,7 @@ class FlinkStreamingTransformTranslators {
           windowedWorkItemCoder = 
WindowedValue.getValueOnlyCoder(workItemCoder);
 
       CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemTypeInfo =
-          new CoderTypeInformation<>(windowedWorkItemCoder);
+          new CoderTypeInformation<>(windowedWorkItemCoder, 
context.getPipelineOptions());
 
       DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = 
context.getInputDataStream(input);
 
@@ -1220,7 +1242,8 @@ class FlinkStreamingTransformTranslators {
                 .returns(
                     new CoderTypeInformation<>(
                         WindowedValue.getFullCoder(
-                            (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE)));
+                            (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+                        context.getPipelineOptions()));
         context.setOutputDataStream(context.getOutput(transform), result);
 
       } else {
@@ -1386,7 +1409,7 @@ class FlinkStreamingTransformTranslators {
               .getExecutionEnvironment()
               .addSource(
                   new TestStreamSource<>(testStreamDecoder, payload),
-                  new CoderTypeInformation<>(elementCoder));
+                  new CoderTypeInformation<>(elementCoder, 
context.getPipelineOptions()));
 
       context.setOutputDataStream(context.getOutput(testStream), source);
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 75c2a20..ca98b6c 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -115,7 +115,7 @@ class FlinkStreamingTranslationContext {
         WindowedValue.getFullCoder(
             valueCoder, 
collection.getWindowingStrategy().getWindowFn().windowCoder());
 
-    return new CoderTypeInformation<>(windowedValueCoder);
+    return new CoderTypeInformation<>(windowedValueCoder, options);
   }
 
   public AppliedPTransform<?, ?, ?> getCurrentTransform() {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index d9a99a6..1808470 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -22,7 +22,6 @@ import 
org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.UserCodeException;
 
 /** Test Flink runner. */
@@ -46,7 +45,7 @@ public class TestFlinkRunner extends 
PipelineRunner<PipelineResult> {
   }
 
   public static TestFlinkRunner create(boolean streaming) {
-    FlinkPipelineOptions flinkOptions = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions flinkOptions = FlinkPipelineOptions.defaults();
     flinkOptions.setStreaming(streaming);
     return TestFlinkRunner.fromOptions(flinkOptions);
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 10101cb..99f4617 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -36,16 +36,11 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public class CoderTypeInformation<T> extends TypeInformation<T> implements 
AtomicType<T> {
 
   private final Coder<T> coder;
-  private final @Nullable SerializablePipelineOptions pipelineOptions;
+  private final SerializablePipelineOptions pipelineOptions;
 
-  public CoderTypeInformation(Coder<T> coder) {
-    checkNotNull(coder);
-    this.coder = coder;
-    this.pipelineOptions = null;
-  }
-
-  private CoderTypeInformation(Coder<T> coder, PipelineOptions 
pipelineOptions) {
+  public CoderTypeInformation(Coder<T> coder, PipelineOptions pipelineOptions) 
{
     checkNotNull(coder);
+    checkNotNull(pipelineOptions);
     this.coder = coder;
     this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 448dacb..0ccb969 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -387,7 +387,8 @@ public class DoFnOperator<InputT, OutputT>
 
     ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
         new ListStateDescriptor<>(
-            "pushed-back-elements", new 
CoderTypeSerializer<>(windowedInputCoder));
+            "pushed-back-elements",
+            new CoderTypeSerializer<>(windowedInputCoder, serializedOptions));
 
     if (keySelector != null) {
       pushedBackElementsHandler =
@@ -409,7 +410,9 @@ public class DoFnOperator<InputT, OutputT>
 
       FlinkBroadcastStateInternals sideInputStateInternals =
           new FlinkBroadcastStateInternals<>(
-              getContainingTask().getIndexInSubtaskGroup(), 
getOperatorStateBackend());
+              getContainingTask().getIndexInSubtaskGroup(),
+              getOperatorStateBackend(),
+              serializedOptions);
 
       sideInputHandler = new SideInputHandler(sideInputs, 
sideInputStateInternals);
       sideInputReader = sideInputHandler;
@@ -425,11 +428,13 @@ public class DoFnOperator<InputT, OutputT>
     // StatefulPardo or WindowDoFn
     if (keyCoder != null) {
       keyedStateInternals =
-          new FlinkStateInternals<>((KeyedStateBackend) 
getKeyedStateBackend(), keyCoder);
+          new FlinkStateInternals<>(
+              (KeyedStateBackend) getKeyedStateBackend(), keyCoder, 
serializedOptions);
 
       if (timerService == null) {
         timerService =
-            getInternalTimerService("beam-timer", new 
CoderTypeSerializer<>(timerCoder), this);
+            getInternalTimerService(
+                "beam-timer", new CoderTypeSerializer<>(timerCoder, 
serializedOptions), this);
       }
 
       timerInternals = new FlinkTimerInternals();
@@ -486,7 +491,8 @@ public class DoFnOperator<InputT, OutputT>
                   windowingStrategy.getWindowFn().windowCoder(),
                   getOperatorStateBackend(),
                   getKeyedStateBackend(),
-                  options.getNumConcurrentCheckpoints());
+                  options.getNumConcurrentCheckpoints(),
+                  serializedOptions);
     }
     doFnRunner = createWrappingDoFnRunner(doFnRunner, stepContext);
     earlyBindStateIfNeeded();
@@ -534,7 +540,7 @@ public class DoFnOperator<InputT, OutputT>
       if (doFn != null) {
         DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
         FlinkStateInternals.EarlyBinder earlyBinder =
-            new FlinkStateInternals.EarlyBinder(getKeyedStateBackend());
+            new FlinkStateInternals.EarlyBinder(getKeyedStateBackend(), 
serializedOptions);
         for (DoFnSignature.StateDeclaration value : 
signature.stateDeclarations().values()) {
           StateSpec<?> spec =
               (StateSpec<?>) 
signature.stateDeclarations().get(value.id()).field().get(doFn);
@@ -1194,29 +1200,35 @@ public class DoFnOperator<InputT, OutputT>
     private Map<TupleTag<?>, Integer> tagsToIds;
     private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
     private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
+    private final SerializablePipelineOptions pipelineOptions;
 
     // There is no side output.
     @SuppressWarnings("unchecked")
     public MultiOutputOutputManagerFactory(
-        TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) {
+        TupleTag<OutputT> mainTag,
+        Coder<WindowedValue<OutputT>> mainCoder,
+        SerializablePipelineOptions pipelineOptions) {
       this(
           mainTag,
           new HashMap<>(),
           ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
               .put(mainTag, (Coder) mainCoder)
               .build(),
-          ImmutableMap.<TupleTag<?>, Integer>builder().put(mainTag, 
0).build());
+          ImmutableMap.<TupleTag<?>, Integer>builder().put(mainTag, 0).build(),
+          pipelineOptions);
     }
 
     public MultiOutputOutputManagerFactory(
         TupleTag<OutputT> mainTag,
         Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
         Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
-        Map<TupleTag<?>, Integer> tagsToIds) {
+        Map<TupleTag<?>, Integer> tagsToIds,
+        SerializablePipelineOptions pipelineOptions) {
       this.mainTag = mainTag;
       this.tagsToOutputTags = tagsToOutputTags;
       this.tagsToCoders = tagsToCoders;
       this.tagsToIds = tagsToIds;
+      this.pipelineOptions = pipelineOptions;
     }
 
     @Override
@@ -1231,7 +1243,8 @@ public class DoFnOperator<InputT, OutputT>
 
       TaggedKvCoder taggedKvCoder = buildTaggedKvCoder();
       ListStateDescriptor<KV<Integer, WindowedValue<?>>> 
taggedOutputPushbackStateDescriptor =
-          new ListStateDescriptor<>("bundle-buffer-tag", new 
CoderTypeSerializer<>(taggedKvCoder));
+          new ListStateDescriptor<>(
+              "bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder, 
pipelineOptions));
       ListState<KV<Integer, WindowedValue<?>>> listStateBuffer =
           
operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
       PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> 
pushedBackElementsHandler =
@@ -1292,7 +1305,7 @@ public class DoFnOperator<InputT, OutputT>
           new MapStateDescriptor<>(
               PENDING_TIMERS_STATE_NAME,
               new StringSerializer(),
-              new CoderTypeSerializer<>(timerCoder));
+              new CoderTypeSerializer<>(timerCoder, serializedOptions));
       this.pendingTimersById = 
getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
       populateOutputTimestampQueue();
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 08d7a20..d655cb9 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -54,6 +54,7 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.UserStateReference;
@@ -133,6 +134,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
   /** A lock which has to be acquired when concurrently accessing state and 
timers. */
   private final ReentrantLock stateBackendLock;
 
+  private final SerializablePipelineOptions pipelineOptions;
+
   private final boolean isStateful;
 
   private transient ExecutableStageContext stageContext;
@@ -197,6 +200,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     this.outputMap = outputMap;
     this.sideInputIds = sideInputIds;
     this.stateBackendLock = new ReentrantLock();
+    this.pipelineOptions = new SerializablePipelineOptions(options);
   }
 
   @Override
@@ -207,7 +211,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
   @Override
   public void open() throws Exception {
     executableStage = ExecutableStage.fromPayload(payload);
-    initializeUserState(executableStage, getKeyedStateBackend());
+    initializeUserState(executableStage, getKeyedStateBackend(), 
pipelineOptions);
     // TODO: Wire this into the distributed cache and make it pluggable.
     // TODO: Do we really want this layer of indirection when accessing the 
stage bundle factory?
     // It's a little strange because this operator is responsible for the 
lifetime of the stage
@@ -1022,7 +1026,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
    * Eagerly create the user state to work around 
https://jira.apache.org/jira/browse/FLINK-12653.
    */
   private static void initializeUserState(
-      ExecutableStage executableStage, @Nullable KeyedStateBackend 
keyedStateBackend) {
+      ExecutableStage executableStage,
+      @Nullable KeyedStateBackend keyedStateBackend,
+      SerializablePipelineOptions pipelineOptions) {
     executableStage
         .getUserStates()
         .forEach(
@@ -1031,7 +1037,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
                 keyedStateBackend.getOrCreateKeyedState(
                     StringSerializer.INSTANCE,
                     new ListStateDescriptor<>(
-                        ref.localName(), new 
CoderTypeSerializer<>(ByteStringCoder.of())));
+                        ref.localName(),
+                        new CoderTypeSerializer<>(ByteStringCoder.of(), 
pipelineOptions)));
               } catch (Exception e) {
                 throw new RuntimeException("Couldn't initialize user states.", 
e);
               }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
index 3e5e22c..68c891d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -35,9 +36,11 @@ public class KvToByteBufferKeySelector<K, V>
     implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>, 
ResultTypeQueryable<ByteBuffer> {
 
   private final Coder<K> keyCoder;
+  private final SerializablePipelineOptions pipelineOptions;
 
-  public KvToByteBufferKeySelector(Coder<K> keyCoder) {
+  public KvToByteBufferKeySelector(Coder<K> keyCoder, 
SerializablePipelineOptions pipelineOptions) {
     this.keyCoder = keyCoder;
+    this.pipelineOptions = pipelineOptions;
   }
 
   @Override
@@ -48,6 +51,6 @@ public class KvToByteBufferKeySelector<K, V>
 
   @Override
   public TypeInformation<ByteBuffer> getProducedType() {
-    return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of());
+    return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), 
pipelineOptions.get());
   }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 06729cd..9fae533 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -430,7 +430,9 @@ public class UnboundedSourceWrapper<OutputT, 
CheckpointMarkT extends UnboundedSo
     OperatorStateStore stateStore = context.getOperatorStateStore();
     @SuppressWarnings("unchecked")
     CoderTypeInformation<KV<? extends UnboundedSource<OutputT, 
CheckpointMarkT>, CheckpointMarkT>>
-        typeInformation = (CoderTypeInformation) new 
CoderTypeInformation<>(checkpointCoder);
+        typeInformation =
+            (CoderTypeInformation)
+                new CoderTypeInformation<>(checkpointCoder, 
serializedOptions.get());
     stateForCheckpoint =
         stateStore.getListState(
             new ListStateDescriptor<>(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 31709cd..b6a1ef2 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -55,7 +56,8 @@ public class BufferingDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT,
       org.apache.beam.sdk.coders.Coder windowCoder,
       OperatorStateBackend operatorStateBackend,
       @Nullable KeyedStateBackend<Object> keyedStateBackend,
-      int maxConcurrentCheckpoints)
+      int maxConcurrentCheckpoints,
+      SerializablePipelineOptions pipelineOptions)
       throws Exception {
     return new BufferingDoFnRunner<>(
         doFnRunner,
@@ -64,7 +66,8 @@ public class BufferingDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT,
         windowCoder,
         operatorStateBackend,
         keyedStateBackend,
-        maxConcurrentCheckpoints);
+        maxConcurrentCheckpoints,
+        pipelineOptions);
   }
 
   /** The underlying DoFnRunner that any buffered data will be handed over to 
eventually. */
@@ -87,7 +90,8 @@ public class BufferingDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT,
       org.apache.beam.sdk.coders.Coder windowCoder,
       OperatorStateBackend operatorStateBackend,
       @Nullable KeyedStateBackend keyedStateBackend,
-      int maxConcurrentCheckpoints)
+      int maxConcurrentCheckpoints,
+      SerializablePipelineOptions pipelineOptions)
       throws Exception {
     Preconditions.checkArgument(
         maxConcurrentCheckpoints > 0 && maxConcurrentCheckpoints < 
Short.MAX_VALUE,
@@ -104,7 +108,7 @@ public class BufferingDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT,
               new ListStateDescriptor<>(
                   stateName + stateId,
                   new CoderTypeSerializer<>(
-                      new BufferedElements.Coder(inputCoder, windowCoder, 
null)));
+                      new BufferedElements.Coder(inputCoder, windowCoder, 
null), pipelineOptions));
           if (keyedStateBackend != null) {
             return KeyedBufferingElementsHandler.create(keyedStateBackend, 
stateDescriptor);
           } else {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 794a0e4..2c46c9e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -27,11 +27,13 @@ import java.util.Map;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
@@ -68,9 +70,15 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
   // stateName -> <namespace, state>
   private Map<String, Map<String, ?>> stateForNonZeroOperator;
 
-  public FlinkBroadcastStateInternals(int indexInSubtaskGroup, 
OperatorStateBackend stateBackend) {
+  private final SerializablePipelineOptions pipelineOptions;
+
+  public FlinkBroadcastStateInternals(
+      int indexInSubtaskGroup,
+      OperatorStateBackend stateBackend,
+      SerializablePipelineOptions pipelineOptions) {
     this.stateBackend = stateBackend;
     this.indexInSubtaskGroup = indexInSubtaskGroup;
+    this.pipelineOptions = pipelineOptions;
     if (indexInSubtaskGroup != 0) {
       stateForNonZeroOperator = new HashMap<>();
     }
@@ -91,13 +99,15 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
           @Override
           public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> 
address, Coder<T2> coder) {
 
-            return new FlinkBroadcastValueState<>(stateBackend, address, 
namespace, coder);
+            return new FlinkBroadcastValueState<>(
+                stateBackend, address, namespace, coder, 
pipelineOptions.get());
           }
 
           @Override
           public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, 
Coder<T2> elemCoder) {
 
-            return new FlinkBroadcastBagState<>(stateBackend, address, 
namespace, elemCoder);
+            return new FlinkBroadcastBagState<>(
+                stateBackend, address, namespace, elemCoder, 
pipelineOptions.get());
           }
 
           @Override
@@ -130,7 +140,7 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
             return new FlinkCombiningState<>(
-                stateBackend, address, combineFn, namespace, accumCoder);
+                stateBackend, address, combineFn, namespace, accumCoder, 
pipelineOptions.get());
           }
 
           @Override
@@ -175,14 +185,15 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
         OperatorStateBackend flinkStateBackend,
         String name,
         StateNamespace namespace,
-        Coder<T> coder) {
+        Coder<T> coder,
+        PipelineOptions pipelineOptions) {
       this.name = name;
 
       this.namespace = namespace;
       this.flinkStateBackend = flinkStateBackend;
 
       CoderTypeInformation<Map<String, T>> typeInfo =
-          new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));
+          new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder), 
pipelineOptions);
 
       flinkStateDescriptor =
           new ListStateDescriptor<>(name, typeInfo.createSerializer(new 
ExecutionConfig()));
@@ -290,8 +301,9 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
         OperatorStateBackend flinkStateBackend,
         StateTag<ValueState<T>> address,
         StateNamespace namespace,
-        Coder<T> coder) {
-      super(flinkStateBackend, address.getId(), namespace, coder);
+        Coder<T> coder,
+        PipelineOptions pipelineOptions) {
+      super(flinkStateBackend, address.getId(), namespace, coder, 
pipelineOptions);
 
       this.namespace = namespace;
       this.address = address;
@@ -349,8 +361,9 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
         OperatorStateBackend flinkStateBackend,
         StateTag<BagState<T>> address,
         StateNamespace namespace,
-        Coder<T> coder) {
-      super(flinkStateBackend, address.getId(), namespace, 
ListCoder.of(coder));
+        Coder<T> coder,
+        PipelineOptions pipelineOptions) {
+      super(flinkStateBackend, address.getId(), namespace, 
ListCoder.of(coder), pipelineOptions);
 
       this.namespace = namespace;
       this.address = address;
@@ -436,8 +449,9 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
         StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
-        Coder<AccumT> accumCoder) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder);
+        Coder<AccumT> accumCoder,
+        PipelineOptions pipelineOptions) {
+      super(flinkStateBackend, address.getId(), namespace, accumCoder, 
pipelineOptions);
 
       this.namespace = namespace;
       this.address = address;
@@ -552,8 +566,9 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
-        FlinkBroadcastStateInternals<K2> flinkStateInternals) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder);
+        FlinkBroadcastStateInternals<K2> flinkStateInternals,
+        PipelineOptions pipelineOptions) {
+      super(flinkStateBackend, address.getId(), namespace, accumCoder, 
pipelineOptions);
 
       this.namespace = namespace;
       this.address = address;
@@ -687,7 +702,7 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
         Coder<AccumT> accumCoder,
         FlinkBroadcastStateInternals<K2> flinkStateInternals,
         CombineWithContext.Context context) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder);
+      super(flinkStateBackend, address.getId(), namespace, accumCoder, 
pipelineOptions.get());
 
       this.namespace = namespace;
       this.address = address;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index e2d85af..55f3628 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
 import org.apache.beam.sdk.coders.Coder;
@@ -96,16 +97,24 @@ public class FlinkStateInternals<K> implements 
StateInternals {
   // Watermark holds for all keys/windows of this partition, allows efficient 
lookup of the minimum
   private final TreeMultiset<Long> watermarkHolds = TreeMultiset.create();
   // State to persist combined watermark holds for all keys of this partition
-  private final MapStateDescriptor<String, Instant> 
watermarkHoldStateDescriptor =
-      new MapStateDescriptor<>(
-          "watermark-holds",
-          StringSerializer.INSTANCE,
-          new CoderTypeSerializer<>(InstantCoder.of()));
+  private final MapStateDescriptor<String, Instant> 
watermarkHoldStateDescriptor;
 
-  public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, 
Coder<K> keyCoder)
+  private final SerializablePipelineOptions pipelineOptions;
+
+  public FlinkStateInternals(
+      KeyedStateBackend<ByteBuffer> flinkStateBackend,
+      Coder<K> keyCoder,
+      SerializablePipelineOptions pipelineOptions)
       throws Exception {
     this.flinkStateBackend = flinkStateBackend;
     this.keyCoder = keyCoder;
+    watermarkHoldStateDescriptor =
+        new MapStateDescriptor<>(
+            "watermark-holds",
+            StringSerializer.INSTANCE,
+            new CoderTypeSerializer<>(InstantCoder.of(), pipelineOptions));
+    this.pipelineOptions = pipelineOptions;
+
     restoreWatermarkHoldsView();
   }
 
@@ -166,7 +175,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
     public <T2> ValueState<T2> bindValue(
         String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) {
       FlinkValueState<T2> valueState =
-          new FlinkValueState<>(flinkStateBackend, id, namespace, coder);
+          new FlinkValueState<>(flinkStateBackend, id, namespace, coder, 
pipelineOptions);
       collectGlobalWindowStateDescriptor(valueState.flinkStateDescriptor);
       return valueState;
     }
@@ -174,14 +183,15 @@ public class FlinkStateInternals<K> implements 
StateInternals {
     @Override
     public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, 
Coder<T2> elemCoder) {
       FlinkBagState<Object, T2> bagState =
-          new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder);
+          new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder, 
pipelineOptions);
       collectGlobalWindowStateDescriptor(bagState.flinkStateDescriptor);
       return bagState;
     }
 
     @Override
     public <T2> SetState<T2> bindSet(String id, StateSpec<SetState<T2>> spec, 
Coder<T2> elemCoder) {
-      FlinkSetState<T2> setState = new FlinkSetState<>(flinkStateBackend, id, 
namespace, elemCoder);
+      FlinkSetState<T2> setState =
+          new FlinkSetState<>(flinkStateBackend, id, namespace, elemCoder, 
pipelineOptions);
       collectGlobalWindowStateDescriptor(setState.flinkStateDescriptor);
       return setState;
     }
@@ -193,7 +203,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         Coder<KeyT> mapKeyCoder,
         Coder<ValueT> mapValueCoder) {
       FlinkMapState<KeyT, ValueT> mapState =
-          new FlinkMapState<>(flinkStateBackend, id, namespace, mapKeyCoder, 
mapValueCoder);
+          new FlinkMapState<>(
+              flinkStateBackend, id, namespace, mapKeyCoder, mapValueCoder, 
pipelineOptions);
       collectGlobalWindowStateDescriptor(mapState.flinkStateDescriptor);
       return mapState;
     }
@@ -212,7 +223,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         Coder<AccumT> accumCoder,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
       FlinkCombiningState<Object, InputT, AccumT, OutputT> combiningState =
-          new FlinkCombiningState<>(flinkStateBackend, id, combineFn, 
namespace, accumCoder);
+          new FlinkCombiningState<>(
+              flinkStateBackend, id, combineFn, namespace, accumCoder, 
pipelineOptions);
       collectGlobalWindowStateDescriptor(combiningState.flinkStateDescriptor);
       return combiningState;
     }
@@ -231,7 +243,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
               combineFn,
               namespace,
               accumCoder,
-              CombineContextFactory.createFromStateContext(stateContext));
+              CombineContextFactory.createFromStateContext(stateContext),
+              pipelineOptions);
       
collectGlobalWindowStateDescriptor(combiningStateWithContext.flinkStateDescriptor);
       return combiningStateWithContext;
     }
@@ -263,13 +276,15 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         String stateId,
         StateNamespace namespace,
-        Coder<T> coder) {
+        Coder<T> coder,
+        SerializablePipelineOptions pipelineOptions) {
 
       this.namespace = namespace;
       this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
 
-      flinkStateDescriptor = new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(coder));
+      flinkStateDescriptor =
+          new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder, 
pipelineOptions));
     }
 
     @Override
@@ -347,14 +362,15 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         String stateId,
         StateNamespace namespace,
-        Coder<T> coder) {
+        Coder<T> coder,
+        SerializablePipelineOptions pipelineOptions) {
 
       this.namespace = namespace;
       this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
       this.storesVoidValues = coder instanceof VoidCoder;
       this.flinkStateDescriptor =
-          new ListStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder));
+          new ListStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder, 
pipelineOptions));
     }
 
     @Override
@@ -486,7 +502,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         String stateId,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
-        Coder<AccumT> accumCoder) {
+        Coder<AccumT> accumCoder,
+        SerializablePipelineOptions pipelineOptions) {
 
       this.namespace = namespace;
       this.stateId = stateId;
@@ -494,7 +511,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       this.flinkStateBackend = flinkStateBackend;
 
       flinkStateDescriptor =
-          new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(accumCoder));
+          new ValueStateDescriptor<>(
+              stateId, new CoderTypeSerializer<>(accumCoder, pipelineOptions));
     }
 
     @Override
@@ -649,7 +667,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> 
combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
-        CombineWithContext.Context context) {
+        CombineWithContext.Context context,
+        SerializablePipelineOptions pipelineOptions) {
 
       this.namespace = namespace;
       this.stateId = stateId;
@@ -658,7 +677,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       this.context = context;
 
       flinkStateDescriptor =
-          new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(accumCoder));
+          new ValueStateDescriptor<>(
+              stateId, new CoderTypeSerializer<>(accumCoder, pipelineOptions));
     }
 
     @Override
@@ -934,15 +954,16 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         String stateId,
         StateNamespace namespace,
         Coder<KeyT> mapKeyCoder,
-        Coder<ValueT> mapValueCoder) {
+        Coder<ValueT> mapValueCoder,
+        SerializablePipelineOptions pipelineOptions) {
       this.namespace = namespace;
       this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateDescriptor =
           new MapStateDescriptor<>(
               stateId,
-              new CoderTypeSerializer<>(mapKeyCoder),
-              new CoderTypeSerializer<>(mapValueCoder));
+              new CoderTypeSerializer<>(mapKeyCoder, pipelineOptions),
+              new CoderTypeSerializer<>(mapValueCoder, pipelineOptions));
     }
 
     @Override
@@ -1120,13 +1141,14 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         String stateId,
         StateNamespace namespace,
-        Coder<T> coder) {
+        Coder<T> coder,
+        SerializablePipelineOptions pipelineOptions) {
       this.namespace = namespace;
       this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateDescriptor =
           new MapStateDescriptor<>(
-              stateId, new CoderTypeSerializer<>(coder), new 
BooleanSerializer());
+              stateId, new CoderTypeSerializer<>(coder, pipelineOptions), new 
BooleanSerializer());
     }
 
     @Override
@@ -1287,9 +1309,12 @@ public class FlinkStateInternals<K> implements 
StateInternals {
   public static class EarlyBinder implements StateBinder {
 
     private final KeyedStateBackend keyedStateBackend;
+    private final SerializablePipelineOptions pipelineOptions;
 
-    public EarlyBinder(KeyedStateBackend keyedStateBackend) {
+    public EarlyBinder(
+        KeyedStateBackend keyedStateBackend, SerializablePipelineOptions 
pipelineOptions) {
       this.keyedStateBackend = keyedStateBackend;
+      this.pipelineOptions = pipelineOptions;
     }
 
     @Override
@@ -1297,7 +1322,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       try {
         keyedStateBackend.getOrCreateKeyedState(
             StringSerializer.INSTANCE,
-            new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder)));
+            new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder, 
pipelineOptions)));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -1310,7 +1335,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       try {
         keyedStateBackend.getOrCreateKeyedState(
             StringSerializer.INSTANCE,
-            new ListStateDescriptor<>(id, new 
CoderTypeSerializer<>(elemCoder)));
+            new ListStateDescriptor<>(id, new CoderTypeSerializer<>(elemCoder, 
pipelineOptions)));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -1324,7 +1349,9 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         keyedStateBackend.getOrCreateKeyedState(
             StringSerializer.INSTANCE,
             new MapStateDescriptor<>(
-                id, new CoderTypeSerializer<>(elemCoder), 
VoidSerializer.INSTANCE));
+                id,
+                new CoderTypeSerializer<>(elemCoder, pipelineOptions),
+                VoidSerializer.INSTANCE));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -1342,8 +1369,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
             StringSerializer.INSTANCE,
             new MapStateDescriptor<>(
                 id,
-                new CoderTypeSerializer<>(mapKeyCoder),
-                new CoderTypeSerializer<>(mapValueCoder)));
+                new CoderTypeSerializer<>(mapKeyCoder, pipelineOptions),
+                new CoderTypeSerializer<>(mapValueCoder, pipelineOptions)));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -1366,7 +1393,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       try {
         keyedStateBackend.getOrCreateKeyedState(
             StringSerializer.INSTANCE,
-            new ValueStateDescriptor<>(id, new 
CoderTypeSerializer<>(accumCoder)));
+            new ValueStateDescriptor<>(id, new 
CoderTypeSerializer<>(accumCoder, pipelineOptions)));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -1383,7 +1410,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       try {
         keyedStateBackend.getOrCreateKeyedState(
             StringSerializer.INSTANCE,
-            new ValueStateDescriptor<>(id, new 
CoderTypeSerializer<>(accumCoder)));
+            new ValueStateDescriptor<>(id, new 
CoderTypeSerializer<>(accumCoder, pipelineOptions)));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -1399,7 +1426,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
             new MapStateDescriptor<>(
                 "watermark-holds",
                 StringSerializer.INSTANCE,
-                new CoderTypeSerializer<>(InstantCoder.of())));
+                new CoderTypeSerializer<>(InstantCoder.of(), 
pipelineOptions)));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index f2eb3b1..fc4686a 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -28,7 +28,6 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.util.Collections;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.RemoteEnvironment;
@@ -54,7 +53,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSetParallelismBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setParallelism(42);
 
@@ -68,7 +67,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSetParallelismStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setParallelism(42);
 
@@ -82,7 +81,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSetMaxParallelismStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setMaxParallelism(42);
 
@@ -98,7 +97,7 @@ public class FlinkExecutionEnvironmentsTest {
   public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
     String flinkConfDir = extractFlinkConfig();
 
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -114,7 +113,7 @@ public class FlinkExecutionEnvironmentsTest {
   public void shouldInferParallelismFromEnvironmentStreaming() throws 
IOException {
     String confDir = extractFlinkConfig();
 
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -128,7 +127,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldFallbackToDefaultParallelismBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -142,7 +141,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldFallbackToDefaultParallelismStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -156,7 +155,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void useDefaultParallelismFromContextBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
 
     ExecutionEnvironment bev =
@@ -170,7 +169,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void useDefaultParallelismFromContextStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
 
     StreamExecutionEnvironment sev =
@@ -184,7 +183,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldParsePortForRemoteEnvironmentBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:1234");
 
@@ -198,7 +197,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldParsePortForRemoteEnvironmentStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:1234");
 
@@ -212,7 +211,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host");
 
@@ -226,7 +225,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host");
 
@@ -240,7 +239,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldTreatAutoAndEmptyHostTheSameBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     ExecutionEnvironment sev =
@@ -258,7 +257,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     StreamExecutionEnvironment sev =
@@ -276,7 +275,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldDetectMalformedPortBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:p0rt");
 
@@ -288,7 +287,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldDetectMalformedPortStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:p0rt");
 
@@ -300,7 +299,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSupportIPv4Batch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     options.setFlinkMaster("192.168.1.1:1234");
@@ -318,7 +317,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSupportIPv4Streaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     options.setFlinkMaster("192.168.1.1:1234");
@@ -336,7 +335,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSupportIPv6Batch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
@@ -355,7 +354,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldSupportIPv6Streaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
@@ -374,7 +373,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldRemoveHttpProtocolFromHostBatch() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     for (String flinkMaster :
@@ -391,7 +390,7 @@ public class FlinkExecutionEnvironmentsTest {
 
   @Test
   public void shouldRemoveHttpProtocolFromHostStreaming() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     for (String flinkMaster :
@@ -416,7 +415,7 @@ public class FlinkExecutionEnvironmentsTest {
   @Test
   public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
     // Checkpointing disabled, shut down sources immediately
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
     assertThat(options.getShutdownSourcesAfterIdleMs(), is(0L));
   }
@@ -424,7 +423,7 @@ public class FlinkExecutionEnvironmentsTest {
   @Test
   public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
     // Checkpointing is enabled, never shut down sources
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setCheckpointingInterval(1000L);
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
     assertThat(options.getShutdownSourcesAfterIdleMs(), is(Long.MAX_VALUE));
@@ -433,7 +432,7 @@ public class FlinkExecutionEnvironmentsTest {
   @Test
   public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
     // Checkpointing disabled, accept flag
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setShutdownSourcesAfterIdleMs(42L);
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
     assertThat(options.getShutdownSourcesAfterIdleMs(), is(42L));
@@ -442,7 +441,7 @@ public class FlinkExecutionEnvironmentsTest {
   @Test
   public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
     // Checkpointing enable, still accept flag
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setCheckpointingInterval(1000L);
     options.setShutdownSourcesAfterIdleMs(42L);
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
@@ -452,7 +451,7 @@ public class FlinkExecutionEnvironmentsTest {
   @Test
   public void shouldSetSavepointRestoreForRemoteStreaming() {
     String path = "fakePath";
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
     options.setSavepointPath(path);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 4785f4e..2aafb90 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -46,7 +46,6 @@ import 
org.apache.beam.runners.core.construction.resources.PipelineResources;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -84,7 +83,7 @@ public class FlinkPipelineExecutionEnvironmentTest implements 
Serializable {
 
   @Test
   public void shouldRecognizeAndTranslateStreamingPipeline() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("[auto]");
 
@@ -152,7 +151,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   @Test
   public void shouldUseDefaultTempLocationIfNoneSet() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
 
@@ -168,7 +167,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   @Test
   public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
 
@@ -189,7 +188,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   @Test
   public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws 
Exception {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
     options.setStreaming(true);
@@ -214,7 +213,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
   public void shouldUseTransformOverrides() {
     boolean[] testParameters = {true, false};
     for (boolean streaming : testParameters) {
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
       options.setStreaming(streaming);
       options.setRunner(FlinkRunner.class);
       FlinkPipelineExecutionEnvironment flinkEnv = new 
FlinkPipelineExecutionEnvironment(options);
@@ -234,7 +233,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   @Test
   public void shouldProvideParallelismToTransformOverrides() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     options.setRunner(FlinkRunner.class);
     FlinkPipelineExecutionEnvironment flinkEnv = new 
FlinkPipelineExecutionEnvironment(options);
@@ -278,7 +277,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   @Test
   public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     // no explicit streaming mode set
     options.setRunner(FlinkRunner.class);
     FlinkPipelineExecutionEnvironment flinkEnv = new 
FlinkPipelineExecutionEnvironment(options);
@@ -303,7 +302,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   @Test
   public void testTranslationModeOverrideWithUnboundedSources() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setStreaming(false);
 
@@ -319,7 +318,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
   public void testTranslationModeNoOverrideWithoutUnboundedSources() {
     boolean[] testArgs = new boolean[] {true, false};
     for (boolean streaming : testArgs) {
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
       options.setRunner(FlinkRunner.class);
       options.setStreaming(streaming);
 
@@ -408,7 +407,7 @@ public class FlinkPipelineExecutionEnvironmentTest 
implements Serializable {
 
   private FlinkPipelineOptions setPipelineOptions(
       String flinkMaster, String tempLocation, List<String> filesToStage) {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster(flinkMaster);
     options.setTempLocation(tempLocation);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
index f99a41e..b9b9513 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.core.IsNull.nullValue;
 import java.util.Collections;
 import java.util.HashMap;
 import 
org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -70,7 +71,7 @@ public class FlinkPipelineOptionsTest {
   /** These defaults should only be changed with a very good reason. */
   @Test
   public void testDefaults() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     assertThat(options.getParallelism(), is(-1));
     assertThat(options.getMaxParallelism(), is(-1));
     assertThat(options.getFlinkMaster(), is("[auto]"));
@@ -95,6 +96,7 @@ public class FlinkPipelineOptionsTest {
     assertThat(options.getSavepointPath(), is(nullValue()));
     assertThat(options.getAllowNonRestoredState(), is(false));
     assertThat(options.getDisableMetrics(), is(false));
+    assertThat(options.getFasterCopy(), is(false));
   }
 
   @Test(expected = Exception.class)
@@ -108,7 +110,8 @@ public class FlinkPipelineOptionsTest {
         Collections.emptyMap(),
         mainTag,
         Collections.emptyList(),
-        new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(
+            mainTag, coder, new 
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
         WindowingStrategy.globalDefault(),
         new HashMap<>(),
         Collections.emptyList(),
@@ -134,7 +137,8 @@ public class FlinkPipelineOptionsTest {
             Collections.emptyMap(),
             mainTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                mainTag, coder, new 
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             WindowingStrategy.globalDefault(),
             new HashMap<>(),
             Collections.emptyList(),
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
index 6baf700..20e7627 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -123,7 +122,7 @@ public class FlinkRequiresStableInputTest {
    */
   @Test(timeout = 30_000)
   public void testParDoRequiresStableInput() throws Exception {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setParallelism(1);
     // We only want to trigger external savepoints but we require
     // checkpointing to be enabled for @RequiresStableInput
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
index 4cda4cf..5aaa8ff 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -57,7 +56,7 @@ public class FlinkRunnerTest extends FlinkRunnerTestCompat {
 
   /** Main method for {@code testEnsureStdoutStdErrIsRestored()}. */
   public static void main(String[] args) {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(NotExecutingFlinkRunner.class);
     Pipeline p = Pipeline.create(options);
     p.apply(GenerateSequence.from(0));
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 9b928d1..d899917 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.StateSpec;
@@ -151,7 +150,7 @@ public class FlinkSavepointTest implements Serializable {
   }
 
   private void runSavepointAndRestore(boolean isPortablePipeline) throws Exception {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     // Initial parallelism
     options.setParallelism(2);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
index 9b39917..193a787 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
@@ -161,7 +161,7 @@ public class FlinkSubmissionTest {
 
   /** The Flink program which is executed by the CliFrontend. */
   public static void main(String[] args) {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setStreaming(streaming);
     options.setParallelism(1);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
index 5436137..4f060b5 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.WriteFilesResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -55,7 +54,7 @@ public class FlinkTransformOverridesTest {
 
   @Test
   public void testRunnerDeterminedSharding() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("[auto]");
     options.setParallelism(5);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 0011db9..842852e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.streaming;
 import java.util.Collections;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsTest;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -45,7 +47,10 @@ public class FlinkBroadcastStateInternalsTest extends StateInternalsTest {
       OperatorStateBackend operatorStateBackend =
           backend.createOperatorStateBackend(
               new DummyEnvironment("test", 1, 0), "", Collections.emptyList(), null);
-      return new FlinkBroadcastStateInternals<>(1, operatorStateBackend);
+      return new FlinkBroadcastStateInternals<>(
+          1,
+          operatorStateBackend,
+          new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index c92c893..8aa0440 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -28,6 +28,8 @@ import org.apache.beam.runners.core.StateInternalsTest;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -62,7 +64,10 @@ public class FlinkStateInternalsTest extends StateInternalsTest {
   protected StateInternals createStateInternals() {
     try {
       KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
-      return new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
+      return new FlinkStateInternals<>(
+          keyedStateBackend,
+          StringUtf8Coder.of(),
+          new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -72,7 +77,10 @@ public class FlinkStateInternalsTest extends StateInternalsTest {
   public void testWatermarkHoldsPersistence() throws Exception {
     KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
     FlinkStateInternals stateInternals =
-        new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
+        new FlinkStateInternals<>(
+            keyedStateBackend,
+            StringUtf8Coder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
 
     StateTag<WatermarkHoldState> stateTag =
         StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
@@ -123,7 +131,11 @@ public class FlinkStateInternalsTest extends StateInternalsTest {
     assertThat(fixedWindow.read(), is(middle));
 
     // Discard watermark view and recover it
-    stateInternals = new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
+    stateInternals =
+        new FlinkStateInternals<>(
+            keyedStateBackend,
+            StringUtf8Coder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     globalWindow = stateInternals.state(StateNamespaces.global(), stateTag);
     fixedWindow =
         stateInternals.state(
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
index d783801..9245962 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.TestFlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -44,7 +43,7 @@ public class GroupByWithNullValuesTest implements Serializable {
 
   @Test
   public void testGroupByWithNullValues() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
 
     options.setRunner(TestFlinkRunner.class);
     options.setStreaming(true);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 06dfb53..9411967 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -38,6 +38,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -139,11 +139,12 @@ public class DoFnOperatorTest {
             Collections.emptyMap(),
             outputTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag, coder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             WindowingStrategy.globalDefault(),
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             null,
             null,
             DoFnSchemaInformation.create(),
@@ -204,11 +205,15 @@ public class DoFnOperatorTest {
             mainOutput,
             ImmutableList.of(additionalOutput1, additionalOutput2),
             new DoFnOperator.MultiOutputOutputManagerFactory(
-                mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds),
+                mainOutput,
+                tagsToOutputTags,
+                tagsToCoders,
+                tagsToIds,
+                new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             WindowingStrategy.globalDefault(),
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             null,
             null,
             DoFnSchemaInformation.create(),
@@ -340,11 +345,14 @@ public class DoFnOperatorTest {
             Collections.emptyMap(),
             outputTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag,
+                outputCoder,
+                new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             windowingStrategy,
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             keyCoder, /* key coder */
             keySelector,
             DoFnSchemaInformation.create(),
@@ -354,9 +362,12 @@ public class DoFnOperatorTest {
         new KeyedOneInputStreamOperatorTestHarness<>(
             doFnOperator,
             keySelector,
-            new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+            new CoderTypeInformation<>(
+                FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
 
-    testHarness.setup(new CoderTypeSerializer<>(outputCoder));
+    testHarness.setup(
+        new CoderTypeSerializer<>(
+            outputCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())));
 
     testHarness.open();
 
@@ -449,11 +460,14 @@ public class DoFnOperatorTest {
             Collections.emptyMap(),
             outputTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag,
+                outputCoder,
+                new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             windowingStrategy,
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             keyCoder, /* key coder */
             keySelector,
             DoFnSchemaInformation.create(),
@@ -463,7 +477,8 @@ public class DoFnOperatorTest {
         new KeyedOneInputStreamOperatorTestHarness<>(
             doFnOperator,
             keySelector,
-            new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+            new CoderTypeInformation<>(
+                FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
 
     testHarness.open();
 
@@ -687,11 +702,12 @@ public class DoFnOperatorTest {
             Collections.emptyMap(),
             outputTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag, coder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             windowingStrategy,
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             StringUtf8Coder.of(), /* key coder */
             keySelector,
             DoFnSchemaInformation.create(),
@@ -703,7 +719,8 @@ public class DoFnOperatorTest {
             new KeyedOneInputStreamOperatorTestHarness<>(
                 doFnOperator,
                 keySelector,
-                new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+                new CoderTypeInformation<>(
+                    FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
     return testHarness;
   }
 
@@ -741,11 +758,12 @@ public class DoFnOperatorTest {
             Collections.emptyMap(),
             outputTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag, coder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
             sideInputMapping, /* side-input mapping */
             ImmutableList.of(view1, view2), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             keyed ? keyCoder : null,
             keyed ? keySelector : null,
             DoFnSchemaInformation.create(),
@@ -761,7 +779,8 @@ public class DoFnOperatorTest {
               doFnOperator,
               keySelector,
               null,
-              new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+              new CoderTypeInformation<>(
+                  FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
     }
 
     testHarness.open();
@@ -850,14 +869,15 @@ public class DoFnOperatorTest {
     TupleTag<KV<String, Long>> outputTag = new TupleTag<>("main-output");
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector<>(keyCoder);
+    KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector<>(keyCoder, null);
     KvCoder<String, Long> coder = KvCoder.of(keyCoder, VarLongCoder.of());
 
     FullWindowedValueCoder<KV<String, Long>> kvCoder =
         WindowedValue.getFullCoder(coder, windowingStrategy.getWindowFn().windowCoder());
 
     CoderTypeInformation<ByteBuffer> keyCoderInfo =
-        new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of());
+        new CoderTypeInformation<>(
+            FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults());
 
     OneInputStreamOperatorTestHarness<
             WindowedValue<KV<String, Long>>, WindowedValue<KV<String, Long>>>
@@ -935,11 +955,14 @@ public class DoFnOperatorTest {
                   Collections.emptyMap(),
                   outputTag,
                   Collections.emptyList(),
-                  new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+                  new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                      outputTag,
+                      coder,
+                      new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                   WindowingStrategy.globalDefault(),
                   sideInputMapping, /* side-input mapping */
                   ImmutableList.of(view1, view2), /* side inputs */
-                  PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+                  FlinkPipelineOptions.defaults(),
                   null,
                   null,
                   DoFnSchemaInformation.create(),
@@ -975,11 +998,14 @@ public class DoFnOperatorTest {
                   Collections.emptyMap(),
                   outputTag,
                   Collections.emptyList(),
-                  new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+                  new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                      outputTag,
+                      coder,
+                      new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                   WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
                   sideInputMapping, /* side-input mapping */
                   ImmutableList.of(view1, view2), /* side inputs */
-                  PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+                  FlinkPipelineOptions.defaults(),
                   keyCoder,
                   keySelector,
                   DoFnSchemaInformation.create(),
@@ -990,7 +1016,8 @@ public class DoFnOperatorTest {
               keySelector,
               // we use a dummy key for the second input since it is considered to be broadcast
               null,
-              new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+              new CoderTypeInformation<>(
+                  FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
         });
   }
 
@@ -1074,11 +1101,14 @@ public class DoFnOperatorTest {
                   Collections.emptyMap(),
                   outputTag,
                   Collections.emptyList(),
-                  new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+                  new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                      outputTag,
+                      coder,
+                      new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                   WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
                   sideInputMapping, /* side-input mapping */
                   ImmutableList.of(view1, view2), /* side inputs */
-                  PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+                  FlinkPipelineOptions.defaults(),
                   null,
                   null,
                   DoFnSchemaInformation.create(),
@@ -1115,11 +1145,14 @@ public class DoFnOperatorTest {
                   Collections.emptyMap(),
                   outputTag,
                   Collections.emptyList(),
-                  new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+                  new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                      outputTag,
+                      coder,
+                      new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                   WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
                   sideInputMapping, /* side-input mapping */
                   ImmutableList.of(view1, view2), /* side inputs */
-                  PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+                  FlinkPipelineOptions.defaults(),
                   keyCoder,
                   keySelector,
                   DoFnSchemaInformation.create(),
@@ -1130,7 +1163,8 @@ public class DoFnOperatorTest {
               keySelector,
               // we use a dummy key for the second input since it is considered to be broadcast
               null,
-              new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+              new CoderTypeInformation<>(
+                  FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
         });
   }
 
@@ -1230,9 +1264,11 @@ public class DoFnOperatorTest {
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     final CoderTypeSerializer<WindowedValue<String>> outputSerializer =
-        new CoderTypeSerializer<>(outputCoder);
+        new CoderTypeSerializer<>(
+            outputCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     CoderTypeInformation<ByteBuffer> keyCoderInfo =
-        new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of());
+        new CoderTypeInformation<>(
+            FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults());
     KeySelector<WindowedValue<Integer>, ByteBuffer> keySelector =
         e -> FlinkKeyUtils.encodeKey(e.getValue(), keyCoder);
 
@@ -1310,11 +1346,14 @@ public class DoFnOperatorTest {
             Collections.emptyMap(),
             outputTag,
             Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag,
+                outputCoder,
+                new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
             windowingStrategy,
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             keyCoder /* key coder */,
             keySelector,
             DoFnSchemaInformation.create(),
@@ -1331,7 +1370,7 @@ public class DoFnOperatorTest {
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setMaxBundleSize(2L);
     options.setMaxBundleTimeMills(10L);
 
@@ -1347,7 +1386,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     DoFnOperator<String, String> doFnOperator =
         new DoFnOperator<>(
@@ -1465,13 +1505,15 @@ public class DoFnOperatorTest {
   public void testBundleKeyed() throws Exception {
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector<>(keyCoder);
+    KvToByteBufferKeySelector keySelector =
+        new KvToByteBufferKeySelector<>(
+            keyCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
     WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>> windowedValueCoder =
         WindowedValue.getValueOnlyCoder(kvCoder);
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setMaxBundleSize(2L);
     options.setMaxBundleTimeMills(10L);
 
@@ -1493,7 +1535,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator =
         new DoFnOperator(
@@ -1596,7 +1639,7 @@ public class DoFnOperatorTest {
 
   @Test
   public void testCheckpointBufferingWithMultipleBundles() throws Exception {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setMaxBundleSize(10L);
     options.setCheckpointingInterval(1L);
 
@@ -1609,7 +1652,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory<>(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     @SuppressWarnings("unchecked")
     Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
@@ -1700,7 +1744,7 @@ public class DoFnOperatorTest {
 
   @Test
   public void testExactlyOnceBuffering() throws Exception {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setMaxBundleSize(2L);
     options.setCheckpointingInterval(1L);
 
@@ -1733,7 +1777,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
         () ->
@@ -1810,14 +1855,15 @@ public class DoFnOperatorTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testExactlyOnceBufferingKeyed() throws Exception {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setMaxBundleSize(2L);
     options.setCheckpointingInterval(1L);
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector<>(keyCoder);
+    KvToByteBufferKeySelector keySelector =
+        new KvToByteBufferKeySelector<>(keyCoder, new SerializablePipelineOptions(options));
     KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
     WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>> windowedValueCoder =
         WindowedValue.getValueOnlyCoder(kvCoder);
@@ -1848,7 +1894,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     Supplier<DoFnOperator<KV<String, String>, KV<String, String>>> doFnOperatorSupplier =
         () ->
@@ -1942,7 +1989,7 @@ public class DoFnOperatorTest {
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector<>(keyCoder);
+    KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector<>(keyCoder, null);
     KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
     WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>> windowedValueCoder =
         WindowedValue.getValueOnlyCoder(kvCoder);
@@ -1960,9 +2007,10 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
 
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     // should make the DoFnOperator creation fail
     options.setCheckpointingInterval(-1L);
     new DoFnOperator(
@@ -1985,7 +2033,7 @@ public class DoFnOperatorTest {
 
   @Test
   public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws 
Exception {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setMaxBundleSize(10L);
     options.setCheckpointingInterval(1L);
 
@@ -1998,7 +2046,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     @SuppressWarnings("unchecked")
     DoFnOperator doFnOperator =
@@ -2079,7 +2128,7 @@ public class DoFnOperatorTest {
   }
 
   private static DoFnOperator getOperatorForCleanupInspection() {
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setParallelism(4);
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
@@ -2097,7 +2146,8 @@ public class DoFnOperatorTest {
     DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
         new DoFnOperator.MultiOutputOutputManagerFactory(
             outputTag,
-            WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
 
     return new DoFnOperator<>(
         doFn,
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 02684bc..91ff57c 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -60,6 +60,7 @@ import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
@@ -84,7 +85,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -196,7 +196,10 @@ public class ExecutableStageDoFnOperatorTest {
   public void sdkErrorsSurfaceOnClose() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VoidCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     ExecutableStageDoFnOperator<Integer, Integer> operator =
         getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
 
@@ -225,7 +228,10 @@ public class ExecutableStageDoFnOperatorTest {
   public void expectedInputsAreSent() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VoidCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     ExecutableStageDoFnOperator<Integer, Integer> operator =
         getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
 
@@ -291,7 +297,11 @@ public class ExecutableStageDoFnOperatorTest {
 
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
         new DoFnOperator.MultiOutputOutputManagerFactory(
-            mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+            mainOutput,
+            tagsToOutputTags,
+            tagsToCoders,
+            tagsToIds,
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
 
     WindowedValue<Integer> zero = WindowedValue.valueInGlobalWindow(0);
     WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
@@ -411,7 +421,10 @@ public class ExecutableStageDoFnOperatorTest {
   public void testWatermarkHandling() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VoidCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     ExecutableStageDoFnOperator<KV<String, Integer>, Integer> operator =
         getOperator(
             mainOutput,
@@ -428,7 +441,7 @@ public class ExecutableStageDoFnOperatorTest {
             new KeyedOneInputStreamOperatorTestHarness<>(
                 operator,
                 val -> val.getValue().getKey(),
-                new CoderTypeInformation<>(StringUtf8Coder.of()));
+                new CoderTypeInformation<>(StringUtf8Coder.of(), 
FlinkPipelineOptions.defaults()));
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
     when(bundle.getInputReceivers())
         .thenReturn(
@@ -544,7 +557,10 @@ public class ExecutableStageDoFnOperatorTest {
   public void testStageBundleClosed() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VoidCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     ExecutableStageDoFnOperator<Integer, Integer> operator =
         getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
 
@@ -578,7 +594,10 @@ public class ExecutableStageDoFnOperatorTest {
   public void testEnsureStateCleanupWithKeyedInput() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VarIntCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VarIntCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     VarIntCoder keyCoder = VarIntCoder.of();
     ExecutableStageDoFnOperator<Integer, Integer> operator =
         getOperator(
@@ -592,7 +611,9 @@ public class ExecutableStageDoFnOperatorTest {
     KeyedOneInputStreamOperatorTestHarness<Integer, WindowedValue<Integer>, 
WindowedValue<Integer>>
         testHarness =
             new KeyedOneInputStreamOperatorTestHarness(
-                operator, val -> val, new CoderTypeInformation<>(keyCoder));
+                operator,
+                val -> val,
+                new CoderTypeInformation<>(keyCoder, 
FlinkPipelineOptions.defaults()));
 
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
     when(bundle.getInputReceivers())
@@ -691,7 +712,10 @@ public class ExecutableStageDoFnOperatorTest {
       throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VoidCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
 
     WindowingStrategy windowingStrategy =
@@ -738,7 +762,8 @@ public class ExecutableStageDoFnOperatorTest {
             new KeyedOneInputStreamOperatorTestHarness(
                 operator,
                 operator.keySelector,
-                new 
CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+                new CoderTypeInformation<>(
+                    FlinkKeyUtils.ByteBufferCoder.of(), 
FlinkPipelineOptions.defaults()));
 
     testHarness.open();
 
@@ -858,7 +883,10 @@ public class ExecutableStageDoFnOperatorTest {
   public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
-        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+        new DoFnOperator.MultiOutputOutputManagerFactory(
+            mainOutput,
+            VoidCoder.of(),
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
 
@@ -881,7 +909,8 @@ public class ExecutableStageDoFnOperatorTest {
             new KeyedOneInputStreamOperatorTestHarness(
                 operator,
                 operator.keySelector,
-                new 
CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+                new CoderTypeInformation<>(
+                    FlinkKeyUtils.ByteBufferCoder.of(), 
FlinkPipelineOptions.defaults()));
 
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
     when(bundle.getInputReceivers())
@@ -1060,9 +1089,13 @@ public class ExecutableStageDoFnOperatorTest {
 
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
         new DoFnOperator.MultiOutputOutputManagerFactory(
-            mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+            mainOutput,
+            tagsToOutputTags,
+            tagsToCoders,
+            tagsToIds,
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
 
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
 
     ExecutableStageDoFnOperator<Integer, Integer> operator =
         new ExecutableStageDoFnOperator<>(
@@ -1139,14 +1172,14 @@ public class ExecutableStageDoFnOperatorTest {
             Collections.emptyMap() /* sideInputTagMapping */,
             Collections.emptyList() /* sideInputs */,
             Collections.emptyMap() /* sideInputId mapping */,
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            FlinkPipelineOptions.defaults(),
             stagePayload,
             jobInfo,
             contextFactory,
             createOutputMap(mainOutput, additionalOutputs),
             windowingStrategy,
             keyCoder,
-            keyCoder != null ? new KvToByteBufferKeySelector<>(keyCoder) : 
null);
+            keyCoder != null ? new KvToByteBufferKeySelector<>(keyCoder, null) 
: null);
 
     Whitebox.setInternalState(operator, "stateRequestHandler", 
stateRequestHandler);
     return operator;
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index 371cbd0..f37a875 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -32,13 +32,13 @@ import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -212,11 +212,14 @@ public class WindowDoFnOperatorTest {
         (Coder) inputCoder,
         outputTag,
         emptyList(),
-        new MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
+        new MultiOutputOutputManagerFactory<>(
+            outputTag,
+            outputCoder,
+            new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
         windowingStrategy,
         emptyMap(),
         emptyList(),
-        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        FlinkPipelineOptions.defaults(),
         VarLongCoder.of(),
         new WorkItemKeySelector(VarLongCoder.of()));
   }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
index 60204c7..a714643 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
@@ -125,7 +125,7 @@ public class UnboundedSourceWrapperTest {
     @Test(timeout = 30_000)
     public void testValueEmission() throws Exception {
       final int numElementsPerShard = 20;
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
 
       final long[] numElementsReceived = {0L};
       final int[] numWatermarksReceived = {0};
@@ -608,7 +608,7 @@ public class UnboundedSourceWrapperTest {
 
     private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) 
throws Exception {
       final int parallelism = 2;
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
       // Make sure we do not shut down
       options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE);
 
@@ -696,7 +696,7 @@ public class UnboundedSourceWrapperTest {
           new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
               CountingSource.upTo(1000));
 
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
 
       UnboundedSourceWrapper<
               Long, 
UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Long>>
@@ -763,7 +763,7 @@ public class UnboundedSourceWrapperTest {
 
     @Test
     public void testAccumulatorRegistrationOnOperatorClose() throws Exception {
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
 
       TestCountingSource source = new 
TestCountingSource(20).withoutSplitting();
 
@@ -800,7 +800,7 @@ public class UnboundedSourceWrapperTest {
           new IdlingUnboundedSource<>(
               Arrays.asList("first", "second", "third"), StringUtf8Coder.of());
 
-      FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
       options.setShutdownSourcesAfterIdleMs(0L);
       options.setParallelism(4);
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
index 27e5100..a1feb51 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
@@ -23,6 +23,8 @@ import static org.hamcrest.Matchers.is;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -166,6 +168,7 @@ public class BufferingDoFnRunnerTest {
         WindowedValue.getFullCoder(VarIntCoder.of(), 
GlobalWindow.Coder.INSTANCE),
         operatorStateBackend,
         null,
-        concurrentCheckpoints);
+        concurrentCheckpoints,
+        new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
   }
 }
diff --git 
a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html 
b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
index 9564469..2656acb 100644
--- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
@@ -73,6 +73,11 @@ Should be called before running the tests.
   <td>Default: <code>true</code></td>
 </tr>
 <tr>
+  <td><code>fasterCopy</code></td>
+  <td>Remove unneeded deep copy between operators. See 
https://issues.apache.org/jira/browse/BEAM-11146</td>
+  <td>Default: <code>false</code></td>
+</tr>
+<tr>
   <td><code>filesToStage</code></td>
   <td>Jar-Files to send to all workers and put on the classpath. The default 
value is all files from the classpath.</td>
   <td></td>
diff --git 
a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html 
b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
index 9abe508..17fb6f1 100644
--- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
@@ -73,6 +73,11 @@ Should be called before running the tests.
   <td>Default: <code>true</code></td>
 </tr>
 <tr>
+  <td><code>faster_copy</code></td>
+  <td>Remove unneeded deep copy between operators. See 
https://issues.apache.org/jira/browse/BEAM-11146</td>
+  <td>Default: <code>false</code></td>
+</tr>
+<tr>
   <td><code>files_to_stage</code></td>
   <td>Jar-Files to send to all workers and put on the classpath. The default 
value is all files from the classpath.</td>
   <td></td>

Reply via email to