This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new af71782 Fix SplittableParDoNaiveBounded DoFnInvoker.
new c00dc4a Merge pull request #11475 from boyuanzz/fix
af71782 is described below
commit af717820535d3990aec15575b012e1df296fb3cc
Author: Boyuan Zhang <[email protected]>
AuthorDate: Mon Apr 20 20:52:24 2020 -0700
Fix SplittableParDoNaiveBounded DoFnInvoker.
---
.../runners/core/construction/SplittableParDoNaiveBounded.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index b002169..31ce3b6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -300,7 +301,11 @@ public class SplittableParDoNaiveBounded {
// Fetch the watermark before splitting to ensure that the watermark applies to both
// the primary and the residual.
watermarkEstimatorState = watermarkEstimator.getState();
- restriction = tracker.trySplit(0).getResidual();
+ SplitResult<RestrictionT> split = tracker.trySplit(0);
+ if (split == null) {
+ break;
+ }
+ restriction = split.getResidual();
Uninterruptibles.sleepUninterruptibly(
continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
} else {