lukecwik commented on code in PR #23882:
URL: https://github.com/apache/beam/pull/23882#discussion_r1015726051


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -964,51 +970,48 @@ public void onClaimFailed(PositionT position) {}
   private void processElementForWindowObservingTruncateRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
-    try {
-      windowCurrentIndex = -1;
-      windowStopIndex = currentElement.getWindows().size();
-      currentWindows = ImmutableList.copyOf(currentElement.getWindows());
-      while (true) {
-        synchronized (splitLock) {
-          windowCurrentIndex++;
-          if (windowCurrentIndex >= windowStopIndex) {
-            break;
-          }
-          currentRestriction = elem.getValue().getKey().getValue().getKey();
-          currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
-          currentWindow = currentWindows.get(windowCurrentIndex);
-          currentTracker =
-              RestrictionTrackers.observe(
-                  doFnInvoker.invokeNewTracker(processContext),
-                  new ClaimObserver<PositionT>() {
-                    @Override
-                    public void onClaimed(PositionT position) {}
-
-                    @Override
-                    public void onClaimFailed(PositionT position) {}
-                  });
-          currentWatermarkEstimator =
-              WatermarkEstimators.threadSafe(
-                  doFnInvoker.invokeNewWatermarkEstimator(processContext));
-          initialWatermark = 
currentWatermarkEstimator.getWatermarkAndState().getKey();
-        }
-        TruncateResult<OutputT> truncatedRestriction =
-            doFnInvoker.invokeTruncateRestriction(processContext);
-        if (truncatedRestriction != null) {
-          
processContext.output(truncatedRestriction.getTruncatedRestriction());
+    windowCurrentIndex = -1;
+    windowStopIndex = currentElement.getWindows().size();
+    currentWindows = ImmutableList.copyOf(currentElement.getWindows());
+    while (true) {
+      synchronized (splitLock) {
+        windowCurrentIndex++;
+        if (windowCurrentIndex >= windowStopIndex) {
+          // Careful to reset the split state under the same synchronized 
block.
+          windowCurrentIndex = -1;
+          windowStopIndex = 0;
+          currentWindows = null;

Review Comment:
   ```suggestion
             currentElement = null;
             currentWindows = null;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1058,72 +1061,68 @@ public static SplitResultsWithStopIndex of(
   private void processElementForWindowObservingSizedElementAndRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
-    try {
-      windowCurrentIndex = -1;
-      windowStopIndex = currentElement.getWindows().size();
-      currentWindows = ImmutableList.copyOf(currentElement.getWindows());
-      while (true) {
-        synchronized (splitLock) {
-          windowCurrentIndex++;
-          if (windowCurrentIndex >= windowStopIndex) {
-            return;
-          }
-          currentRestriction = elem.getValue().getKey().getValue().getKey();
-          currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
-          currentWindow = currentWindows.get(windowCurrentIndex);
-          currentTracker =
-              RestrictionTrackers.observe(
-                  doFnInvoker.invokeNewTracker(processContext),
-                  new ClaimObserver<PositionT>() {
-                    @Override
-                    public void onClaimed(PositionT position) {}
-
-                    @Override
-                    public void onClaimFailed(PositionT position) {}
-                  });
-          currentWatermarkEstimator =
-              WatermarkEstimators.threadSafe(
-                  doFnInvoker.invokeNewWatermarkEstimator(processContext));
-          initialWatermark = 
currentWatermarkEstimator.getWatermarkAndState().getKey();
-        }
-
-        // It is important to ensure that {@code splitLock} is not held during 
#invokeProcessElement
-        DoFn.ProcessContinuation continuation = 
doFnInvoker.invokeProcessElement(processContext);
-        // Ensure that all the work is done if the user tells us that they 
don't want to
-        // resume processing.
-        if (!continuation.shouldResume()) {
-          currentTracker.checkDone();
-          continue;
+    windowCurrentIndex = -1;
+    windowStopIndex = currentElement.getWindows().size();
+    currentWindows = ImmutableList.copyOf(currentElement.getWindows());
+    while (true) {
+      synchronized (splitLock) {
+        windowCurrentIndex++;
+        if (windowCurrentIndex >= windowStopIndex) {
+          // Careful to reset the split state under the same synchronized 
block.
+          windowCurrentIndex = -1;
+          windowStopIndex = 0;
+          currentWindows = null;

Review Comment:
   ```suggestion
             currentElement = null;
             currentWindows = null;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to