This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6912011  [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
     new 1c35224  Merge pull request #11192 from lukecwik/splittabledofn
6912011 is described below

commit 6912011bd7b9745d5a3fd195165482bcc39ffa32
Author: Luke Cwik <[email protected]>
AuthorDate: Mon Mar 23 08:24:58 2020 -0700

    [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
---
 .../runners/dataflow/DataflowPipelineTranslator.java | 20 ++++++++++++++------
 .../dataflow/DataflowPipelineTranslatorTest.java     |  3 ++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 5f7ec26..8be785e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -978,12 +978,16 @@ public class DataflowPipelineTranslator {
             if (context.isFnApi()) {
               DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
               if (signature.processElement().isSplittable()) {
-                Coder<?> restrictionCoder =
-                    DoFnInvokers.invokerFor(transform.getFn())
-                        .invokeGetRestrictionCoder(
-                            context.getInput(transform).getPipeline().getCoderRegistry());
+                DoFnInvoker<?, ?> doFnInvoker = DoFnInvokers.invokerFor(transform.getFn());
+                Coder<?> restrictionAndWatermarkStateCoder =
+                    KvCoder.of(
+                        doFnInvoker.invokeGetRestrictionCoder(
+                            context.getInput(transform).getPipeline().getCoderRegistry()),
+                        doFnInvoker.invokeGetWatermarkEstimatorStateCoder(
+                            context.getInput(transform).getPipeline().getCoderRegistry()));
                 stepContext.addInput(
-                    PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
+                    PropertyNames.RESTRICTION_ENCODING,
+                    translateCoder(restrictionAndWatermarkStateCoder, context));
               }
             }
           }
@@ -1190,7 +1194,11 @@ public class DataflowPipelineTranslator {
 
             stepContext.addInput(
                 PropertyNames.RESTRICTION_CODER,
-                translateCoder(transform.getRestrictionCoder(), context));
+                translateCoder(
+                    KvCoder.of(
+                        transform.getRestrictionCoder(),
+                        transform.getWatermarkEstimatorStateCoder()),
+                    context));
           }
         });
   }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d48f060..775c782 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -733,7 +733,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
                 Structs.getObject(
                     processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
 
-    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+    assertEquals(
+        KvCoder.of(SerializableCoder.of(OffsetRange.class), VoidCoder.of()), restrictionCoder);
   }
 
   /** Smoke test to fail fast if translation of a splittable ParDo in FnAPI. */

Reply via email to