boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477498287
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
return Progress.from(completed, remaining);
}
+ private WindowedSplitResult calculateRestrictionSize(
+ WindowedSplitResult splitResult, String errorContext) {
+ double fullSize =
+ splitResult.getResidualInUnprocessedWindowsRoot() == null
+ && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+ ? 0
+ : doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+ @Override
+ public Object restriction() {
+ return currentRestriction;
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+ double primarySize =
+ splitResult.getPrimarySplitRoot() == null
+ ? 0
+ : doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+ @Override
+ public Object restriction() {
+ return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue())
+ .getValue()
+ .getKey();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+ double residualSize =
+ splitResult.getResidualSplitRoot() == null
+ ? 0
+ : doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+ @Override
+ public Object restriction() {
+ return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue())
+ .getValue()
+ .getKey();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+ return WindowedSplitResult.forRoots(
+ splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+ ? null
+ : WindowedValue.of(
+ KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+ splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+ splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+ splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+ splitResult.getPrimarySplitRoot() == null
+ ? null
+ : WindowedValue.of(
+ KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize),
+ splitResult.getPrimarySplitRoot().getTimestamp(),
+ splitResult.getPrimarySplitRoot().getWindows(),
+ splitResult.getPrimarySplitRoot().getPane()),
+ splitResult.getResidualSplitRoot() == null
+ ? null
+ : WindowedValue.of(
+ KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize),
+ splitResult.getResidualSplitRoot().getTimestamp(),
+ splitResult.getResidualSplitRoot().getWindows(),
+ splitResult.getResidualSplitRoot().getPane()),
+ splitResult.getResidualInUnprocessedWindowsRoot() == null
+ ? null
+ : WindowedValue.of(
+ KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+ splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+ splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+ splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+ }
+
+ @VisibleForTesting
+ static <WatermarkEstimatorStateT>
+ KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(
Review comment:
If we decide to unify `trySplitForTruncate` and `trySplitForProcess`,
then it's natural to also unify `trySplitForWindowObservingTruncate` and
`trySplitForWindowObservingProcess`.
I tried to unify `trySplitForTruncate` and `trySplitForProcess`, and
here are the concerns that stopped me:
- To compute the scaled fractionOfRemainder and produce the element split,
`trySplitForProcess` needs `currentRestrictionTracker`, whereas
`trySplitForTruncate` needs `splitDelegate`. If we unify the two, we need both
`RestrictionTracker` and `HandlesSplits` in the parameter list, which is
already very long. In the function body, we also need separate branches to get
the element progress and splits from either `currentRestrictionTracker` or
`splitDelegate`.
- `trySplitForProcess` returns `<WindowedSplitResult, Integer>`, whereas
`trySplitForTruncate` returns `<<WindowedSplitResult,
HandlesSplits.SplitResult>, Integer>`. If we unify them, we need to return
`<WindowedSplitResult, HandlesSplits.SplitResult, Integer>`, and the
caller still needs separate branches to construct the primary and
residual.
I'm OK with unifying them, but the approach only works for splitting at
`process` and `truncate`. I don't think it would extend to other kinds of
splitting that may be added in the future. I also feel it makes the logic
harder to understand, given that it's already complicated.
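
To make the two concerns concrete, here is a rough sketch of what a unified
helper might look like. Everything in it is a simplified, hypothetical
stand-in (`Tracker`, `SplitDelegate`, `UnifiedResult`, `trySplitUnified`, and
the split-point arithmetic are illustrative only, not the real Beam types or
the code in this PR). It only shows where the extra branches would end up:

```java
import java.util.Optional;

public class UnifiedSplitSketch {
  /** Hypothetical stand-in for RestrictionTracker: element progress in [0, 1]. */
  public interface Tracker {
    double progress();
  }

  /** Hypothetical stand-in for HandlesSplits: downstream progress in [0, 1]. */
  public interface SplitDelegate {
    double progress();
  }

  /** Hypothetical unified return type that carries all three values. */
  public static final class UnifiedResult {
    public final double splitPoint; // stands in for WindowedSplitResult
    public final Optional<String> downstreamSplit; // stands in for HandlesSplits.SplitResult
    public final int newStopIndex;

    UnifiedResult(double splitPoint, Optional<String> downstreamSplit, int newStopIndex) {
      this.splitPoint = splitPoint;
      this.downstreamSplit = downstreamSplit;
      this.newStopIndex = newStopIndex;
    }
  }

  /** Exactly one of tracker (process) and delegate (truncate) may be non-null. */
  public static UnifiedResult trySplitUnified(
      Tracker tracker, SplitDelegate delegate, double fractionOfRemainder, int stopIndex) {
    if ((tracker == null) == (delegate == null)) {
      throw new IllegalArgumentException("Exactly one of tracker and delegate must be set");
    }
    // Branch 1 (function body): the source of element progress depends on the mode.
    double progress = tracker != null ? tracker.progress() : delegate.progress();
    // Illustrative arithmetic only: split the remaining work at the requested fraction.
    double splitPoint = progress + fractionOfRemainder * (1 - progress);
    // Only the truncate path has a downstream split to report.
    Optional<String> downstream =
        tracker != null ? Optional.empty() : Optional.of("downstream-split");
    return new UnifiedResult(splitPoint, downstream, stopIndex);
  }

  /** Branch 2 (caller): primary and residual are still built differently per mode. */
  public static String describe(UnifiedResult result) {
    return result.downstreamSplit.isPresent()
        ? "truncate: build primary/residual from the delegate's split"
        : "process: build primary/residual from the tracker's split";
  }

  public static void main(String[] args) {
    System.out.println(describe(trySplitUnified(() -> 0.5, null, 0.5, 3)));
    System.out.println(describe(trySplitUnified(null, () -> 0.2, 0.5, 3)));
  }
}
```

Even in this toy form, the mode check appears twice (once inside the helper and
once in the caller), and the result type carries a field that is empty for one
of the two modes.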