This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4836937 [BEAM-11146] Add fasterCopy option to Flink runner (#13240)
4836937 is described below
commit 4836937762a026bf0ce7a681e7af513d124bc26f
Author: Teodor Spæren <[email protected]>
AuthorDate: Tue Nov 3 14:06:55 2020 +0100
[BEAM-11146] Add fasterCopy option to Flink runner (#13240)
* [BEAM-11146] Add fasterCopy option to Flink runner
The fasterCopy option removes an unnecessary deep copy in the Flink
runner between each operator. This should lead to improved performance
in all cases.
* Add defaults() function to FlinkPipelineOptions
---
CHANGES.md | 1 +
.../translation/types/CoderTypeSerializer.java | 31 ++--
.../translation/types/CoderTypeSerializerTest.java | 6 +-
.../FlinkBatchPortablePipelineTranslator.java | 14 +-
.../flink/FlinkBatchTransformTranslators.java | 13 +-
.../flink/FlinkBatchTranslationContext.java | 2 +-
.../beam/runners/flink/FlinkPipelineOptions.java | 12 ++
.../org/apache/beam/runners/flink/FlinkRunner.java | 5 +
.../FlinkStreamingPortablePipelineTranslator.java | 43 ++++--
.../flink/FlinkStreamingTransformTranslators.java | 53 +++++--
.../flink/FlinkStreamingTranslationContext.java | 2 +-
.../apache/beam/runners/flink/TestFlinkRunner.java | 3 +-
.../translation/types/CoderTypeInformation.java | 11 +-
.../wrappers/streaming/DoFnOperator.java | 35 +++--
.../streaming/ExecutableStageDoFnOperator.java | 13 +-
.../streaming/KvToByteBufferKeySelector.java | 7 +-
.../streaming/io/UnboundedSourceWrapper.java | 4 +-
.../streaming/stableinput/BufferingDoFnRunner.java | 12 +-
.../state/FlinkBroadcastStateInternals.java | 45 ++++--
.../streaming/state/FlinkStateInternals.java | 95 ++++++++-----
.../flink/FlinkExecutionEnvironmentsTest.java | 57 ++++----
.../FlinkPipelineExecutionEnvironmentTest.java | 21 ++-
.../runners/flink/FlinkPipelineOptionsTest.java | 10 +-
.../flink/FlinkRequiresStableInputTest.java | 3 +-
.../apache/beam/runners/flink/FlinkRunnerTest.java | 3 +-
.../beam/runners/flink/FlinkSavepointTest.java | 3 +-
.../beam/runners/flink/FlinkSubmissionTest.java | 2 +-
.../runners/flink/FlinkTransformOverridesTest.java | 3 +-
.../FlinkBroadcastStateInternalsTest.java | 7 +-
.../flink/streaming/FlinkStateInternalsTest.java | 18 ++-
.../flink/streaming/GroupByWithNullValuesTest.java | 3 +-
.../wrappers/streaming/DoFnOperatorTest.java | 156 ++++++++++++++-------
.../streaming/ExecutableStageDoFnOperatorTest.java | 67 ++++++---
.../wrappers/streaming/WindowDoFnOperatorTest.java | 9 +-
.../streaming/io/UnboundedSourceWrapperTest.java | 10 +-
.../stableinput/BufferingDoFnRunnerTest.java | 5 +-
.../shortcodes/flink_java_pipeline_options.html | 5 +
.../shortcodes/flink_python_pipeline_options.html | 5 +
38 files changed, 521 insertions(+), 273 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index c288da2..e9b1085 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
* Added support for json payload format in Beam SQL Kafka Table
([BEAM-10893](https://issues.apache.org/jira/browse/BEAM-10893))
* Added support for protobuf payload format in Beam SQL Kafka Table
([BEAM-10892](https://issues.apache.org/jira/browse/BEAM-10892))
* Added support for avro payload format in Beam SQL Pubsub Table
([BEAM-10857](https://issues.apache.org/jira/browse/BEAM-10857))
+* Added option to disable unnecessary copying between operators in Flink
Runner (Java) ([BEAM-11146](https://issues.apache.org/jira/browse/BEAM-11146))
* X feature added (Java/Python)
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
## Breaking Changes
diff --git
a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index 04f4bae..758ccc0 100644
---
a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++
b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation.types;
import java.io.EOFException;
import java.io.IOException;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -49,20 +50,18 @@ public class CoderTypeSerializer<T> extends
TypeSerializer<T> {
* org.apache.beam.sdk.io.FileSystems} registration needed for {@link
* org.apache.beam.sdk.transforms.Reshuffle} translation.
*/
- @SuppressWarnings("unused")
- private final @Nullable SerializablePipelineOptions pipelineOptions;
+ private final SerializablePipelineOptions pipelineOptions;
- public CoderTypeSerializer(Coder<T> coder) {
- Preconditions.checkNotNull(coder);
- this.coder = coder;
- this.pipelineOptions = null;
- }
+ private final boolean fasterCopy;
- public CoderTypeSerializer(
- Coder<T> coder, @Nullable SerializablePipelineOptions pipelineOptions) {
+ public CoderTypeSerializer(Coder<T> coder, SerializablePipelineOptions
pipelineOptions) {
Preconditions.checkNotNull(coder);
+ Preconditions.checkNotNull(pipelineOptions);
this.coder = coder;
this.pipelineOptions = pipelineOptions;
+
+ FlinkPipelineOptions options =
pipelineOptions.get().as(FlinkPipelineOptions.class);
+ this.fasterCopy = options.getFasterCopy();
}
@Override
@@ -72,7 +71,7 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T>
{
@Override
public CoderTypeSerializer<T> duplicate() {
- return new CoderTypeSerializer<>(coder);
+ return new CoderTypeSerializer<>(coder, pipelineOptions);
}
@Override
@@ -82,10 +81,14 @@ public class CoderTypeSerializer<T> extends
TypeSerializer<T> {
@Override
public T copy(T t) {
- try {
- return CoderUtils.clone(coder, t);
- } catch (CoderException e) {
- throw new RuntimeException("Could not clone.", e);
+ if (fasterCopy) {
+ return t;
+ } else {
+ try {
+ return CoderUtils.clone(coder, t);
+ } catch (CoderException e) {
+ throw new RuntimeException("Could not clone.", e);
+ }
}
}
diff --git
a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
b/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
index e88f5cc..286fd01 100644
---
a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
+++
b/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
@@ -24,9 +24,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.junit.Test;
@@ -59,7 +61,9 @@ public class CoderTypeSerializerTest implements Serializable {
}
private void testWriteAndReadConfigSnapshot(Coder<String> coder) throws
IOException {
- CoderTypeSerializer<String> serializer = new CoderTypeSerializer<>(coder);
+ CoderTypeSerializer<String> serializer =
+ new CoderTypeSerializer<>(
+ coder, new
SerializablePipelineOptions(PipelineOptionsFactory.create()));
TypeSerializerSnapshot writtenSnapshot =
serializer.snapshotConfiguration();
ComparatorTestBase.TestOutputView outView = new
ComparatorTestBase.TestOutputView();
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index d8989d4..fe1d762 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -312,7 +312,8 @@ public class FlinkBatchPortablePipelineTranslator
unionCoders.add(coder);
}
UnionCoder unionCoder = UnionCoder.of(unionCoders);
- TypeInformation<RawUnionValue> typeInformation = new
CoderTypeInformation<>(unionCoder);
+ TypeInformation<RawUnionValue> typeInformation =
+ new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());
RunnerApi.ExecutableStagePayload stagePayload;
try {
@@ -423,7 +424,8 @@ public class FlinkBatchPortablePipelineTranslator
.returns(
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
- (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE)));
+ (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions()));
} else {
for (String pCollectionId : allInputs.values()) {
DataSet<WindowedValue<T>> current =
context.getDataSetOrThrow(pCollectionId);
@@ -497,7 +499,7 @@ public class FlinkBatchPortablePipelineTranslator
windowingStrategy.getWindowFn().windowCoder());
TypeInformation<WindowedValue<KV<K, List<V>>>> partialReduceTypeInfo =
- new CoderTypeInformation<>(outputCoder);
+ new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());
Grouping<WindowedValue<KV<K, V>>> inputGrouping =
inputDataSet.groupBy(new
KvKeySelector<>(inputElementCoder.getKeyCoder()));
@@ -538,7 +540,8 @@ public class FlinkBatchPortablePipelineTranslator
PTransformNode transform, RunnerApi.Pipeline pipeline,
BatchTranslationContext context) {
TypeInformation<WindowedValue<byte[]>> typeInformation =
new CoderTypeInformation<>(
- WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions());
DataSource<WindowedValue<byte[]>> dataSource =
new DataSource<>(
context.getExecutionEnvironment(),
@@ -613,7 +616,8 @@ public class FlinkBatchPortablePipelineTranslator
Coder<WindowedValue<?>> outputCoder,
String transformName,
String collectionId) {
- TypeInformation<WindowedValue<?>> outputType = new
CoderTypeInformation<>(outputCoder);
+ TypeInformation<WindowedValue<?>> outputType =
+ new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());
FlinkExecutableStagePruningFunction pruningFunction =
new FlinkExecutableStagePruningFunction(unionTag,
context.getPipelineOptions());
FlatMapOperator<RawUnionValue, WindowedValue<?>> pruningOperator =
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index aac9c94..b0464be 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -300,7 +300,8 @@ class FlinkBatchTransformTranslators {
WindowedValue.getFullCoder(
KvCoder.of(
inputCoder.getKeyCoder(),
IterableCoder.of(inputCoder.getValueCoder())),
- windowingStrategy.getWindowFn().windowCoder()));
+ windowingStrategy.getWindowFn().windowCoder()),
+ context.getPipelineOptions());
final DataSet<WindowedValue<KV<K, Iterable<InputT>>>> outputDataSet =
new GroupReduceOperator<>(
inputGrouping,
@@ -349,7 +350,8 @@ class FlinkBatchTransformTranslators {
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
- windowingStrategy.getWindowFn().windowCoder()));
+ windowingStrategy.getWindowFn().windowCoder()),
+ context.getPipelineOptions());
Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
inputDataSet.groupBy(new KvKeySelector<>(inputCoder.getKeyCoder()));
@@ -693,8 +695,8 @@ class FlinkBatchTransformTranslators {
TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
new CoderTypeInformation<>(
- WindowedValue.getFullCoder(
- unionCoder, windowingStrategy.getWindowFn().windowCoder()));
+ WindowedValue.getFullCoder(unionCoder,
windowingStrategy.getWindowFn().windowCoder()),
+ context.getPipelineOptions());
List<PCollectionView<?>> sideInputs;
try {
@@ -842,7 +844,8 @@ class FlinkBatchTransformTranslators {
.returns(
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
- (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE)));
+ (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions()));
} else {
for (PValue taggedPc : allInputs.values()) {
checkArgument(
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 75ee568..68114bf 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -143,7 +143,7 @@ class FlinkBatchTranslationContext {
WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
WindowedValue.getFullCoder(coder,
windowingStrategy.getWindowFn().windowCoder());
- return new CoderTypeInformation<>(windowedValueCoder);
+ return new CoderTypeInformation<>(windowedValueCoder, options);
}
Map<TupleTag<?>, PCollection<?>> getInputs(PTransform<?, ?> transform) {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 6c42e2f..9afaa0b 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
/**
@@ -274,4 +275,15 @@ public interface FlinkPipelineOptions
Boolean getReIterableGroupByKeyResult();
void setReIterableGroupByKeyResult(Boolean reIterableGroupByKeyResult);
+
+ @Description(
+ "Remove unneeded deep copy between operators. See
https://issues.apache.org/jira/browse/BEAM-11146")
+ @Default.Boolean(false)
+ Boolean getFasterCopy();
+
+ void setFasterCopy(Boolean fasterCopy);
+
+ static FlinkPipelineOptions defaults() {
+ return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ }
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 3635954..e97930b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -79,6 +79,11 @@ public class FlinkRunner extends
PipelineRunner<PipelineResult> {
LOG.info("Executing pipeline using FlinkRunner.");
+ if (!options.getFasterCopy()) {
+ LOG.warn(
+ "For maximum performance you should set the 'fasterCopy' option. See
more at https://issues.apache.org/jira/browse/BEAM-11146");
+ }
+
FlinkPipelineExecutionEnvironment env = new
FlinkPipelineExecutionEnvironment(options);
LOG.info("Translating pipeline to Flink program.");
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 6eabacb..9ebba24 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -299,7 +299,8 @@ public class FlinkStreamingPortablePipelineTranslator
.returns(
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
- (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE)));
+ (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions()));
context.addDataStream(Iterables.getOnlyElement(transform.getOutputsMap().values()),
result);
} else {
DataStream<T> result = null;
@@ -402,7 +403,7 @@ public class FlinkStreamingPortablePipelineTranslator
WindowedValue.getFullCoder(workItemCoder,
windowingStrategy.getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, V>>>
workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
+ new CoderTypeInformation<>(windowedWorkItemCoder,
context.getPipelineOptions());
DataStream<WindowedValue<SingletonKeyedWorkItem<K, V>>> workItemStream =
inputDataStream
@@ -429,7 +430,7 @@ public class FlinkStreamingPortablePipelineTranslator
windowingStrategy.getWindowFn().windowCoder());
TypeInformation<WindowedValue<KV<K, Iterable<V>>>> outputTypeInfo =
- new CoderTypeInformation<>(outputCoder);
+ new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());
TupleTag<KV<K, Iterable<V>>> mainTag = new TupleTag<>("main output");
@@ -440,7 +441,10 @@ public class FlinkStreamingPortablePipelineTranslator
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory(mainTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainTag,
+ outputCoder,
+ new SerializablePipelineOptions(context.getPipelineOptions())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
@@ -497,7 +501,8 @@ public class FlinkStreamingPortablePipelineTranslator
Coder<WindowedValue<T>> windowCoder =
instantiateCoder(outputCollectionId, pipeline.getComponents());
- TypeInformation<WindowedValue<T>> outputTypeInfo = new
CoderTypeInformation<>(windowCoder);
+ TypeInformation<WindowedValue<T>> outputTypeInfo =
+ new CoderTypeInformation<>(windowCoder, pipelineOptions);
WindowingStrategy windowStrategy =
getWindowingStrategy(outputCollectionId, pipeline.getComponents());
@@ -506,7 +511,8 @@ public class FlinkStreamingPortablePipelineTranslator
WindowedValue.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(
((WindowedValueCoder) windowCoder).getValueCoder()),
- windowStrategy.getWindowFn().windowCoder()));
+ windowStrategy.getWindowFn().windowCoder()),
+ pipelineOptions);
UnboundedSource unboundedSource =
ReadTranslation.unboundedSourceFromProto(payload);
@@ -547,7 +553,8 @@ public class FlinkStreamingPortablePipelineTranslator
TypeInformation<WindowedValue<byte[]>> typeInfo =
new CoderTypeInformation<>(
- WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions());
long shutdownAfterIdleSourcesMs =
context.getPipelineOptions().getShutdownSourcesAfterIdleMs();
SingleOutputStreamOperator<WindowedValue<byte[]>> source =
@@ -575,7 +582,8 @@ public class FlinkStreamingPortablePipelineTranslator
TypeInformation<WindowedValue<byte[]>> typeInfo =
new CoderTypeInformation<>(
- WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions());
ObjectMapper objectMapper = new ObjectMapper();
final int intervalMillis;
@@ -641,7 +649,7 @@ public class FlinkStreamingPortablePipelineTranslator
outputCoders.put(localOutputName, windowCoder);
TupleTag<?> tupleTag = new TupleTag<>(localOutputName);
CoderTypeInformation<WindowedValue<?>> typeInformation =
- new CoderTypeInformation(windowCoder);
+ new CoderTypeInformation(windowCoder, context.getPipelineOptions());
tagsToOutputTags.put(tupleTag, new OutputTag<>(localOutputName,
typeInformation));
tagsToCoders.put(tupleTag, windowCoder);
tagsToIds.put(tupleTag, outputIndexMap.get(localOutputName));
@@ -654,7 +662,8 @@ public class FlinkStreamingPortablePipelineTranslator
CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
(!outputs.isEmpty())
- ? new CoderTypeInformation(outputCoders.get(mainOutputTag.getId()))
+ ? new CoderTypeInformation(
+ outputCoders.get(mainOutputTag.getId()),
context.getPipelineOptions())
: null;
ArrayList<TupleTag<?>> additionalOutputTags = Lists.newArrayList();
@@ -684,13 +693,19 @@ public class FlinkStreamingPortablePipelineTranslator
valueCoder.getClass().getSimpleName()));
}
keyCoder = ((KvCoder) valueCoder).getKeyCoder();
- keySelector = new KvToByteBufferKeySelector(keyCoder);
+ keySelector =
+ new KvToByteBufferKeySelector(
+ keyCoder, new
SerializablePipelineOptions(context.getPipelineOptions()));
inputDataStream = inputDataStream.keyBy(keySelector);
}
DoFnOperator.MultiOutputOutputManagerFactory<OutputT> outputManagerFactory
=
new DoFnOperator.MultiOutputOutputManagerFactory<>(
- mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds);
+ mainOutputTag,
+ tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
+ new SerializablePipelineOptions(context.getPipelineOptions()));
DoFnOperator<InputT, OutputT> doFnOperator =
new ExecutableStageDoFnOperator<>(
@@ -794,7 +809,7 @@ public class FlinkStreamingPortablePipelineTranslator
.addSource(
new TestStreamSource<>(
testStreamDecoder,
transform.getSpec().getPayload().toByteArray()),
- new CoderTypeInformation<>(coder));
+ new CoderTypeInformation<>(coder,
context.getPipelineOptions()));
context.addDataStream(outputPCollectionId, source);
}
@@ -903,7 +918,7 @@ public class FlinkStreamingPortablePipelineTranslator
UnionCoder unionCoder = UnionCoder.of(viewCoders);
CoderTypeInformation<RawUnionValue> unionTypeInformation =
- new CoderTypeInformation<>(unionCoder);
+ new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());
// transform each side input to RawUnionValue and union them
DataStream<RawUnionValue> sideInputUnion = null;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 68af4fb..dbf5114 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -198,7 +198,8 @@ class FlinkStreamingTransformTranslators {
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
- output.getWindowingStrategy().getWindowFn().windowCoder()));
+ output.getWindowingStrategy().getWindowFn().windowCoder()),
+ context.getPipelineOptions());
UnboundedSource<T, ?> rawSource;
try {
@@ -296,7 +297,8 @@ class FlinkStreamingTransformTranslators {
TypeInformation<WindowedValue<byte[]>> typeInfo =
new CoderTypeInformation<>(
- WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(ByteArrayCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions());
long shutdownAfterIdleSourcesMs =
context
@@ -434,7 +436,7 @@ class FlinkStreamingTransformTranslators {
UnionCoder unionCoder = UnionCoder.of(inputCoders);
CoderTypeInformation<RawUnionValue> unionTypeInformation =
- new CoderTypeInformation<>(unionCoder);
+ new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());
// transform each side input to RawUnionValue and union them
DataStream<RawUnionValue> sideInputUnion = null;
@@ -544,7 +546,9 @@ class FlinkStreamingTransformTranslators {
// Based on the fact that the signature is stateful, DoFnSignatures
ensures
// that it is also keyed
keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
- keySelector = new KvToByteBufferKeySelector(keyCoder);
+ keySelector =
+ new KvToByteBufferKeySelector(
+ keyCoder, new
SerializablePipelineOptions(context.getPipelineOptions()));
inputDataStream = inputDataStream.keyBy(keySelector);
stateful = true;
} else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
@@ -556,7 +560,8 @@ class FlinkStreamingTransformTranslators {
CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
new CoderTypeInformation<>(
- context.getWindowedInputCoder((PCollection<OutputT>)
outputs.get(mainOutputTag)));
+ context.getWindowedInputCoder((PCollection<OutputT>)
outputs.get(mainOutputTag)),
+ context.getPipelineOptions());
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT> doFnOperator =
@@ -732,7 +737,11 @@ class FlinkStreamingTransformTranslators {
mainOutputTag1,
additionalOutputTags1,
new DoFnOperator.MultiOutputOutputManagerFactory<>(
- mainOutputTag1, tagsToOutputTags, tagsToCoders,
tagsToIds),
+ mainOutputTag1,
+ tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
+ new
SerializablePipelineOptions(context.getPipelineOptions())),
windowingStrategy,
transformedSideInputs,
sideInputs1,
@@ -793,7 +802,11 @@ class FlinkStreamingTransformTranslators {
mainOutputTag,
additionalOutputTags,
new DoFnOperator.MultiOutputOutputManagerFactory<>(
- mainOutputTag, tagsToOutputTags, tagsToCoders,
tagsToIds),
+ mainOutputTag,
+ tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
+ new
SerializablePipelineOptions(context.getPipelineOptions())),
windowingStrategy,
transformedSideInputs,
sideInputs,
@@ -904,7 +917,7 @@ class FlinkStreamingTransformTranslators {
workItemCoder,
input.getWindowingStrategy().getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>>
workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
+ new CoderTypeInformation<>(windowedWorkItemCoder,
context.getPipelineOptions());
DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>>
workItemStream =
inputDataStream
@@ -936,7 +949,10 @@ class FlinkStreamingTransformTranslators {
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainTag,
+ outputCoder,
+ new
SerializablePipelineOptions(context.getPipelineOptions())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
@@ -1004,7 +1020,7 @@ class FlinkStreamingTransformTranslators {
workItemCoder,
input.getWindowingStrategy().getWindowFn().windowCoder());
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>>
workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
+ new CoderTypeInformation<>(windowedWorkItemCoder,
context.getPipelineOptions());
DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>>
workItemStream =
inputDataStream
@@ -1039,7 +1055,10 @@ class FlinkStreamingTransformTranslators {
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainTag,
+ outputCoder,
+ new
SerializablePipelineOptions(context.getPipelineOptions())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
@@ -1067,7 +1086,10 @@ class FlinkStreamingTransformTranslators {
(Coder) windowedWorkItemCoder,
mainTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainTag,
+ outputCoder,
+ new
SerializablePipelineOptions(context.getPipelineOptions())),
windowingStrategy,
transformSideInputs.f0,
sideInputs,
@@ -1137,7 +1159,7 @@ class FlinkStreamingTransformTranslators {
windowedWorkItemCoder =
WindowedValue.getValueOnlyCoder(workItemCoder);
CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>>
workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
+ new CoderTypeInformation<>(windowedWorkItemCoder,
context.getPipelineOptions());
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream =
context.getInputDataStream(input);
@@ -1220,7 +1242,8 @@ class FlinkStreamingTransformTranslators {
.returns(
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
- (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE)));
+ (Coder<T>) VoidCoder.of(),
GlobalWindow.Coder.INSTANCE),
+ context.getPipelineOptions()));
context.setOutputDataStream(context.getOutput(transform), result);
} else {
@@ -1386,7 +1409,7 @@ class FlinkStreamingTransformTranslators {
.getExecutionEnvironment()
.addSource(
new TestStreamSource<>(testStreamDecoder, payload),
- new CoderTypeInformation<>(elementCoder));
+ new CoderTypeInformation<>(elementCoder,
context.getPipelineOptions()));
context.setOutputDataStream(context.getOutput(testStream), source);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 75c2a20..ca98b6c 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -115,7 +115,7 @@ class FlinkStreamingTranslationContext {
WindowedValue.getFullCoder(
valueCoder,
collection.getWindowingStrategy().getWindowFn().windowCoder());
- return new CoderTypeInformation<>(windowedValueCoder);
+ return new CoderTypeInformation<>(windowedValueCoder, options);
}
public AppliedPTransform<?, ?, ?> getCurrentTransform() {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index d9a99a6..1808470 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -22,7 +22,6 @@ import
org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.UserCodeException;
/** Test Flink runner. */
@@ -46,7 +45,7 @@ public class TestFlinkRunner extends
PipelineRunner<PipelineResult> {
}
public static TestFlinkRunner create(boolean streaming) {
- FlinkPipelineOptions flinkOptions =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions flinkOptions = FlinkPipelineOptions.defaults();
flinkOptions.setStreaming(streaming);
return TestFlinkRunner.fromOptions(flinkOptions);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 10101cb..99f4617 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -36,16 +36,11 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public class CoderTypeInformation<T> extends TypeInformation<T> implements
AtomicType<T> {
private final Coder<T> coder;
- private final @Nullable SerializablePipelineOptions pipelineOptions;
+ private final SerializablePipelineOptions pipelineOptions;
- public CoderTypeInformation(Coder<T> coder) {
- checkNotNull(coder);
- this.coder = coder;
- this.pipelineOptions = null;
- }
-
- private CoderTypeInformation(Coder<T> coder, PipelineOptions
pipelineOptions) {
+ public CoderTypeInformation(Coder<T> coder, PipelineOptions pipelineOptions)
{
checkNotNull(coder);
+ checkNotNull(pipelineOptions);
this.coder = coder;
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 448dacb..0ccb969 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -387,7 +387,8 @@ public class DoFnOperator<InputT, OutputT>
ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
new ListStateDescriptor<>(
- "pushed-back-elements", new
CoderTypeSerializer<>(windowedInputCoder));
+ "pushed-back-elements",
+ new CoderTypeSerializer<>(windowedInputCoder, serializedOptions));
if (keySelector != null) {
pushedBackElementsHandler =
@@ -409,7 +410,9 @@ public class DoFnOperator<InputT, OutputT>
FlinkBroadcastStateInternals sideInputStateInternals =
new FlinkBroadcastStateInternals<>(
- getContainingTask().getIndexInSubtaskGroup(),
getOperatorStateBackend());
+ getContainingTask().getIndexInSubtaskGroup(),
+ getOperatorStateBackend(),
+ serializedOptions);
sideInputHandler = new SideInputHandler(sideInputs,
sideInputStateInternals);
sideInputReader = sideInputHandler;
@@ -425,11 +428,13 @@ public class DoFnOperator<InputT, OutputT>
// StatefulPardo or WindowDoFn
if (keyCoder != null) {
keyedStateInternals =
- new FlinkStateInternals<>((KeyedStateBackend)
getKeyedStateBackend(), keyCoder);
+ new FlinkStateInternals<>(
+ (KeyedStateBackend) getKeyedStateBackend(), keyCoder,
serializedOptions);
if (timerService == null) {
timerService =
- getInternalTimerService("beam-timer", new
CoderTypeSerializer<>(timerCoder), this);
+ getInternalTimerService(
+ "beam-timer", new CoderTypeSerializer<>(timerCoder,
serializedOptions), this);
}
timerInternals = new FlinkTimerInternals();
@@ -486,7 +491,8 @@ public class DoFnOperator<InputT, OutputT>
windowingStrategy.getWindowFn().windowCoder(),
getOperatorStateBackend(),
getKeyedStateBackend(),
- options.getNumConcurrentCheckpoints());
+ options.getNumConcurrentCheckpoints(),
+ serializedOptions);
}
doFnRunner = createWrappingDoFnRunner(doFnRunner, stepContext);
earlyBindStateIfNeeded();
@@ -534,7 +540,7 @@ public class DoFnOperator<InputT, OutputT>
if (doFn != null) {
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
FlinkStateInternals.EarlyBinder earlyBinder =
- new FlinkStateInternals.EarlyBinder(getKeyedStateBackend());
+ new FlinkStateInternals.EarlyBinder(getKeyedStateBackend(),
serializedOptions);
for (DoFnSignature.StateDeclaration value :
signature.stateDeclarations().values()) {
StateSpec<?> spec =
(StateSpec<?>)
signature.stateDeclarations().get(value.id()).field().get(doFn);
@@ -1194,29 +1200,35 @@ public class DoFnOperator<InputT, OutputT>
private Map<TupleTag<?>, Integer> tagsToIds;
private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
+ private final SerializablePipelineOptions pipelineOptions;
// There is no side output.
@SuppressWarnings("unchecked")
public MultiOutputOutputManagerFactory(
- TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) {
+ TupleTag<OutputT> mainTag,
+ Coder<WindowedValue<OutputT>> mainCoder,
+ SerializablePipelineOptions pipelineOptions) {
this(
mainTag,
new HashMap<>(),
ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
.put(mainTag, (Coder) mainCoder)
.build(),
- ImmutableMap.<TupleTag<?>, Integer>builder().put(mainTag,
0).build());
+ ImmutableMap.<TupleTag<?>, Integer>builder().put(mainTag, 0).build(),
+ pipelineOptions);
}
public MultiOutputOutputManagerFactory(
TupleTag<OutputT> mainTag,
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
- Map<TupleTag<?>, Integer> tagsToIds) {
+ Map<TupleTag<?>, Integer> tagsToIds,
+ SerializablePipelineOptions pipelineOptions) {
this.mainTag = mainTag;
this.tagsToOutputTags = tagsToOutputTags;
this.tagsToCoders = tagsToCoders;
this.tagsToIds = tagsToIds;
+ this.pipelineOptions = pipelineOptions;
}
@Override
@@ -1231,7 +1243,8 @@ public class DoFnOperator<InputT, OutputT>
TaggedKvCoder taggedKvCoder = buildTaggedKvCoder();
ListStateDescriptor<KV<Integer, WindowedValue<?>>>
taggedOutputPushbackStateDescriptor =
- new ListStateDescriptor<>("bundle-buffer-tag", new
CoderTypeSerializer<>(taggedKvCoder));
+ new ListStateDescriptor<>(
+ "bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder,
pipelineOptions));
ListState<KV<Integer, WindowedValue<?>>> listStateBuffer =
operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
PushedBackElementsHandler<KV<Integer, WindowedValue<?>>>
pushedBackElementsHandler =
@@ -1292,7 +1305,7 @@ public class DoFnOperator<InputT, OutputT>
new MapStateDescriptor<>(
PENDING_TIMERS_STATE_NAME,
new StringSerializer(),
- new CoderTypeSerializer<>(timerCoder));
+ new CoderTypeSerializer<>(timerCoder, serializedOptions));
this.pendingTimersById =
getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
populateOutputTimestampQueue();
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 08d7a20..d655cb9 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -54,6 +54,7 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.UserStateReference;
@@ -133,6 +134,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
/** A lock which has to be acquired when concurrently accessing state and
timers. */
private final ReentrantLock stateBackendLock;
+ private final SerializablePipelineOptions pipelineOptions;
+
private final boolean isStateful;
private transient ExecutableStageContext stageContext;
@@ -197,6 +200,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
this.outputMap = outputMap;
this.sideInputIds = sideInputIds;
this.stateBackendLock = new ReentrantLock();
+ this.pipelineOptions = new SerializablePipelineOptions(options);
}
@Override
@@ -207,7 +211,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
@Override
public void open() throws Exception {
executableStage = ExecutableStage.fromPayload(payload);
- initializeUserState(executableStage, getKeyedStateBackend());
+ initializeUserState(executableStage, getKeyedStateBackend(),
pipelineOptions);
// TODO: Wire this into the distributed cache and make it pluggable.
// TODO: Do we really want this layer of indirection when accessing the
stage bundle factory?
// It's a little strange because this operator is responsible for the
lifetime of the stage
@@ -1022,7 +1026,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
* Eagerly create the user state to work around
https://jira.apache.org/jira/browse/FLINK-12653.
*/
private static void initializeUserState(
- ExecutableStage executableStage, @Nullable KeyedStateBackend
keyedStateBackend) {
+ ExecutableStage executableStage,
+ @Nullable KeyedStateBackend keyedStateBackend,
+ SerializablePipelineOptions pipelineOptions) {
executableStage
.getUserStates()
.forEach(
@@ -1031,7 +1037,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
keyedStateBackend.getOrCreateKeyedState(
StringSerializer.INSTANCE,
new ListStateDescriptor<>(
- ref.localName(), new
CoderTypeSerializer<>(ByteStringCoder.of())));
+ ref.localName(),
+ new CoderTypeSerializer<>(ByteStringCoder.of(),
pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException("Couldn't initialize user states.",
e);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
index 3e5e22c..68c891d 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming;
import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
@@ -35,9 +36,11 @@ public class KvToByteBufferKeySelector<K, V>
implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>,
ResultTypeQueryable<ByteBuffer> {
private final Coder<K> keyCoder;
+ private final SerializablePipelineOptions pipelineOptions;
- public KvToByteBufferKeySelector(Coder<K> keyCoder) {
+ public KvToByteBufferKeySelector(Coder<K> keyCoder,
SerializablePipelineOptions pipelineOptions) {
this.keyCoder = keyCoder;
+ this.pipelineOptions = pipelineOptions;
}
@Override
@@ -48,6 +51,6 @@ public class KvToByteBufferKeySelector<K, V>
@Override
public TypeInformation<ByteBuffer> getProducedType() {
- return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of());
+ return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(),
pipelineOptions.get());
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 06729cd..9fae533 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -430,7 +430,9 @@ public class UnboundedSourceWrapper<OutputT,
CheckpointMarkT extends UnboundedSo
OperatorStateStore stateStore = context.getOperatorStateStore();
@SuppressWarnings("unchecked")
CoderTypeInformation<KV<? extends UnboundedSource<OutputT,
CheckpointMarkT>, CheckpointMarkT>>
- typeInformation = (CoderTypeInformation) new
CoderTypeInformation<>(checkpointCoder);
+ typeInformation =
+ (CoderTypeInformation)
+ new CoderTypeInformation<>(checkpointCoder,
serializedOptions.get());
stateForCheckpoint =
stateStore.getListState(
new ListStateDescriptor<>(
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 31709cd..b6a1ef2 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
@@ -55,7 +56,8 @@ public class BufferingDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT,
org.apache.beam.sdk.coders.Coder windowCoder,
OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend<Object> keyedStateBackend,
- int maxConcurrentCheckpoints)
+ int maxConcurrentCheckpoints,
+ SerializablePipelineOptions pipelineOptions)
throws Exception {
return new BufferingDoFnRunner<>(
doFnRunner,
@@ -64,7 +66,8 @@ public class BufferingDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT,
windowCoder,
operatorStateBackend,
keyedStateBackend,
- maxConcurrentCheckpoints);
+ maxConcurrentCheckpoints,
+ pipelineOptions);
}
/** The underlying DoFnRunner that any buffered data will be handed over to
eventually. */
@@ -87,7 +90,8 @@ public class BufferingDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT,
org.apache.beam.sdk.coders.Coder windowCoder,
OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
- int maxConcurrentCheckpoints)
+ int maxConcurrentCheckpoints,
+ SerializablePipelineOptions pipelineOptions)
throws Exception {
Preconditions.checkArgument(
maxConcurrentCheckpoints > 0 && maxConcurrentCheckpoints <
Short.MAX_VALUE,
@@ -104,7 +108,7 @@ public class BufferingDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT,
new ListStateDescriptor<>(
stateName + stateId,
new CoderTypeSerializer<>(
- new BufferedElements.Coder(inputCoder, windowCoder,
null)));
+ new BufferedElements.Coder(inputCoder, windowCoder,
null), pipelineOptions));
if (keyedStateBackend != null) {
return KeyedBufferingElementsHandler.create(keyedStateBackend,
stateDescriptor);
} else {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 794a0e4..2c46c9e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -27,11 +27,13 @@ import java.util.Map;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
@@ -68,9 +70,15 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
// stateName -> <namespace, state>
private Map<String, Map<String, ?>> stateForNonZeroOperator;
- public FlinkBroadcastStateInternals(int indexInSubtaskGroup,
OperatorStateBackend stateBackend) {
+ private final SerializablePipelineOptions pipelineOptions;
+
+ public FlinkBroadcastStateInternals(
+ int indexInSubtaskGroup,
+ OperatorStateBackend stateBackend,
+ SerializablePipelineOptions pipelineOptions) {
this.stateBackend = stateBackend;
this.indexInSubtaskGroup = indexInSubtaskGroup;
+ this.pipelineOptions = pipelineOptions;
if (indexInSubtaskGroup != 0) {
stateForNonZeroOperator = new HashMap<>();
}
@@ -91,13 +99,15 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
@Override
public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>>
address, Coder<T2> coder) {
- return new FlinkBroadcastValueState<>(stateBackend, address,
namespace, coder);
+ return new FlinkBroadcastValueState<>(
+ stateBackend, address, namespace, coder,
pipelineOptions.get());
}
@Override
public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address,
Coder<T2> elemCoder) {
- return new FlinkBroadcastBagState<>(stateBackend, address,
namespace, elemCoder);
+ return new FlinkBroadcastBagState<>(
+ stateBackend, address, namespace, elemCoder,
pipelineOptions.get());
}
@Override
@@ -130,7 +140,7 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
return new FlinkCombiningState<>(
- stateBackend, address, combineFn, namespace, accumCoder);
+ stateBackend, address, combineFn, namespace, accumCoder,
pipelineOptions.get());
}
@Override
@@ -175,14 +185,15 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
OperatorStateBackend flinkStateBackend,
String name,
StateNamespace namespace,
- Coder<T> coder) {
+ Coder<T> coder,
+ PipelineOptions pipelineOptions) {
this.name = name;
this.namespace = namespace;
this.flinkStateBackend = flinkStateBackend;
CoderTypeInformation<Map<String, T>> typeInfo =
- new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));
+ new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder),
pipelineOptions);
flinkStateDescriptor =
new ListStateDescriptor<>(name, typeInfo.createSerializer(new
ExecutionConfig()));
@@ -290,8 +301,9 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
OperatorStateBackend flinkStateBackend,
StateTag<ValueState<T>> address,
StateNamespace namespace,
- Coder<T> coder) {
- super(flinkStateBackend, address.getId(), namespace, coder);
+ Coder<T> coder,
+ PipelineOptions pipelineOptions) {
+ super(flinkStateBackend, address.getId(), namespace, coder,
pipelineOptions);
this.namespace = namespace;
this.address = address;
@@ -349,8 +361,9 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
OperatorStateBackend flinkStateBackend,
StateTag<BagState<T>> address,
StateNamespace namespace,
- Coder<T> coder) {
- super(flinkStateBackend, address.getId(), namespace,
ListCoder.of(coder));
+ Coder<T> coder,
+ PipelineOptions pipelineOptions) {
+ super(flinkStateBackend, address.getId(), namespace,
ListCoder.of(coder), pipelineOptions);
this.namespace = namespace;
this.address = address;
@@ -436,8 +449,9 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
- Coder<AccumT> accumCoder) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder);
+ Coder<AccumT> accumCoder,
+ PipelineOptions pipelineOptions) {
+ super(flinkStateBackend, address.getId(), namespace, accumCoder,
pipelineOptions);
this.namespace = namespace;
this.address = address;
@@ -552,8 +566,9 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
- FlinkBroadcastStateInternals<K2> flinkStateInternals) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder);
+ FlinkBroadcastStateInternals<K2> flinkStateInternals,
+ PipelineOptions pipelineOptions) {
+ super(flinkStateBackend, address.getId(), namespace, accumCoder,
pipelineOptions);
this.namespace = namespace;
this.address = address;
@@ -687,7 +702,7 @@ public class FlinkBroadcastStateInternals<K> implements
StateInternals {
Coder<AccumT> accumCoder,
FlinkBroadcastStateInternals<K2> flinkStateInternals,
CombineWithContext.Context context) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder);
+ super(flinkStateBackend, address.getId(), namespace, accumCoder,
pipelineOptions.get());
this.namespace = namespace;
this.address = address;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index e2d85af..55f3628 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.sdk.coders.Coder;
@@ -96,16 +97,24 @@ public class FlinkStateInternals<K> implements
StateInternals {
// Watermark holds for all keys/windows of this partition, allows efficient
lookup of the minimum
private final TreeMultiset<Long> watermarkHolds = TreeMultiset.create();
// State to persist combined watermark holds for all keys of this partition
- private final MapStateDescriptor<String, Instant>
watermarkHoldStateDescriptor =
- new MapStateDescriptor<>(
- "watermark-holds",
- StringSerializer.INSTANCE,
- new CoderTypeSerializer<>(InstantCoder.of()));
+ private final MapStateDescriptor<String, Instant>
watermarkHoldStateDescriptor;
- public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend,
Coder<K> keyCoder)
+ private final SerializablePipelineOptions pipelineOptions;
+
+ public FlinkStateInternals(
+ KeyedStateBackend<ByteBuffer> flinkStateBackend,
+ Coder<K> keyCoder,
+ SerializablePipelineOptions pipelineOptions)
throws Exception {
this.flinkStateBackend = flinkStateBackend;
this.keyCoder = keyCoder;
+ watermarkHoldStateDescriptor =
+ new MapStateDescriptor<>(
+ "watermark-holds",
+ StringSerializer.INSTANCE,
+ new CoderTypeSerializer<>(InstantCoder.of(), pipelineOptions));
+ this.pipelineOptions = pipelineOptions;
+
restoreWatermarkHoldsView();
}
@@ -166,7 +175,7 @@ public class FlinkStateInternals<K> implements
StateInternals {
public <T2> ValueState<T2> bindValue(
String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) {
FlinkValueState<T2> valueState =
- new FlinkValueState<>(flinkStateBackend, id, namespace, coder);
+ new FlinkValueState<>(flinkStateBackend, id, namespace, coder,
pipelineOptions);
collectGlobalWindowStateDescriptor(valueState.flinkStateDescriptor);
return valueState;
}
@@ -174,14 +183,15 @@ public class FlinkStateInternals<K> implements
StateInternals {
@Override
public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec,
Coder<T2> elemCoder) {
FlinkBagState<Object, T2> bagState =
- new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder);
+ new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder,
pipelineOptions);
collectGlobalWindowStateDescriptor(bagState.flinkStateDescriptor);
return bagState;
}
@Override
public <T2> SetState<T2> bindSet(String id, StateSpec<SetState<T2>> spec,
Coder<T2> elemCoder) {
- FlinkSetState<T2> setState = new FlinkSetState<>(flinkStateBackend, id,
namespace, elemCoder);
+ FlinkSetState<T2> setState =
+ new FlinkSetState<>(flinkStateBackend, id, namespace, elemCoder,
pipelineOptions);
collectGlobalWindowStateDescriptor(setState.flinkStateDescriptor);
return setState;
}
@@ -193,7 +203,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
FlinkMapState<KeyT, ValueT> mapState =
- new FlinkMapState<>(flinkStateBackend, id, namespace, mapKeyCoder,
mapValueCoder);
+ new FlinkMapState<>(
+ flinkStateBackend, id, namespace, mapKeyCoder, mapValueCoder,
pipelineOptions);
collectGlobalWindowStateDescriptor(mapState.flinkStateDescriptor);
return mapState;
}
@@ -212,7 +223,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
FlinkCombiningState<Object, InputT, AccumT, OutputT> combiningState =
- new FlinkCombiningState<>(flinkStateBackend, id, combineFn,
namespace, accumCoder);
+ new FlinkCombiningState<>(
+ flinkStateBackend, id, combineFn, namespace, accumCoder,
pipelineOptions);
collectGlobalWindowStateDescriptor(combiningState.flinkStateDescriptor);
return combiningState;
}
@@ -231,7 +243,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
combineFn,
namespace,
accumCoder,
- CombineContextFactory.createFromStateContext(stateContext));
+ CombineContextFactory.createFromStateContext(stateContext),
+ pipelineOptions);
collectGlobalWindowStateDescriptor(combiningStateWithContext.flinkStateDescriptor);
return combiningStateWithContext;
}
@@ -263,13 +276,15 @@ public class FlinkStateInternals<K> implements
StateInternals {
KeyedStateBackend<ByteBuffer> flinkStateBackend,
String stateId,
StateNamespace namespace,
- Coder<T> coder) {
+ Coder<T> coder,
+ SerializablePipelineOptions pipelineOptions) {
this.namespace = namespace;
this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
- flinkStateDescriptor = new ValueStateDescriptor<>(stateId, new
CoderTypeSerializer<>(coder));
+ flinkStateDescriptor =
+ new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder,
pipelineOptions));
}
@Override
@@ -347,14 +362,15 @@ public class FlinkStateInternals<K> implements
StateInternals {
KeyedStateBackend<ByteBuffer> flinkStateBackend,
String stateId,
StateNamespace namespace,
- Coder<T> coder) {
+ Coder<T> coder,
+ SerializablePipelineOptions pipelineOptions) {
this.namespace = namespace;
this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
this.storesVoidValues = coder instanceof VoidCoder;
this.flinkStateDescriptor =
- new ListStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder));
+ new ListStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder,
pipelineOptions));
}
@Override
@@ -486,7 +502,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
String stateId,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
- Coder<AccumT> accumCoder) {
+ Coder<AccumT> accumCoder,
+ SerializablePipelineOptions pipelineOptions) {
this.namespace = namespace;
this.stateId = stateId;
@@ -494,7 +511,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
this.flinkStateBackend = flinkStateBackend;
flinkStateDescriptor =
- new ValueStateDescriptor<>(stateId, new
CoderTypeSerializer<>(accumCoder));
+ new ValueStateDescriptor<>(
+ stateId, new CoderTypeSerializer<>(accumCoder, pipelineOptions));
}
@Override
@@ -649,7 +667,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>
combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
- CombineWithContext.Context context) {
+ CombineWithContext.Context context,
+ SerializablePipelineOptions pipelineOptions) {
this.namespace = namespace;
this.stateId = stateId;
@@ -658,7 +677,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
this.context = context;
flinkStateDescriptor =
- new ValueStateDescriptor<>(stateId, new
CoderTypeSerializer<>(accumCoder));
+ new ValueStateDescriptor<>(
+ stateId, new CoderTypeSerializer<>(accumCoder, pipelineOptions));
}
@Override
@@ -934,15 +954,16 @@ public class FlinkStateInternals<K> implements
StateInternals {
String stateId,
StateNamespace namespace,
Coder<KeyT> mapKeyCoder,
- Coder<ValueT> mapValueCoder) {
+ Coder<ValueT> mapValueCoder,
+ SerializablePipelineOptions pipelineOptions) {
this.namespace = namespace;
this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateDescriptor =
new MapStateDescriptor<>(
stateId,
- new CoderTypeSerializer<>(mapKeyCoder),
- new CoderTypeSerializer<>(mapValueCoder));
+ new CoderTypeSerializer<>(mapKeyCoder, pipelineOptions),
+ new CoderTypeSerializer<>(mapValueCoder, pipelineOptions));
}
@Override
@@ -1120,13 +1141,14 @@ public class FlinkStateInternals<K> implements
StateInternals {
KeyedStateBackend<ByteBuffer> flinkStateBackend,
String stateId,
StateNamespace namespace,
- Coder<T> coder) {
+ Coder<T> coder,
+ SerializablePipelineOptions pipelineOptions) {
this.namespace = namespace;
this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateDescriptor =
new MapStateDescriptor<>(
- stateId, new CoderTypeSerializer<>(coder), new
BooleanSerializer());
+ stateId, new CoderTypeSerializer<>(coder, pipelineOptions), new
BooleanSerializer());
}
@Override
@@ -1287,9 +1309,12 @@ public class FlinkStateInternals<K> implements
StateInternals {
public static class EarlyBinder implements StateBinder {
private final KeyedStateBackend keyedStateBackend;
+ private final SerializablePipelineOptions pipelineOptions;
- public EarlyBinder(KeyedStateBackend keyedStateBackend) {
+ public EarlyBinder(
+ KeyedStateBackend keyedStateBackend, SerializablePipelineOptions
pipelineOptions) {
this.keyedStateBackend = keyedStateBackend;
+ this.pipelineOptions = pipelineOptions;
}
@Override
@@ -1297,7 +1322,7 @@ public class FlinkStateInternals<K> implements
StateInternals {
try {
keyedStateBackend.getOrCreateKeyedState(
StringSerializer.INSTANCE,
- new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder)));
+ new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder,
pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1310,7 +1335,7 @@ public class FlinkStateInternals<K> implements
StateInternals {
try {
keyedStateBackend.getOrCreateKeyedState(
StringSerializer.INSTANCE,
- new ListStateDescriptor<>(id, new
CoderTypeSerializer<>(elemCoder)));
+ new ListStateDescriptor<>(id, new CoderTypeSerializer<>(elemCoder,
pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1324,7 +1349,9 @@ public class FlinkStateInternals<K> implements
StateInternals {
keyedStateBackend.getOrCreateKeyedState(
StringSerializer.INSTANCE,
new MapStateDescriptor<>(
- id, new CoderTypeSerializer<>(elemCoder),
VoidSerializer.INSTANCE));
+ id,
+ new CoderTypeSerializer<>(elemCoder, pipelineOptions),
+ VoidSerializer.INSTANCE));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1342,8 +1369,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
StringSerializer.INSTANCE,
new MapStateDescriptor<>(
id,
- new CoderTypeSerializer<>(mapKeyCoder),
- new CoderTypeSerializer<>(mapValueCoder)));
+ new CoderTypeSerializer<>(mapKeyCoder, pipelineOptions),
+ new CoderTypeSerializer<>(mapValueCoder, pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1366,7 +1393,7 @@ public class FlinkStateInternals<K> implements
StateInternals {
try {
keyedStateBackend.getOrCreateKeyedState(
StringSerializer.INSTANCE,
- new ValueStateDescriptor<>(id, new
CoderTypeSerializer<>(accumCoder)));
+ new ValueStateDescriptor<>(id, new
CoderTypeSerializer<>(accumCoder, pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1383,7 +1410,7 @@ public class FlinkStateInternals<K> implements
StateInternals {
try {
keyedStateBackend.getOrCreateKeyedState(
StringSerializer.INSTANCE,
- new ValueStateDescriptor<>(id, new
CoderTypeSerializer<>(accumCoder)));
+ new ValueStateDescriptor<>(id, new
CoderTypeSerializer<>(accumCoder, pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1399,7 +1426,7 @@ public class FlinkStateInternals<K> implements
StateInternals {
new MapStateDescriptor<>(
"watermark-holds",
StringSerializer.INSTANCE,
- new CoderTypeSerializer<>(InstantCoder.of())));
+ new CoderTypeSerializer<>(InstantCoder.of(),
pipelineOptions)));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index f2eb3b1..fc4686a 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -28,7 +28,6 @@ import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.Collections;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
@@ -54,7 +53,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSetParallelismBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setParallelism(42);
@@ -68,7 +67,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSetParallelismStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setParallelism(42);
@@ -82,7 +81,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSetMaxParallelismStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setMaxParallelism(42);
@@ -98,7 +97,7 @@ public class FlinkExecutionEnvironmentsTest {
public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
String flinkConfDir = extractFlinkConfig();
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("host:80");
@@ -114,7 +113,7 @@ public class FlinkExecutionEnvironmentsTest {
public void shouldInferParallelismFromEnvironmentStreaming() throws
IOException {
String confDir = extractFlinkConfig();
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("host:80");
@@ -128,7 +127,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldFallbackToDefaultParallelismBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("host:80");
@@ -142,7 +141,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldFallbackToDefaultParallelismStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("host:80");
@@ -156,7 +155,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void useDefaultParallelismFromContextBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
ExecutionEnvironment bev =
@@ -170,7 +169,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void useDefaultParallelismFromContextStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
StreamExecutionEnvironment sev =
@@ -184,7 +183,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldParsePortForRemoteEnvironmentBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("host:1234");
@@ -198,7 +197,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldParsePortForRemoteEnvironmentStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("host:1234");
@@ -212,7 +211,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("host");
@@ -226,7 +225,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("host");
@@ -240,7 +239,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldTreatAutoAndEmptyHostTheSameBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
ExecutionEnvironment sev =
@@ -258,7 +257,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
StreamExecutionEnvironment sev =
@@ -276,7 +275,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldDetectMalformedPortBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("host:p0rt");
@@ -288,7 +287,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldDetectMalformedPortStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("host:p0rt");
@@ -300,7 +299,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSupportIPv4Batch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("192.168.1.1:1234");
@@ -318,7 +317,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSupportIPv4Streaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("192.168.1.1:1234");
@@ -336,7 +335,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSupportIPv6Batch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
@@ -355,7 +354,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSupportIPv6Streaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
@@ -374,7 +373,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldRemoveHttpProtocolFromHostBatch() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
for (String flinkMaster :
@@ -391,7 +390,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldRemoveHttpProtocolFromHostStreaming() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
for (String flinkMaster :
@@ -416,7 +415,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
// Checkpointing disabled, shut down sources immediately
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
Collections.emptyList());
assertThat(options.getShutdownSourcesAfterIdleMs(), is(0L));
}
@@ -424,7 +423,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
// Checkpointing is enabled, never shut down sources
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setCheckpointingInterval(1000L);
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
Collections.emptyList());
assertThat(options.getShutdownSourcesAfterIdleMs(), is(Long.MAX_VALUE));
@@ -433,7 +432,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
// Checkpointing disabled, accept flag
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setShutdownSourcesAfterIdleMs(42L);
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
Collections.emptyList());
assertThat(options.getShutdownSourcesAfterIdleMs(), is(42L));
@@ -442,7 +441,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
// Checkpointing enable, still accept flag
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setCheckpointingInterval(1000L);
options.setShutdownSourcesAfterIdleMs(42L);
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
Collections.emptyList());
@@ -452,7 +451,7 @@ public class FlinkExecutionEnvironmentsTest {
@Test
public void shouldSetSavepointRestoreForRemoteStreaming() {
String path = "fakePath";
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("host:80");
options.setSavepointPath(path);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 4785f4e..2aafb90 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -46,7 +46,6 @@ import
org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -84,7 +83,7 @@ public class FlinkPipelineExecutionEnvironmentTest implements
Serializable {
@Test
public void shouldRecognizeAndTranslateStreamingPipeline() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("[auto]");
@@ -152,7 +151,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
@Test
public void shouldUseDefaultTempLocationIfNoneSet() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("clusterAddress");
@@ -168,7 +167,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
@Test
public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("clusterAddress");
@@ -189,7 +188,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
@Test
public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws
Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("clusterAddress");
options.setStreaming(true);
@@ -214,7 +213,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
public void shouldUseTransformOverrides() {
boolean[] testParameters = {true, false};
for (boolean streaming : testParameters) {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setStreaming(streaming);
options.setRunner(FlinkRunner.class);
FlinkPipelineExecutionEnvironment flinkEnv = new
FlinkPipelineExecutionEnvironment(options);
@@ -234,7 +233,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
@Test
public void shouldProvideParallelismToTransformOverrides() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setStreaming(true);
options.setRunner(FlinkRunner.class);
FlinkPipelineExecutionEnvironment flinkEnv = new
FlinkPipelineExecutionEnvironment(options);
@@ -278,7 +277,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
@Test
public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
// no explicit streaming mode set
options.setRunner(FlinkRunner.class);
FlinkPipelineExecutionEnvironment flinkEnv = new
FlinkPipelineExecutionEnvironment(options);
@@ -303,7 +302,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
@Test
public void testTranslationModeOverrideWithUnboundedSources() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setStreaming(false);
@@ -319,7 +318,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
public void testTranslationModeNoOverrideWithoutUnboundedSources() {
boolean[] testArgs = new boolean[] {true, false};
for (boolean streaming : testArgs) {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setStreaming(streaming);
@@ -408,7 +407,7 @@ public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
private FlinkPipelineOptions setPipelineOptions(
String flinkMaster, String tempLocation, List<String> filesToStage) {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster(flinkMaster);
options.setTempLocation(tempLocation);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
index f99a41e..b9b9513 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.core.IsNull.nullValue;
import java.util.Collections;
import java.util.HashMap;
import
org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -70,7 +71,7 @@ public class FlinkPipelineOptionsTest {
/** These defaults should only be changed with a very good reason. */
@Test
public void testDefaults() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
assertThat(options.getParallelism(), is(-1));
assertThat(options.getMaxParallelism(), is(-1));
assertThat(options.getFlinkMaster(), is("[auto]"));
@@ -95,6 +96,7 @@ public class FlinkPipelineOptionsTest {
assertThat(options.getSavepointPath(), is(nullValue()));
assertThat(options.getAllowNonRestoredState(), is(false));
assertThat(options.getDisableMetrics(), is(false));
+ assertThat(options.getFasterCopy(), is(false));
}
@Test(expected = Exception.class)
@@ -108,7 +110,8 @@ public class FlinkPipelineOptionsTest {
Collections.emptyMap(),
mainTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainTag, coder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.globalDefault(),
new HashMap<>(),
Collections.emptyList(),
@@ -134,7 +137,8 @@ public class FlinkPipelineOptionsTest {
Collections.emptyMap(),
mainTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainTag, coder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.globalDefault(),
new HashMap<>(),
Collections.emptyList(),
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
index 6baf700..20e7627 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -123,7 +122,7 @@ public class FlinkRequiresStableInputTest {
*/
@Test(timeout = 30_000)
public void testParDoRequiresStableInput() throws Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setParallelism(1);
// We only want to trigger external savepoints but we require
// checkpointing to be enabled for @RequiresStableInput
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
index 4cda4cf..5aaa8ff 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -57,7 +56,7 @@ public class FlinkRunnerTest extends FlinkRunnerTestCompat {
/** Main method for {@code testEnsureStdoutStdErrIsRestored()}. */
public static void main(String[] args) {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(NotExecutingFlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(GenerateSequence.from(0));
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 9b928d1..d899917 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -33,7 +33,6 @@ import
org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
@@ -151,7 +150,7 @@ public class FlinkSavepointTest implements Serializable {
}
private void runSavepointAndRestore(boolean isPortablePipeline) throws
Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setStreaming(true);
// Initial parallelism
options.setParallelism(2);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
index 9b39917..193a787 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
@@ -161,7 +161,7 @@ public class FlinkSubmissionTest {
/** The Flink program which is executed by the CliFrontend. */
public static void main(String[] args) {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(FlinkRunner.class);
options.setStreaming(streaming);
options.setParallelism(1);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
index 5436137..4f060b5 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTransformOverridesTest.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -55,7 +54,7 @@ public class FlinkTransformOverridesTest {
@Test
public void testRunnerDeterminedSharding() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("[auto]");
options.setParallelism(5);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 0011db9..842852e 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.streaming;
import java.util.Collections;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsTest;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -45,7 +47,10 @@ public class FlinkBroadcastStateInternalsTest extends
StateInternalsTest {
OperatorStateBackend operatorStateBackend =
backend.createOperatorStateBackend(
new DummyEnvironment("test", 1, 0), "", Collections.emptyList(),
null);
- return new FlinkBroadcastStateInternals<>(1, operatorStateBackend);
+ return new FlinkBroadcastStateInternals<>(
+ 1,
+ operatorStateBackend,
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index c92c893..8aa0440 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -28,6 +28,8 @@ import org.apache.beam.runners.core.StateInternalsTest;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -62,7 +64,10 @@ public class FlinkStateInternalsTest extends
StateInternalsTest {
protected StateInternals createStateInternals() {
try {
KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
- return new FlinkStateInternals<>(keyedStateBackend,
StringUtf8Coder.of());
+ return new FlinkStateInternals<>(
+ keyedStateBackend,
+ StringUtf8Coder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -72,7 +77,10 @@ public class FlinkStateInternalsTest extends
StateInternalsTest {
public void testWatermarkHoldsPersistence() throws Exception {
KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
FlinkStateInternals stateInternals =
- new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
+ new FlinkStateInternals<>(
+ keyedStateBackend,
+ StringUtf8Coder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
StateTag<WatermarkHoldState> stateTag =
StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
@@ -123,7 +131,11 @@ public class FlinkStateInternalsTest extends
StateInternalsTest {
assertThat(fixedWindow.read(), is(middle));
// Discard watermark view and recover it
- stateInternals = new FlinkStateInternals<>(keyedStateBackend,
StringUtf8Coder.of());
+ stateInternals =
+ new FlinkStateInternals<>(
+ keyedStateBackend,
+ StringUtf8Coder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
globalWindow = stateInternals.state(StateNamespaces.global(), stateTag);
fixedWindow =
stateInternals.state(
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
index d783801..9245962 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.TestFlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -44,7 +43,7 @@ public class GroupByWithNullValuesTest implements
Serializable {
@Test
public void testGroupByWithNullValues() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setRunner(TestFlinkRunner.class);
options.setStreaming(true);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 06dfb53..9411967 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -38,6 +38,7 @@ import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
@@ -139,11 +139,12 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag,
coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag, coder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.globalDefault(),
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
null,
null,
DoFnSchemaInformation.create(),
@@ -204,11 +205,15 @@ public class DoFnOperatorTest {
mainOutput,
ImmutableList.of(additionalOutput1, additionalOutput2),
new DoFnOperator.MultiOutputOutputManagerFactory(
- mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds),
+ mainOutput,
+ tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.globalDefault(),
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
null,
null,
DoFnSchemaInformation.create(),
@@ -340,11 +345,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ outputCoder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
keyCoder, /* key coder */
keySelector,
DoFnSchemaInformation.create(),
@@ -354,9 +362,12 @@ public class DoFnOperatorTest {
new KeyedOneInputStreamOperatorTestHarness<>(
doFnOperator,
keySelector,
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
- testHarness.setup(new CoderTypeSerializer<>(outputCoder));
+ testHarness.setup(
+ new CoderTypeSerializer<>(
+ outputCoder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())));
testHarness.open();
@@ -449,11 +460,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ outputCoder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
keyCoder, /* key coder */
keySelector,
DoFnSchemaInformation.create(),
@@ -463,7 +477,8 @@ public class DoFnOperatorTest {
new KeyedOneInputStreamOperatorTestHarness<>(
doFnOperator,
keySelector,
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
testHarness.open();
@@ -687,11 +702,12 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag,
coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag, coder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
StringUtf8Coder.of(), /* key coder */
keySelector,
DoFnSchemaInformation.create(),
@@ -703,7 +719,8 @@ public class DoFnOperatorTest {
new KeyedOneInputStreamOperatorTestHarness<>(
doFnOperator,
keySelector,
- new
CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
return testHarness;
}
@@ -741,11 +758,12 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag,
coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag, coder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
sideInputMapping, /* side-input mapping */
ImmutableList.of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
keyed ? keyCoder : null,
keyed ? keySelector : null,
DoFnSchemaInformation.create(),
@@ -761,7 +779,8 @@ public class DoFnOperatorTest {
doFnOperator,
keySelector,
null,
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
}
testHarness.open();
@@ -850,14 +869,15 @@ public class DoFnOperatorTest {
TupleTag<KV<String, Long>> outputTag = new TupleTag<>("main-output");
StringUtf8Coder keyCoder = StringUtf8Coder.of();
- KvToByteBufferKeySelector keySelector = new
KvToByteBufferKeySelector<>(keyCoder);
+ KvToByteBufferKeySelector keySelector = new
KvToByteBufferKeySelector<>(keyCoder, null);
KvCoder<String, Long> coder = KvCoder.of(keyCoder, VarLongCoder.of());
FullWindowedValueCoder<KV<String, Long>> kvCoder =
WindowedValue.getFullCoder(coder,
windowingStrategy.getWindowFn().windowCoder());
CoderTypeInformation<ByteBuffer> keyCoderInfo =
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of());
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults());
OneInputStreamOperatorTestHarness<
WindowedValue<KV<String, Long>>, WindowedValue<KV<String, Long>>>
@@ -935,11 +955,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new
DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ coder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.globalDefault(),
sideInputMapping, /* side-input mapping */
ImmutableList.of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
null,
null,
DoFnSchemaInformation.create(),
@@ -975,11 +998,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new
DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ coder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
sideInputMapping, /* side-input mapping */
ImmutableList.of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
keyCoder,
keySelector,
DoFnSchemaInformation.create(),
@@ -990,7 +1016,8 @@ public class DoFnOperatorTest {
keySelector,
// we use a dummy key for the second input since it is
considered to be broadcast
null,
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
});
}
@@ -1074,11 +1101,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new
DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ coder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
sideInputMapping, /* side-input mapping */
ImmutableList.of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
null,
null,
DoFnSchemaInformation.create(),
@@ -1115,11 +1145,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new
DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ coder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
WindowingStrategy.of(FixedWindows.of(Duration.millis(100))),
sideInputMapping, /* side-input mapping */
ImmutableList.of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
keyCoder,
keySelector,
DoFnSchemaInformation.create(),
@@ -1130,7 +1163,8 @@ public class DoFnOperatorTest {
keySelector,
// we use a dummy key for the second input since it is
considered to be broadcast
null,
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
});
}
@@ -1230,9 +1264,11 @@ public class DoFnOperatorTest {
TupleTag<String> outputTag = new TupleTag<>("main-output");
final CoderTypeSerializer<WindowedValue<String>> outputSerializer =
- new CoderTypeSerializer<>(outputCoder);
+ new CoderTypeSerializer<>(
+ outputCoder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
CoderTypeInformation<ByteBuffer> keyCoderInfo =
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of());
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults());
KeySelector<WindowedValue<Integer>, ByteBuffer> keySelector =
e -> FlinkKeyUtils.encodeKey(e.getValue(), keyCoder);
@@ -1310,11 +1346,14 @@ public class DoFnOperatorTest {
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag,
outputCoder),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag,
+ outputCoder,
+ new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
windowingStrategy,
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
keyCoder /* key coder */,
keySelector,
DoFnSchemaInformation.create(),
@@ -1331,7 +1370,7 @@ public class DoFnOperatorTest {
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
TupleTag<String> outputTag = new TupleTag<>("main-output");
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(2L);
options.setMaxBundleTimeMills(10L);
@@ -1347,7 +1386,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
DoFnOperator<String, String> doFnOperator =
new DoFnOperator<>(
@@ -1465,13 +1505,15 @@ public class DoFnOperatorTest {
public void testBundleKeyed() throws Exception {
StringUtf8Coder keyCoder = StringUtf8Coder.of();
- KvToByteBufferKeySelector keySelector = new
KvToByteBufferKeySelector<>(keyCoder);
+ KvToByteBufferKeySelector keySelector =
+ new KvToByteBufferKeySelector<>(
+ keyCoder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
KvCoder<String, String> kvCoder = KvCoder.of(keyCoder,
StringUtf8Coder.of());
WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>>
windowedValueCoder =
WindowedValue.getValueOnlyCoder(kvCoder);
TupleTag<String> outputTag = new TupleTag<>("main-output");
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(2L);
options.setMaxBundleTimeMills(10L);
@@ -1493,7 +1535,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(kvCoder.getValueCoder(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(kvCoder.getValueCoder(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator =
new DoFnOperator(
@@ -1596,7 +1639,7 @@ public class DoFnOperatorTest {
@Test
public void testCheckpointBufferingWithMultipleBundles() throws Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(10L);
options.setCheckpointingInterval(1L);
@@ -1609,7 +1652,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory<>(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
@SuppressWarnings("unchecked")
Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
@@ -1700,7 +1744,7 @@ public class DoFnOperatorTest {
@Test
public void testExactlyOnceBuffering() throws Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(2L);
options.setCheckpointingInterval(1L);
@@ -1733,7 +1777,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
() ->
@@ -1810,14 +1855,15 @@ public class DoFnOperatorTest {
@Test
@SuppressWarnings("unchecked")
public void testExactlyOnceBufferingKeyed() throws Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(2L);
options.setCheckpointingInterval(1L);
TupleTag<String> outputTag = new TupleTag<>("main-output");
StringUtf8Coder keyCoder = StringUtf8Coder.of();
- KvToByteBufferKeySelector keySelector = new
KvToByteBufferKeySelector<>(keyCoder);
+ KvToByteBufferKeySelector keySelector =
+ new KvToByteBufferKeySelector<>(keyCoder, new
SerializablePipelineOptions(options));
KvCoder<String, String> kvCoder = KvCoder.of(keyCoder,
StringUtf8Coder.of());
WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>>
windowedValueCoder =
WindowedValue.getValueOnlyCoder(kvCoder);
@@ -1848,7 +1894,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
Supplier<DoFnOperator<KV<String, String>, KV<String, String>>>
doFnOperatorSupplier =
() ->
@@ -1942,7 +1989,7 @@ public class DoFnOperatorTest {
TupleTag<String> outputTag = new TupleTag<>("main-output");
StringUtf8Coder keyCoder = StringUtf8Coder.of();
- KvToByteBufferKeySelector keySelector = new
KvToByteBufferKeySelector<>(keyCoder);
+ KvToByteBufferKeySelector keySelector = new
KvToByteBufferKeySelector<>(keyCoder, null);
KvCoder<String, String> kvCoder = KvCoder.of(keyCoder,
StringUtf8Coder.of());
WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>>
windowedValueCoder =
WindowedValue.getValueOnlyCoder(kvCoder);
@@ -1960,9 +2007,10 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
// should make the DoFnOperator creation fail
options.setCheckpointingInterval(-1L);
new DoFnOperator(
@@ -1985,7 +2033,7 @@ public class DoFnOperatorTest {
@Test
public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws
Exception {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(10L);
options.setCheckpointingInterval(1L);
@@ -1998,7 +2046,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
@SuppressWarnings("unchecked")
DoFnOperator doFnOperator =
@@ -2079,7 +2128,7 @@ public class DoFnOperatorTest {
}
private static DoFnOperator getOperatorForCleanupInspection() {
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setParallelism(4);
TupleTag<String> outputTag = new TupleTag<>("main-output");
@@ -2097,7 +2146,8 @@ public class DoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
- WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
+ new SerializablePipelineOptions(options));
return new DoFnOperator<>(
doFn,
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 02684bc..91ff57c 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -60,6 +60,7 @@ import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
@@ -84,7 +85,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -196,7 +196,10 @@ public class ExecutableStageDoFnOperatorTest {
public void sdkErrorsSurfaceOnClose() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VoidCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
ExecutableStageDoFnOperator<Integer, Integer> operator =
getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
@@ -225,7 +228,10 @@ public class ExecutableStageDoFnOperatorTest {
public void expectedInputsAreSent() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VoidCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
ExecutableStageDoFnOperator<Integer, Integer> operator =
getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
@@ -291,7 +297,11 @@ public class ExecutableStageDoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
new DoFnOperator.MultiOutputOutputManagerFactory(
- mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+ mainOutput,
+ tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
WindowedValue<Integer> zero = WindowedValue.valueInGlobalWindow(0);
WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
@@ -411,7 +421,10 @@ public class ExecutableStageDoFnOperatorTest {
public void testWatermarkHandling() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VoidCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
ExecutableStageDoFnOperator<KV<String, Integer>, Integer> operator =
getOperator(
mainOutput,
@@ -428,7 +441,7 @@ public class ExecutableStageDoFnOperatorTest {
new KeyedOneInputStreamOperatorTestHarness<>(
operator,
val -> val.getValue().getKey(),
- new CoderTypeInformation<>(StringUtf8Coder.of()));
+ new CoderTypeInformation<>(StringUtf8Coder.of(),
FlinkPipelineOptions.defaults()));
RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(bundle.getInputReceivers())
.thenReturn(
@@ -544,7 +557,10 @@ public class ExecutableStageDoFnOperatorTest {
public void testStageBundleClosed() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VoidCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
ExecutableStageDoFnOperator<Integer, Integer> operator =
getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
@@ -578,7 +594,10 @@ public class ExecutableStageDoFnOperatorTest {
public void testEnsureStateCleanupWithKeyedInput() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VarIntCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VarIntCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
VarIntCoder keyCoder = VarIntCoder.of();
ExecutableStageDoFnOperator<Integer, Integer> operator =
getOperator(
@@ -592,7 +611,9 @@ public class ExecutableStageDoFnOperatorTest {
KeyedOneInputStreamOperatorTestHarness<Integer, WindowedValue<Integer>,
WindowedValue<Integer>>
testHarness =
new KeyedOneInputStreamOperatorTestHarness(
- operator, val -> val, new CoderTypeInformation<>(keyCoder));
+ operator,
+ val -> val,
+ new CoderTypeInformation<>(keyCoder,
FlinkPipelineOptions.defaults()));
RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(bundle.getInputReceivers())
@@ -691,7 +712,10 @@ public class ExecutableStageDoFnOperatorTest {
throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VoidCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
StringUtf8Coder keyCoder = StringUtf8Coder.of();
WindowingStrategy windowingStrategy =
@@ -738,7 +762,8 @@ public class ExecutableStageDoFnOperatorTest {
new KeyedOneInputStreamOperatorTestHarness(
operator,
operator.keySelector,
- new
CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
testHarness.open();
@@ -858,7 +883,10 @@ public class ExecutableStageDoFnOperatorTest {
public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput,
+ VoidCoder.of(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
StringUtf8Coder keyCoder = StringUtf8Coder.of();
@@ -881,7 +909,8 @@ public class ExecutableStageDoFnOperatorTest {
new KeyedOneInputStreamOperatorTestHarness(
operator,
operator.keySelector,
- new
CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(bundle.getInputReceivers())
@@ -1060,9 +1089,13 @@ public class ExecutableStageDoFnOperatorTest {
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
new DoFnOperator.MultiOutputOutputManagerFactory(
- mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+ mainOutput,
+ tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
ExecutableStageDoFnOperator<Integer, Integer> operator =
new ExecutableStageDoFnOperator<>(
@@ -1139,14 +1172,14 @@ public class ExecutableStageDoFnOperatorTest {
Collections.emptyMap() /* sideInputTagMapping */,
Collections.emptyList() /* sideInputs */,
Collections.emptyMap() /* sideInputId mapping */,
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
stagePayload,
jobInfo,
contextFactory,
createOutputMap(mainOutput, additionalOutputs),
windowingStrategy,
keyCoder,
- keyCoder != null ? new KvToByteBufferKeySelector<>(keyCoder) :
null);
+ keyCoder != null ? new KvToByteBufferKeySelector<>(keyCoder, null)
: null);
Whitebox.setInternalState(operator, "stateRequestHandler",
stateRequestHandler);
return operator;
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index 371cbd0..f37a875 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -32,13 +32,13 @@ import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -212,11 +212,14 @@ public class WindowDoFnOperatorTest {
(Coder) inputCoder,
outputTag,
emptyList(),
- new MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
+ new MultiOutputOutputManagerFactory<>(
+ outputTag,
+ outputCoder,
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
windowingStrategy,
emptyMap(),
emptyList(),
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ FlinkPipelineOptions.defaults(),
VarLongCoder.of(),
new WorkItemKeySelector(VarLongCoder.of()));
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
index 60204c7..a714643 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
@@ -125,7 +125,7 @@ public class UnboundedSourceWrapperTest {
@Test(timeout = 30_000)
public void testValueEmission() throws Exception {
final int numElementsPerShard = 20;
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
final long[] numElementsReceived = {0L};
final int[] numWatermarksReceived = {0};
@@ -608,7 +608,7 @@ public class UnboundedSourceWrapperTest {
private static void testSourceDoesNotShutdown(boolean shouldHaveReaders)
throws Exception {
final int parallelism = 2;
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
// Make sure we do not shut down
options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE);
@@ -696,7 +696,7 @@ public class UnboundedSourceWrapperTest {
new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
CountingSource.upTo(1000));
- FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
UnboundedSourceWrapper<
Long,
UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Long>>
@@ -763,7 +763,7 @@ public class UnboundedSourceWrapperTest {
@Test
public void testAccumulatorRegistrationOnOperatorClose() throws Exception {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
    TestCountingSource source = new TestCountingSource(20).withoutSplitting();
@@ -800,7 +800,7 @@ public class UnboundedSourceWrapperTest {
new IdlingUnboundedSource<>(
Arrays.asList("first", "second", "third"), StringUtf8Coder.of());
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setShutdownSourcesAfterIdleMs(0L);
options.setParallelism(4);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
index 27e5100..a1feb51 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
@@ -23,6 +23,8 @@ import static org.hamcrest.Matchers.is;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -166,6 +168,7 @@ public class BufferingDoFnRunnerTest {
        WindowedValue.getFullCoder(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE),
operatorStateBackend,
null,
- concurrentCheckpoints);
+ concurrentCheckpoints,
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
}
}
diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
index 9564469..2656acb 100644
--- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
@@ -73,6 +73,11 @@ Should be called before running the tests.
<td>Default: <code>true</code></td>
</tr>
<tr>
+ <td><code>fasterCopy</code></td>
+  <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
+ <td>Default: <code>false</code></td>
+</tr>
+<tr>
<td><code>filesToStage</code></td>
  <td>Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath.</td>
<td></td>
diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
index 9abe508..17fb6f1 100644
--- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
@@ -73,6 +73,11 @@ Should be called before running the tests.
<td>Default: <code>true</code></td>
</tr>
<tr>
+ <td><code>faster_copy</code></td>
+  <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
+ <td>Default: <code>false</code></td>
+</tr>
+<tr>
<td><code>files_to_stage</code></td>
  <td>Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath.</td>
<td></td>