gemini-code-assist[bot] commented on code in PR #35713: URL: https://github.com/apache/beam/pull/35713#discussion_r2236775668
########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java: ########## @@ -211,6 +211,6 @@ public static <RestrictionT> TruncateResult of(RestrictionT restriction) { return new AutoValue_RestrictionTracker_TruncateResult(restriction); } - public abstract RestrictionT getTruncatedRestriction(); + public abstract @Nullable RestrictionT getTruncatedRestriction(); Review Comment:  The `@Nullable` annotation was added to the `getTruncatedRestriction` method. This indicates that the truncated restriction can be null, which needs to be handled appropriately in the calling code. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -643,6 +736,90 @@ } } + private void processElementForTruncateRestriction( + WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) { + currentElement = elem.withValue(elem.getValue().getKey().getKey()); + currentRestriction = elem.getValue().getKey().getValue().getKey(); + currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue(); + // For truncation, we don't set currentTrackerClaimed so that we enable checkpointing even if no + // progress is made. + currentTracker = + RestrictionTrackers.observe( + doFnInvoker.invokeNewTracker(processContext), + new ClaimObserver<PositionT>() { + @Override + public void onClaimed(PositionT position) {} + + @Override + public void onClaimFailed(PositionT position) {} + }); + try { + TruncateResult<OutputT> truncatedRestriction = + doFnInvoker.invokeTruncateRestriction(processContext); + if (truncatedRestriction != null) { + processContext.output(truncatedRestriction.getTruncatedRestriction()); + } + } finally { + currentTracker = null; + currentElement = null; + currentRestriction = null; + currentWatermarkEstimatorState = null; + } + + this.stateAccessor.finalizeState(); + } + + private void processElementForWindowObservingTruncateRestriction( + WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) { + currentElement = elem.withValue(elem.getValue().getKey().getKey()); + windowCurrentIndex = -1; + windowStopIndex = currentElement.getWindows().size(); + currentWindows = ImmutableList.copyOf(currentElement.getWindows()); + while (true) { + synchronized (splitLock) { + windowCurrentIndex++; + if (windowCurrentIndex >= windowStopIndex) { + // Careful to reset the split state under the same synchronized block. + windowCurrentIndex = -1; + windowStopIndex = 0; + currentElement = null; + currentWindows = null; + currentRestriction = null; + currentWatermarkEstimatorState = null; + currentWindow = null; + currentTracker = null; + currentWatermarkEstimator = null; + initialWatermark = null; + break; + } + currentRestriction = elem.getValue().getKey().getValue().getKey(); + currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue(); + currentWindow = currentWindows.get(windowCurrentIndex); + // We leave currentTrackerClaimed unset as we want to split regardless of if tryClaim is + // called. + currentTracker = + RestrictionTrackers.observe( + doFnInvoker.invokeNewTracker(processContext), + new ClaimObserver<PositionT>() { + @Override + public void onClaimed(PositionT position) {} + + @Override + public void onClaimFailed(PositionT position) {} + }); + currentWatermarkEstimator = + WatermarkEstimators.threadSafe(doFnInvoker.invokeNewWatermarkEstimator(processContext)); + initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey(); + } + TruncateResult<OutputT> truncatedRestriction = + doFnInvoker.invokeTruncateRestriction(processContext); + if (truncatedRestriction != null) { + processContext.output(truncatedRestriction.getTruncatedRestriction()); + } + } + this.stateAccessor.finalizeState(); + } Review Comment:  This code block was added to handle window observing truncate restriction. Ensure that the `splitLock` is properly synchronized and that all resources are released correctly in the `finally` block to prevent memory leaks or resource exhaustion. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -643,6 +736,90 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) { } } + private void processElementForTruncateRestriction( + WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) { + currentElement = elem.withValue(elem.getValue().getKey().getKey()); + currentRestriction = elem.getValue().getKey().getValue().getKey(); + currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue(); + // For truncation, we don't set currentTrackerClaimed so that we enable checkpointing even if no + // progress is made. + currentTracker = + RestrictionTrackers.observe( + doFnInvoker.invokeNewTracker(processContext), + new ClaimObserver<PositionT>() { + @Override + public void onClaimed(PositionT position) {} + + @Override + public void onClaimFailed(PositionT position) {} + }); + try { + TruncateResult<OutputT> truncatedRestriction = + doFnInvoker.invokeTruncateRestriction(processContext); + if (truncatedRestriction != null) { + processContext.output(truncatedRestriction.getTruncatedRestriction()); + } + } finally { + currentTracker = null; + currentElement = null; + currentRestriction = null; + currentWatermarkEstimatorState = null; + } + + this.stateAccessor.finalizeState(); + } Review Comment:  This code block was added to handle truncation. Ensure that the `currentTrackerClaimed` is correctly handled to avoid checkpointing issues. Also, ensure proper error handling and logging are in place. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -1842,6 +2101,327 @@ } } + /** This context outputs KV<KV<Element, KV<Restriction, WatemarkEstimatorState>>, Size>. */ + private class SizedRestrictionWindowObservingProcessBundleContext + extends WindowObservingProcessBundleContextBase { + private final String errorContextPrefix; + + SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) { + this.errorContextPrefix = errorContextPrefix; + } + + @Override + // OutputT == RestrictionT + public void output(OutputT output) { + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return currentElement.getTimestamp(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + // Don't need to check timestamp since we can always output using the input timestamp. + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + currentElement.getTimestamp(), + currentWindow, + currentElement.getPaneInfo())); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + // OutputT == RestrictionT + public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + currentWindow, + currentElement.getPaneInfo())); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + public <T> void outputWindowedValue( + TupleTag<T> tag, + T output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + public State state(String stateId, boolean alwaysFetched) { + throw new UnsupportedOperationException( + String.format("State unsupported in %s", errorContextPrefix)); + } + + @Override + public org.apache.beam.sdk.state.Timer timer(String timerId) { + throw new UnsupportedOperationException( + String.format("Timer unsupported in %s", errorContextPrefix)); + } + + @Override + public TimerMap timerFamily(String tagId) { + throw new UnsupportedOperationException( + String.format("Timer unsupported in %s", errorContextPrefix)); + } + } + + /** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>. */ + private class SizedRestrictionNonWindowObservingProcessBundleContext + extends NonWindowObservingProcessBundleContextBase { + private final String errorContextPrefix; + + SizedRestrictionNonWindowObservingProcessBundleContext(String errorContextPrefix) { + this.errorContextPrefix = errorContextPrefix; + } + + @Override + // OutputT == RestrictionT + public void output(OutputT output) { + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return currentElement.getTimestamp(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + // Don't need to check timestamp since we can always output using the input timestamp. + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + currentElement.withValue( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size))); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + // OutputT == RestrictionT + public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + currentElement.getWindows(), + currentElement.getPaneInfo())); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + public <T> void outputWindowedValue( + TupleTag<T> tag, + T output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + } Review Comment:  The class `SizedRestrictionNonWindowObservingProcessBundleContext` was added. Ensure that all methods are correctly implemented and that all exceptions are properly handled. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -892,7 +1152,7 @@ } @VisibleForTesting - static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcess( + static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate( Review Comment:  The method `computeSplitForProcessOrTruncate` was made `@VisibleForTesting`. Ensure that the logic for computing split is correct and that the method is thoroughly tested to handle various edge cases. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -1842,6 +2101,327 @@ } } + /** This context outputs KV<KV<Element, KV<Restriction, WatemarkEstimatorState>>, Size>. */ + private class SizedRestrictionWindowObservingProcessBundleContext + extends WindowObservingProcessBundleContextBase { + private final String errorContextPrefix; + + SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) { + this.errorContextPrefix = errorContextPrefix; + } + + @Override + // OutputT == RestrictionT + public void output(OutputT output) { + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return currentElement.getTimestamp(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + // Don't need to check timestamp since we can always output using the input timestamp. + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + currentElement.getTimestamp(), + currentWindow, + currentElement.getPaneInfo())); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + // OutputT == RestrictionT + public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + currentWindow, + currentElement.getPaneInfo())); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue<OutputT>) + WindowedValues.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + public <T> void outputWindowedValue( + TupleTag<T> tag, + T output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + + @Override + public State state(String stateId, boolean alwaysFetched) { + throw new UnsupportedOperationException( + String.format("State unsupported in %s", errorContextPrefix)); + } + + @Override + public org.apache.beam.sdk.state.Timer timer(String timerId) { + throw new UnsupportedOperationException( + String.format("Timer unsupported in %s", errorContextPrefix)); + } + + @Override + public TimerMap timerFamily(String tagId) { + throw new UnsupportedOperationException( + String.format("Timer unsupported in %s", errorContextPrefix)); + } + } Review Comment:  The class `SizedRestrictionWindowObservingProcessBundleContext` was added. Ensure that all methods are correctly implemented and that all exceptions are properly handled. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java: ########## @@ -359,7 +359,7 @@ public void output(RestrictionT subrestriction) { /** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>. */ private class SizedRestrictionNonWindowObservingArgumentProvider - extends SplitRestrictionArgumentProvider { + extends SplitRestrictionArgumentProvider implements OutputReceiver<RestrictionT> { Review Comment:  The class `SizedRestrictionNonWindowObservingArgumentProvider` now implements `OutputReceiver<RestrictionT>`. Ensure that the output method is correctly implemented and that all exceptions are properly handled. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -835,6 +1040,61 @@ splitResult.getResidualInUnprocessedWindowsRoot().getPaneInfo())); } + private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction( + double fractionOfRemainder, HandlesSplits splitDelegate) { + WindowedSplitResult windowedSplitResult = null; + HandlesSplits.SplitResult downstreamSplitResult = null; + synchronized (splitLock) { + // There is nothing to split if we are between truncate processing calls. + if (currentWindow == null) { + return null; + } + // We are requesting a checkpoint but have not yet progressed on the restriction, skip + // request. + if (fractionOfRemainder == 0 + && currentTrackerClaimed != null + && !currentTrackerClaimed.get()) { + return null; + } + + SplitResultsWithStopIndex splitResult = + computeSplitForProcessOrTruncate( + currentElement, + currentRestriction, + currentWindow, + currentWindows, + currentWatermarkEstimatorState, + fractionOfRemainder, + null, + splitDelegate, + null, + windowCurrentIndex, + windowStopIndex); + if (splitResult == null) { + return null; + } + windowStopIndex = splitResult.getNewWindowStopIndex(); + windowedSplitResult = + calculateRestrictionSize( + splitResult.getWindowSplit(), + PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize"); + downstreamSplitResult = splitResult.getDownstreamSplit(); + } + // Note that the assumption here is the fullInputCoder of the Truncate transform should be the + // the same as the SDF/Process transform. + Coder fullInputCoder = WindowedValues.getFullCoder(inputCoder, windowCoder); + return constructSplitResult( + windowedSplitResult, + downstreamSplitResult, + fullInputCoder, + initialWatermark, + null, + pTransformId, + mainInputId, + pTransform.getOutputsMap().keySet(), + null); + } Review Comment:  This code block was added to handle `trySplitForWindowObservingTruncateRestriction`. Ensure that the logic for splitting is correct and that all resources are released correctly in the `synchronized` block to prevent race conditions or deadlocks. ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java: ########## @@ -745,13 +922,41 @@ private Progress getProgress() { synchronized (splitLock) { if (currentTracker instanceof RestrictionTracker.HasProgress && currentWindow != null) { - return ProgressUtils.scaleProgress( + return scaleProgress( ((HasProgress) currentTracker).getProgress(), windowCurrentIndex, windowStopIndex); } } return null; } + private Progress getProgressFromWindowObservingTruncate(double elementCompleted) { + synchronized (splitLock) { + if (currentWindow != null) { + return scaleProgress( + Progress.from(elementCompleted, 1 - elementCompleted), + windowCurrentIndex, + windowStopIndex); + } + } + return null; + } + + @VisibleForTesting + static Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) { + checkArgument( + currentWindowIndex < stopWindowIndex, + "Current window index (%s) must be less than stop window index (%s)", + currentWindowIndex, + stopWindowIndex); + + double totalWorkPerWindow = progress.getWorkCompleted() + progress.getWorkRemaining(); + double completed = totalWorkPerWindow * currentWindowIndex + progress.getWorkCompleted(); + double remaining = + totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1) + + progress.getWorkRemaining(); + return Progress.from(completed, remaining); Review Comment:  The method `scaleProgress` was made `static` and `@VisibleForTesting`. Ensure that the logic for scaling progress is correct and that the method is thoroughly tested to handle various edge cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org