lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464651698



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -515,15 +515,18 @@
               && Iterables.get(mainOutputConsumers, 0) instanceof 
HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
+                  private final HandlesSplits splitDelegate =
+                      (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
       nit: here and below on 550
   ```suggestion
                         (HandlesSplits) 
Iterables.getOnlyElement(mainOutputConsumers);
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = 
splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = 
Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = 
Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the 
current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = 
ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would 
have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of 
recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && 
residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, 
primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = 
ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, 
residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              
.setTransformId(windowedResidualRoot.getApplication().getTransformId())

Review comment:
       ditto, we should be using the `truncate` transform id and truncate 
`input` id.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = 
splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = 
Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = 
Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the 
current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = 
ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would 
have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of 
recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && 
residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, 
primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())

Review comment:
       we should be using the `truncate` transform id and not the `process 
sized elements` transform id
   
   This will remove the assumption about what the input coder is and we can 
append our additional primaries and residuals on top of any additional 
residuals/primaries added by the downstream split.
   
   Doing this might require fixing Dataflow runner v2.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = 
splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = 
Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = 
Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the 
current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = 
ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would 
have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of 
recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && 
residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, 
primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = 
ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, 
residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              
.setTransformId(windowedResidualRoot.getApplication().getTransformId())
+              .setInputId(windowedResidualRoot.getApplication().getInputId())
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+      // We don't want to change the output watermarks or set the checkpoint 
resume time since
+      // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       We should always be using the initial watermark state for the 
unprocessed windows and not reporting any watermark for these residual roots 
since it should be the same as it was.
   ```suggestion
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1193,6 +1328,7 @@ public Object restriction() {
               .setElement(bytesOut.toByteString());
       // We don't want to change the output watermarks or set the checkpoint 
resume time since
       // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       Same as above.
   ```suggestion
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
       We should be scaling the fraction of the remainder relative to the 
number of windows we have to find the best split point.
   
   This logic should essentially mirror 
https://github.com/apache/beam/blob/13c77a8f7a7c726aaa6dadad7812acdf8772ba7c/sdks/python/apache_beam/runners/common.py#L892
 except that we will be using the `truncate` transform id, input id, and coder.
   
   I suggest following the approach there where we have a static method that 
does all the heavy lifting so we can test it well and we have some simple 
wrappers which pass forward all the necessary arguments to the static method 
that is visible for testing.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -1452,11 +1452,22 @@ public Instant getInitialWatermarkEstimatorState() {
   static class WindowObservingTestSplittableDoFn extends 
NonWindowObservingTestSplittableDoFn {
 
     private final PCollectionView<String> singletonSideInput;
+    private static final long PROCESSED_WINDOW = 1;
+    private boolean splitAtTruncate = false;
+    private long processedWindowCount = 0;

Review comment:
       ```suggestion
       private final boolean splitAtTruncate;
       private long processedWindowCount;
   ```

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -1452,11 +1452,22 @@ public Instant getInitialWatermarkEstimatorState() {
   static class WindowObservingTestSplittableDoFn extends 
NonWindowObservingTestSplittableDoFn {
 
     private final PCollectionView<String> singletonSideInput;
+    private static final long PROCESSED_WINDOW = 1;
+    private boolean splitAtTruncate = false;
+    private long processedWindowCount = 0;
 
     private WindowObservingTestSplittableDoFn(PCollectionView<String> 
singletonSideInput) {
       this.singletonSideInput = singletonSideInput;
     }
 
+    private static WindowObservingTestSplittableDoFn forSplitAtTruncate(
+        PCollectionView<String> singletonSideInput) {
+      WindowObservingTestSplittableDoFn doFn =
+          new WindowObservingTestSplittableDoFn(singletonSideInput);

Review comment:
       nit: update the constructor to take this parameter instead of mutating 
it on the instance.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3013,19 +3069,107 @@ public void 
testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser
             startFunctionRegistry,
             finishFunctionRegistry,
             teardownFunctions::add,
-            null /* addProgressRequestCallback */,
-            null /* bundleSplitListener */,
+            progressRequestCallbacks::add /* addProgressRequestCallback */,
+            splitListener /* bundleSplitListener */,
             null /* bundleFinalizer */);
+
+    assertThat(consumers.keySet(), containsInAnyOrder(inputPCollectionId, 
outputPCollectionId));
     FnDataReceiver<WindowedValue<?>> mainInput =
         consumers.getMultiplexingConsumer(inputPCollectionId);
     assertThat(mainInput, instanceOf(HandlesSplits.class));
 
-    assertEquals(0, ((HandlesSplits) mainInput).getProgress(), 0.0);
-    assertNull(((HandlesSplits) mainInput).trySplit(0.4));
+    mainOutputValues.clear();
+    BoundedWindow window1 = new IntervalWindow(new Instant(5), new 
Instant(10));
+    BoundedWindow window2 = new IntervalWindow(new Instant(6), new 
Instant(11));
+    BoundedWindow window3 = new IntervalWindow(new Instant(7), new 
Instant(12));
+    // Setup and launch the trySplit thread.
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Future<HandlesSplits.SplitResult> trySplitFuture =
+        executorService.submit(
+            () -> {
+              try {
+                doFn.waitForSplitElementToBeProcessed();
+
+                return ((HandlesSplits) mainInput).trySplit(0);
+              } finally {
+                doFn.releaseWaitingProcessElementThread();
+              }
+            });
+
+    WindowedValue<?> splitValue =
+        valueInWindows(
+            KV.of(KV.of("7", KV.of(new OffsetRange(0, 6), 
GlobalWindow.TIMESTAMP_MIN_VALUE)), 6.0),
+            window1,
+            window2,
+            window3);
+    mainInput.accept(splitValue);
+    HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
+
+    // We expect that there are outputs from window1 and window2
+    assertThat(
+        mainOutputValues,
+        contains(
+            WindowedValue.of(
+                KV.of(
+                    KV.of("7", KV.of(new OffsetRange(0, 3), 
GlobalWindow.TIMESTAMP_MIN_VALUE)),
+                    3.0),
+                splitValue.getTimestamp(),
+                window1,
+                splitValue.getPane()),
+            WindowedValue.of(
+                KV.of(
+                    KV.of("7", KV.of(new OffsetRange(0, 3), 
GlobalWindow.TIMESTAMP_MIN_VALUE)),
+                    3.0),
+                splitValue.getTimestamp(),
+                window2,
+                splitValue.getPane())));
+
+    SplitResult expectedElementSplit = createSplitResult(0);

Review comment:
       We'll want to cover more scenarios than just this, see the ones I added 
in 
https://github.com/apache/beam/commit/ac6d80ed0cd31ec2b5233d6b3805ffb3ad71a0f1#diff-1b0bb2e59974e0a12a61a762a9add92a
   
   This would likely require refactoring the trySplit method to be a static 
method where all the parameters are passed to the method so that we don't have 
to worry about all the additional setup and we'll just be testing the core of 
the method. We can keep one variant around using this approach which will help 
ensure that we are using the correct locks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to