lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477506877
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
return Progress.from(completed, remaining);
}
+ /**
+ * Returns a copy of {@code splitResult} in which every non-null root value is re-wrapped as
+ * {@code KV.of(originalValue, size)}, keeping that root's timestamp, windows and pane.
+ *
+ * <p>Each size comes from {@code doFnInvoker.invokeGetSize}, called with a
+ * {@code DelegatingArgumentProvider} whose tracker is a fresh
+ * {@code doFnInvoker.invokeNewTracker(this)}. A size is only computed when the roots it
+ * applies to are non-null; otherwise it is 0 and the null root is passed through as null.
+ *
+ * @param splitResult the split result whose roots are annotated with sizes
+ * @param errorContext context string handed to every {@code DelegatingArgumentProvider}
+ * @return a new result built with {@code WindowedSplitResult.forRoots}, in the order
+ *     primaryInFullyProcessedWindows, primarySplit, residualSplit,
+ *     residualInUnprocessedWindows
+ */
+ private WindowedSplitResult calculateRestrictionSize(
+ WindowedSplitResult splitResult, String errorContext) {
+ // One size is shared by both window-based roots. It is computed from the
+ // currentRestriction field, and only if at least one of those two roots is present.
+ double fullSize =
+ splitResult.getResidualInUnprocessedWindowsRoot() == null
+ && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+ ? 0
+ : doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT,
OutputT>(processContext, errorContext) {
+ @Override
+ public Object restriction() {
+ return currentRestriction;
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+ // The primary split root's value is cast to KV<?, KV<?, ?>>, and the restriction to
+ // size is read from value.getValue().getKey().
+ // NOTE(review): presumably KV<element, KV<restriction, watermark estimator state>> -
+ // confirm against the code that builds the split roots.
+ double primarySize =
+ splitResult.getPrimarySplitRoot() == null
+ ? 0
+ : doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT,
OutputT>(processContext, errorContext) {
+ @Override
+ public Object restriction() {
+ return ((KV<?, KV<?, ?>>)
splitResult.getPrimarySplitRoot().getValue())
+ .getValue()
+ .getKey();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+ // Same extraction as for the primary: the restriction is value.getValue().getKey().
+ double residualSize =
+ splitResult.getResidualSplitRoot() == null
+ ? 0
+ : doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT,
OutputT>(processContext, errorContext) {
+ @Override
+ public Object restriction() {
+ return ((KV<?, KV<?, ?>>)
splitResult.getResidualSplitRoot().getValue())
+ .getValue()
+ .getKey();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+ // Rebuild each non-null root as KV.of(value, size); both window-based roots get fullSize.
+ return WindowedSplitResult.forRoots(
+ splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+ ? null
+ : WindowedValue.of(
+
KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+
splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+
splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+ splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+ splitResult.getPrimarySplitRoot() == null
+ ? null
+ : WindowedValue.of(
+ KV.of(splitResult.getPrimarySplitRoot().getValue(),
primarySize),
+ splitResult.getPrimarySplitRoot().getTimestamp(),
+ splitResult.getPrimarySplitRoot().getWindows(),
+ splitResult.getPrimarySplitRoot().getPane()),
+ splitResult.getResidualSplitRoot() == null
+ ? null
+ : WindowedValue.of(
+ KV.of(splitResult.getResidualSplitRoot().getValue(),
residualSize),
+ splitResult.getResidualSplitRoot().getTimestamp(),
+ splitResult.getResidualSplitRoot().getWindows(),
+ splitResult.getResidualSplitRoot().getPane()),
+ splitResult.getResidualInUnprocessedWindowsRoot() == null
+ ? null
+ : WindowedValue.of(
+
KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+
splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+ splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+ splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+ }
+
+ @VisibleForTesting
+ static <WatermarkEstimatorStateT>
+ KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer>
trySplitForTruncate(
Review comment:
I think that since we have all the tests, we can merge it as is and do a
follow-up exploring clean-up options.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org