boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r476125331



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())

Review comment:
       I think we should still use the `transform id` and `main input id` from 
`process sized elements`. There are 3 possible cases:
   
   - We only have splits on window boundaries.
   - We only have an element split on the last window.
   - We have both element splits and window splits.
   
   If we choose the `transform id` and `input id` from `truncate` for the element 
split, that is also conceptually wrong. I prefer `process sized elements` because 
it simplifies the implementation on the runner side: for example, the runner 
doesn't need to distinguish window splits from element splits. If we treat 
`truncate` and `process` as a whole, it also makes sense to use the `transform id` 
from `process`.
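
   To make the suggestion concrete, here is a minimal, self-contained sketch. It 
does not use Beam's actual `BundleApplication` proto: the `Root` class, the 
`buildRoots` helper, and both id strings are hypothetical stand-ins. The only 
point is that window-split roots and element-split roots would carry the same 
ids, so a consumer never has to branch on the split kind.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitRootIds {
  /** Hypothetical stand-in for a split root; this is NOT Beam's BundleApplication. */
  static final class Root {
    final String transformId;
    final String inputId;
    final String kind; // "window" or "element", kept only for illustration

    Root(String transformId, String inputId, String kind) {
      this.transformId = transformId;
      this.inputId = inputId;
      this.kind = kind;
    }
  }

  // Hypothetical ids standing in for the `process sized elements` transform.
  static final String PROCESS_TRANSFORM_ID = "process_sized_elements";
  static final String PROCESS_INPUT_ID = "process_main_input";

  /** Builds roots for the three cases: window-only, element-only, or both. */
  static List<Root> buildRoots(boolean hasWindowSplit, boolean hasElementSplit) {
    List<Root> roots = new ArrayList<>();
    if (hasWindowSplit) {
      // Window-boundary split: tagged with the process transform's ids.
      roots.add(new Root(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID, "window"));
    }
    if (hasElementSplit) {
      // Element split: tagged with the very same ids, not the truncate ones.
      roots.add(new Root(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID, "element"));
    }
    return roots;
  }

  public static void main(String[] args) {
    for (Root root : buildRoots(true, true)) {
      System.out.println(root.kind + " -> " + root.transformId + " / " + root.inputId);
    }
  }
}
```

   With this shape, whatever consumes the roots can route every one of them by 
`transformId` alone, regardless of which of the three cases produced it.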




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

