boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477498287



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return currentRestriction;
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPane()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPane()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(

Review comment:
   If we decide to unify `trySplitForTruncate` and `trySplitForProcess`, then it's natural to also unify `trySplitForWindowObservingTruncate` and `trySplitForWindowObservingProcess`.
   I tried to unify `trySplitForTruncate` and `trySplitForProcess`, and there were concerns that prevented me from doing so:
   
   - To compute the scaled `fractionOfRemainder` that produces the element split, `trySplitForProcess` needs `currentRestrictionTracker`, whereas `trySplitForTruncate` needs `splitDelegate`. Unifying the two would require both a `RestrictionTracker` and a `HandlesSplits` in the parameter list, which is already very long. The function body would also need separate branches to obtain the element progress and perform the element split from whichever of `currentRestrictionTracker` or `splitDelegate` applies.
   
   - `trySplitForProcess` returns `<WindowedSplitResult, Integer>`, whereas `trySplitForTruncate` returns `<<WindowedSplitResult, HandlesSplits.SplitResult>, Integer>`. A unified method would need to return `<WindowedSplitResult, HandlesSplits.SplitResult, Integer>`, and the caller would still need separate branches to construct the primary and the residual.
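
A hypothetical, heavily simplified Java sketch of a unified method may make both concerns concrete. `Tracker`, `SplitDelegate`, `UnifiedResult`, `trySplit`, and the rule of counting each unprocessed window as one unit of work are assumptions made for illustration. They stand in for `RestrictionTracker`, `HandlesSplits`, and the real scaling arithmetic in `FnApiDoFnRunner`, and are not Beam APIs.

```java
/**
 * Hypothetical sketch, not Beam code: a unified trySplitForProcess/trySplitForTruncate.
 * Tracker, SplitDelegate, UnifiedResult and the window-scaling rule are simplified
 * stand-ins invented for illustration.
 */
class UnifiedSplitSketch {

  /** Stand-in for RestrictionTracker (the process path). */
  interface Tracker {
    double fractionCompleted();

    String trySplit(double fractionOfRemainder); // residual restriction, or null if refused
  }

  /** Stand-in for HandlesSplits (the truncate path delegates splits downstream). */
  interface SplitDelegate {
    double getProgress();

    String trySplit(double fractionOfRemainder); // downstream split result, or null if refused
  }

  /** Three-part result: on each path one of the two split fields is always null. */
  static final class UnifiedResult {
    final String trackerResidual; // set only on the process path
    final String delegateSplit; // set only on the truncate path
    final int newStopWindowIndex; // windows [newStopWindowIndex, stop) become residual

    UnifiedResult(String trackerResidual, String delegateSplit, int newStopWindowIndex) {
      this.trackerResidual = trackerResidual;
      this.delegateSplit = delegateSplit;
      this.newStopWindowIndex = newStopWindowIndex;
    }
  }

  /** Exactly one of {@code tracker} and {@code delegate} must be non-null. */
  static UnifiedResult trySplit(
      double fractionOfRemainder,
      int currentWindowIndex,
      int stopWindowIndex,
      Tracker tracker,
      SplitDelegate delegate) {
    if ((tracker == null) == (delegate == null)) {
      throw new IllegalArgumentException("exactly one of tracker/delegate must be set");
    }
    // Branch 1: element progress comes from a different object on each path.
    double completed = tracker != null ? tracker.fractionCompleted() : delegate.getProgress();

    // Shared part: scale the requested fraction across the remaining windows,
    // treating each unprocessed window as one unit of work (a simplifying assumption).
    double remainingInCurrent = 1.0 - completed;
    double totalRemaining = remainingInCurrent + (stopWindowIndex - currentWindowIndex - 1);
    double keep = fractionOfRemainder * totalRemaining;

    if (keep >= remainingInCurrent) {
      // Split at a window boundary, keeping enough whole windows to cover `keep`.
      int newStop = currentWindowIndex + 1 + (int) Math.ceil(keep - remainingInCurrent);
      return newStop >= stopWindowIndex ? null : new UnifiedResult(null, null, newStop);
    }

    // Branch 2: the element split is also performed by a different object on each path.
    double elementFraction = keep / remainingInCurrent;
    if (tracker != null) {
      String residual = tracker.trySplit(elementFraction);
      return residual == null ? null : new UnifiedResult(residual, null, currentWindowIndex + 1);
    }
    String split = delegate.trySplit(elementFraction);
    return split == null ? null : new UnifiedResult(null, split, currentWindowIndex + 1);
  }
}
```

Even in this reduced form, both the progress lookup and the element split branch on which source is set, and every result carries a field that is always null on its path. That is the readability cost the two points above describe.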
   
   I'm OK with unifying them, but the approach available to us only works for splitting at `process` and `truncate`; I don't think it extends to other split points we might add in the future. I also feel it makes the logic harder to understand, given that it is already complicated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
