This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6912011 [BEAM-9430] Fix coder sent to Dataflow service for
non-portable pipelines due to WatermarkEstimators migration change
new 1c35224 Merge pull request #11192 from lukecwik/splittabledofn
6912011 is described below
commit 6912011bd7b9745d5a3fd195165482bcc39ffa32
Author: Luke Cwik <[email protected]>
AuthorDate: Mon Mar 23 08:24:58 2020 -0700
[BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines
due to WatermarkEstimators migration change
---
.../runners/dataflow/DataflowPipelineTranslator.java | 20 ++++++++++++++------
.../dataflow/DataflowPipelineTranslatorTest.java | 3 ++-
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 5f7ec26..8be785e 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -978,12 +978,16 @@ public class DataflowPipelineTranslator {
if (context.isFnApi()) {
DoFnSignature signature =
DoFnSignatures.signatureForDoFn(transform.getFn());
if (signature.processElement().isSplittable()) {
- Coder<?> restrictionCoder =
- DoFnInvokers.invokerFor(transform.getFn())
- .invokeGetRestrictionCoder(
-
context.getInput(transform).getPipeline().getCoderRegistry());
+ DoFnInvoker<?, ?> doFnInvoker =
DoFnInvokers.invokerFor(transform.getFn());
+ Coder<?> restrictionAndWatermarkStateCoder =
+ KvCoder.of(
+ doFnInvoker.invokeGetRestrictionCoder(
+
context.getInput(transform).getPipeline().getCoderRegistry()),
+ doFnInvoker.invokeGetWatermarkEstimatorStateCoder(
+
context.getInput(transform).getPipeline().getCoderRegistry()));
stepContext.addInput(
- PropertyNames.RESTRICTION_ENCODING,
translateCoder(restrictionCoder, context));
+ PropertyNames.RESTRICTION_ENCODING,
+ translateCoder(restrictionAndWatermarkStateCoder,
context));
}
}
}
@@ -1190,7 +1194,11 @@ public class DataflowPipelineTranslator {
stepContext.addInput(
PropertyNames.RESTRICTION_CODER,
- translateCoder(transform.getRestrictionCoder(), context));
+ translateCoder(
+ KvCoder.of(
+ transform.getRestrictionCoder(),
+ transform.getWatermarkEstimatorStateCoder()),
+ context));
}
});
}
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d48f060..775c782 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -733,7 +733,8 @@ public class DataflowPipelineTranslatorTest implements
Serializable {
Structs.getObject(
processKeyedStep.getProperties(),
PropertyNames.RESTRICTION_CODER));
- assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+ assertEquals(
+ KvCoder.of(SerializableCoder.of(OffsetRange.class), VoidCoder.of()),
restrictionCoder);
}
/** Smoke test to fail fast if translation of a splittable ParDo in FnAPI. */