gemini-code-assist[bot] commented on code in PR #35713:
URL: https://github.com/apache/beam/pull/35713#discussion_r2236775668


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java:
##########
@@ -211,6 +211,6 @@ public static <RestrictionT> TruncateResult of(RestrictionT 
restriction) {
       return new AutoValue_RestrictionTracker_TruncateResult(restriction);
     }
 
-    public abstract RestrictionT getTruncatedRestriction();
+    public abstract @Nullable RestrictionT getTruncatedRestriction();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `@Nullable` annotation was added to the `getTruncatedRestriction` 
method. This indicates that the truncated restriction can be null, which needs 
to be handled appropriately in the calling code.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -643,6 +736,90 @@
     }
   }
 
+  private void processElementForTruncateRestriction(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    currentRestriction = elem.getValue().getKey().getValue().getKey();
+    currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
+    // For truncation, we don't set currentTrackerClaimed so that we enable 
checkpointing even if no
+    // progress is made.
+    currentTracker =
+        RestrictionTrackers.observe(
+            doFnInvoker.invokeNewTracker(processContext),
+            new ClaimObserver<PositionT>() {
+              @Override
+              public void onClaimed(PositionT position) {}
+
+              @Override
+              public void onClaimFailed(PositionT position) {}
+            });
+    try {
+      TruncateResult<OutputT> truncatedRestriction =
+          doFnInvoker.invokeTruncateRestriction(processContext);
+      if (truncatedRestriction != null) {
+        processContext.output(truncatedRestriction.getTruncatedRestriction());
+      }
+    } finally {
+      currentTracker = null;
+      currentElement = null;
+      currentRestriction = null;
+      currentWatermarkEstimatorState = null;
+    }
+
+    this.stateAccessor.finalizeState();
+  }
+
+  private void processElementForWindowObservingTruncateRestriction(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    windowCurrentIndex = -1;
+    windowStopIndex = currentElement.getWindows().size();
+    currentWindows = ImmutableList.copyOf(currentElement.getWindows());
+    while (true) {
+      synchronized (splitLock) {
+        windowCurrentIndex++;
+        if (windowCurrentIndex >= windowStopIndex) {
+          // Careful to reset the split state under the same synchronized 
block.
+          windowCurrentIndex = -1;
+          windowStopIndex = 0;
+          currentElement = null;
+          currentWindows = null;
+          currentRestriction = null;
+          currentWatermarkEstimatorState = null;
+          currentWindow = null;
+          currentTracker = null;
+          currentWatermarkEstimator = null;
+          initialWatermark = null;
+          break;
+        }
+        currentRestriction = elem.getValue().getKey().getValue().getKey();
+        currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
+        currentWindow = currentWindows.get(windowCurrentIndex);
+        // We leave currentTrackerClaimed unset as we want to split regardless 
of if tryClaim is
+        // called.
+        currentTracker =
+            RestrictionTrackers.observe(
+                doFnInvoker.invokeNewTracker(processContext),
+                new ClaimObserver<PositionT>() {
+                  @Override
+                  public void onClaimed(PositionT position) {}
+
+                  @Override
+                  public void onClaimFailed(PositionT position) {}
+                });
+        currentWatermarkEstimator =
+            
WatermarkEstimators.threadSafe(doFnInvoker.invokeNewWatermarkEstimator(processContext));
+        initialWatermark = 
currentWatermarkEstimator.getWatermarkAndState().getKey();
+      }
+      TruncateResult<OutputT> truncatedRestriction =
+          doFnInvoker.invokeTruncateRestriction(processContext);
+      if (truncatedRestriction != null) {
+        processContext.output(truncatedRestriction.getTruncatedRestriction());
+      }
+    }
+    this.stateAccessor.finalizeState();
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This code block was added to handle window observing truncate restriction. 
Ensure that the `splitLock` is properly synchronized and that all resources are 
released correctly in the `finally` block to prevent memory leaks or resource 
exhaustion.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -643,6 +736,90 @@ private void 
processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
     }
   }
 
+  private void processElementForTruncateRestriction(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    currentRestriction = elem.getValue().getKey().getValue().getKey();
+    currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
+    // For truncation, we don't set currentTrackerClaimed so that we enable 
checkpointing even if no
+    // progress is made.
+    currentTracker =
+        RestrictionTrackers.observe(
+            doFnInvoker.invokeNewTracker(processContext),
+            new ClaimObserver<PositionT>() {
+              @Override
+              public void onClaimed(PositionT position) {}
+
+              @Override
+              public void onClaimFailed(PositionT position) {}
+            });
+    try {
+      TruncateResult<OutputT> truncatedRestriction =
+          doFnInvoker.invokeTruncateRestriction(processContext);
+      if (truncatedRestriction != null) {
+        processContext.output(truncatedRestriction.getTruncatedRestriction());
+      }
+    } finally {
+      currentTracker = null;
+      currentElement = null;
+      currentRestriction = null;
+      currentWatermarkEstimatorState = null;
+    }
+
+    this.stateAccessor.finalizeState();
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This code block was added to handle truncation. Ensure that the 
`currentTrackerClaimed` is correctly handled to avoid checkpointing issues. 
Also, ensure proper error handling and logging are in place.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1842,6 +2101,327 @@
     }
   }
 
+  /** This context outputs KV<KV<Element, KV<Restriction, 
WatemarkEstimatorState>>, Size>. */
+  private class SizedRestrictionWindowObservingProcessBundleContext
+      extends WindowObservingProcessBundleContextBase {
+    private final String errorContextPrefix;
+
+    SizedRestrictionWindowObservingProcessBundleContext(String 
errorContextPrefix) {
+      this.errorContextPrefix = errorContextPrefix;
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the 
input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  currentElement.getTimestamp(),
+                  currentWindow,
+                  currentElement.getPaneInfo()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  currentWindow,
+                  currentElement.getPaneInfo()));
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      checkTimestamp(timestamp);
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  windows,
+                  paneInfo));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    public <T> void outputWindowedValue(
+        TupleTag<T> tag,
+        T output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    public State state(String stateId, boolean alwaysFetched) {
+      throw new UnsupportedOperationException(
+          String.format("State unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public TimerMap timerFamily(String tagId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
+  }
+
+  /** This context outputs KV<KV<Element, KV<Restriction, 
WatermarkEstimatorState>>, Size>. */
+  private class SizedRestrictionNonWindowObservingProcessBundleContext
+      extends NonWindowObservingProcessBundleContextBase {
+    private final String errorContextPrefix;
+
+    SizedRestrictionNonWindowObservingProcessBundleContext(String 
errorContextPrefix) {
+      this.errorContextPrefix = errorContextPrefix;
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the 
input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              currentElement.withValue(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size)));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  currentElement.getWindows(),
+                  currentElement.getPaneInfo()));
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      checkTimestamp(timestamp);
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  windows,
+                  paneInfo));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    public <T> void outputWindowedValue(
+        TupleTag<T> tag,
+        T output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The class `SizedRestrictionNonWindowObservingProcessBundleContext` was 
added. Ensure that all methods are correctly implemented and that all 
exceptions are properly handled.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -892,7 +1152,7 @@
   }
 
   @VisibleForTesting
-  static <WatermarkEstimatorStateT> SplitResultsWithStopIndex 
computeSplitForProcess(
+  static <WatermarkEstimatorStateT> SplitResultsWithStopIndex 
computeSplitForProcessOrTruncate(

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The method `computeSplitForProcessOrTruncate` was made `@VisibleForTesting`. 
Ensure that the logic for computing split is correct and that the method is 
thoroughly tested to handle various edge cases.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1842,6 +2101,327 @@
     }
   }
 
+  /** This context outputs KV<KV<Element, KV<Restriction, 
WatemarkEstimatorState>>, Size>. */
+  private class SizedRestrictionWindowObservingProcessBundleContext
+      extends WindowObservingProcessBundleContextBase {
+    private final String errorContextPrefix;
+
+    SizedRestrictionWindowObservingProcessBundleContext(String 
errorContextPrefix) {
+      this.errorContextPrefix = errorContextPrefix;
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the 
input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  currentElement.getTimestamp(),
+                  currentWindow,
+                  currentElement.getPaneInfo()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  currentWindow,
+                  currentElement.getPaneInfo()));
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      checkTimestamp(timestamp);
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValues.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, 
currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  windows,
+                  paneInfo));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    public <T> void outputWindowedValue(
+        TupleTag<T> tag,
+        T output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use 
the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, 
errorContextPrefix));
+    }
+
+    @Override
+    public State state(String stateId, boolean alwaysFetched) {
+      throw new UnsupportedOperationException(
+          String.format("State unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public TimerMap timerFamily(String tagId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The class `SizedRestrictionWindowObservingProcessBundleContext` was added. 
Ensure that all methods are correctly implemented and that all exceptions are 
properly handled.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java:
##########
@@ -359,7 +359,7 @@ public void output(RestrictionT subrestriction) {
 
   /** This context outputs KV<KV<Element, KV<Restriction, 
WatermarkEstimatorState>>, Size>. */
   private class SizedRestrictionNonWindowObservingArgumentProvider
-      extends SplitRestrictionArgumentProvider {
+      extends SplitRestrictionArgumentProvider implements 
OutputReceiver<RestrictionT> {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The class `SizedRestrictionNonWindowObservingArgumentProvider` now 
implements `OutputReceiver<RestrictionT>`. Ensure that the output method is 
correctly implemented and that all exceptions are properly handled.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -835,6 +1040,61 @@
                 
splitResult.getResidualInUnprocessedWindowsRoot().getPaneInfo()));
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindow == null) {
+        return null;
+      }
+      // We are requesting a checkpoint but have not yet progressed on the 
restriction, skip
+      // request.
+      if (fractionOfRemainder == 0
+          && currentTrackerClaimed != null
+          && !currentTrackerClaimed.get()) {
+        return null;
+      }
+
+      SplitResultsWithStopIndex splitResult =
+          computeSplitForProcessOrTruncate(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              currentWindows,
+              currentWatermarkEstimatorState,
+              fractionOfRemainder,
+              null,
+              splitDelegate,
+              null,
+              windowCurrentIndex,
+              windowStopIndex);
+      if (splitResult == null) {
+        return null;
+      }
+      windowStopIndex = splitResult.getNewWindowStopIndex();
+      windowedSplitResult =
+          calculateRestrictionSize(
+              splitResult.getWindowSplit(),
+              PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN 
+ "/GetSize");
+      downstreamSplitResult = splitResult.getDownstreamSplit();
+    }
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValues.getFullCoder(inputCoder, 
windowCoder);
+    return constructSplitResult(
+        windowedSplitResult,
+        downstreamSplitResult,
+        fullInputCoder,
+        initialWatermark,
+        null,
+        pTransformId,
+        mainInputId,
+        pTransform.getOutputsMap().keySet(),
+        null);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This code block was added to handle 
`trySplitForWindowObservingTruncateRestriction`. Ensure that the logic for 
splitting is correct and that all resources are released correctly in the 
`synchronized` block to prevent race conditions or deadlocks.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -745,13 +922,41 @@
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress && 
currentWindow != null) {
-        return ProgressUtils.scaleProgress(
+        return scaleProgress(
             ((HasProgress) currentTracker).getProgress(), windowCurrentIndex, 
windowStopIndex);
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindow != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            windowCurrentIndex,
+            windowStopIndex);
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  static Progress scaleProgress(Progress progress, int currentWindowIndex, int 
stopWindowIndex) {
+    checkArgument(
+        currentWindowIndex < stopWindowIndex,
+        "Current window index (%s) must be less than stop window index (%s)",
+        currentWindowIndex,
+        stopWindowIndex);
+
+    double totalWorkPerWindow = progress.getWorkCompleted() + 
progress.getWorkRemaining();
+    double completed = totalWorkPerWindow * currentWindowIndex + 
progress.getWorkCompleted();
+    double remaining =
+        totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1)
+            + progress.getWorkRemaining();
+    return Progress.from(completed, remaining);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The method `scaleProgress` was made `static` and `@VisibleForTesting`. 
Ensure that the logic for scaling progress is correct and that the method is 
thoroughly tested to handle various edge cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to