boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464686872



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = 
splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = 
Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = 
Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the 
current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = 
ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would 
have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of 
recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && 
residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, 
primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = 
ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, 
residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              
.setTransformId(windowedResidualRoot.getApplication().getTransformId())
+              .setInputId(windowedResidualRoot.getApplication().getInputId())
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+      // We don't want to change the output watermarks or set the checkpoint 
resume time since
+      // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       > We should always be using the initial watermark state for the 
unprocessed windows
   
   Yes, my point is that we should consider setting the residual's watermarkHold to the
initial watermark. Otherwise, the runner will use `MIN_TIMESTAMP` as the default,
which may hold back the watermark unnecessarily.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to