This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 94336fa0187 Make RestrictionTrackers.getProgress unblocking until 
initial progress successfully returned (#36750)
94336fa0187 is described below

commit 94336fa01878b12bc71073b76c386ede7d10b3d7
Author: Yi Hu <[email protected]>
AuthorDate: Mon Nov 24 15:21:21 2025 -0500

    Make RestrictionTrackers.getProgress unblocking until initial progress 
successfully returned (#36750)
    
    * Make RestrictionTrackers.getProgress unblocking
    
    * comments
    
    * address comments - add 1 min blocking time
    
    * Add log
    
    * only change behavior when initial progress never evaluated
    
    * simplify tests
    
    * changed to waitUntilBlocking
---
 .../sdk/fn/splittabledofn/RestrictionTrackers.java | 94 ++++++++++++++++++----
 .../fn/splittabledofn/RestrictionTrackersTest.java | 90 ++++++++++++++++++++-
 2 files changed, 165 insertions(+), 19 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
index 8879392d42a..6fefc6b184a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.fn.splittabledofn;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 
 /** Support utilities for interacting with {@link RestrictionTracker 
RestrictionTrackers}. */
 @SuppressWarnings({
@@ -45,6 +48,8 @@ public class RestrictionTrackers {
   private static class RestrictionTrackerObserver<RestrictionT, PositionT>
       extends RestrictionTracker<RestrictionT, PositionT> {
     protected final RestrictionTracker<RestrictionT, PositionT> delegate;
+    protected ReentrantLock lock = new ReentrantLock();
+    protected volatile boolean hasInitialProgress = false;
     private final ClaimObserver<PositionT> claimObserver;
 
     protected RestrictionTrackerObserver(
@@ -55,35 +60,66 @@ public class RestrictionTrackers {
     }
 
     @Override
-    public synchronized boolean tryClaim(PositionT position) {
-      if (delegate.tryClaim(position)) {
-        claimObserver.onClaimed(position);
-        return true;
-      } else {
-        claimObserver.onClaimFailed(position);
-        return false;
+    public boolean tryClaim(PositionT position) {
+      lock.lock();
+      try {
+        if (delegate.tryClaim(position)) {
+          claimObserver.onClaimed(position);
+          return true;
+        } else {
+          claimObserver.onClaimFailed(position);
+          return false;
+        }
+      } finally {
+        lock.unlock();
       }
     }
 
     @Override
-    public synchronized RestrictionT currentRestriction() {
-      return delegate.currentRestriction();
+    public RestrictionT currentRestriction() {
+      lock.lock();
+      try {
+        return delegate.currentRestriction();
+      } finally {
+        lock.unlock();
+      }
     }
 
     @Override
-    public synchronized SplitResult<RestrictionT> trySplit(double 
fractionOfRemainder) {
-      return delegate.trySplit(fractionOfRemainder);
+    public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+      lock.lock();
+      try {
+        SplitResult<RestrictionT> result = 
delegate.trySplit(fractionOfRemainder);
+        return result;
+      } finally {
+        lock.unlock();
+      }
     }
 
     @Override
-    public synchronized void checkDone() throws IllegalStateException {
-      delegate.checkDone();
+    public void checkDone() throws IllegalStateException {
+      lock.lock();
+      try {
+        delegate.checkDone();
+      } finally {
+        lock.unlock();
+      }
     }
 
     @Override
     public IsBounded isBounded() {
       return delegate.isBounded();
     }
+
+    /** Evaluate progress if requested. */
+    protected Progress getProgressBlocking() {
+      lock.lock();
+      try {
+        return ((HasProgress) delegate).getProgress();
+      } finally {
+        lock.unlock();
+      }
+    }
   }
 
   /**
@@ -91,8 +127,9 @@ public class RestrictionTrackers {
    * RestrictionTracker}.
    */
   @ThreadSafe
-  private static class RestrictionTrackerObserverWithProgress<RestrictionT, 
PositionT>
+  static class RestrictionTrackerObserverWithProgress<RestrictionT, PositionT>
       extends RestrictionTrackerObserver<RestrictionT, PositionT> implements 
HasProgress {
+    private static final int FIRST_PROGRESS_TIMEOUT_SEC = 60;
 
     protected RestrictionTrackerObserverWithProgress(
         RestrictionTracker<RestrictionT, PositionT> delegate,
@@ -101,8 +138,33 @@ public class RestrictionTrackers {
     }
 
     @Override
-    public synchronized Progress getProgress() {
-      return ((HasProgress) delegate).getProgress();
+    public Progress getProgress() {
+      return getProgress(FIRST_PROGRESS_TIMEOUT_SEC);
+    }
+
+    @VisibleForTesting
+    Progress getProgress(int timeOutSec) {
+      if (!hasInitialProgress) {
+        Progress progress = Progress.NONE;
+        try {
+          // lock can be held long by long-running tryClaim/trySplit. We 
tolerate this scenario
+          // by returning zero progress when initial progress never evaluated 
before due to lock
+          // timeout.
+          if (lock.tryLock(timeOutSec, TimeUnit.SECONDS)) {
+            try {
+              progress = getProgressBlocking();
+              hasInitialProgress = true;
+            } finally {
+              lock.unlock();
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        return progress;
+      } else {
+        return getProgressBlocking();
+      }
     }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
index 41d8ca88b95..8f7ee9eb25d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
@@ -24,11 +24,14 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -38,6 +41,8 @@ import org.junit.runners.JUnit4;
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class RestrictionTrackersTest {
+  @Rule public Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+
   @Test
   public void testObservingClaims() {
     RestrictionTracker<String, String> observedTracker =
@@ -95,14 +100,37 @@ public class RestrictionTrackersTest {
 
   private static class RestrictionTrackerWithProgress extends 
RestrictionTracker<Object, Object>
       implements HasProgress {
+    private boolean blockTryClaim;
+    private boolean blockTrySplit;
+    private boolean isBlocked;
+    public static final Progress REPORT_PROGRESS = Progress.from(2.0, 3.0);
+
+    public RestrictionTrackerWithProgress() {
+      this(false, false);
+    }
+
+    public RestrictionTrackerWithProgress(boolean blockTryClaim, boolean 
blockTrySplit) {
+      this.blockTryClaim = blockTryClaim;
+      this.blockTrySplit = blockTrySplit;
+      this.isBlocked = false;
+    }
 
     @Override
     public Progress getProgress() {
-      return RestrictionTracker.Progress.from(2.0, 3.0);
+      return REPORT_PROGRESS;
     }
 
     @Override
-    public boolean tryClaim(Object position) {
+    public synchronized boolean tryClaim(Object position) {
+      while (blockTryClaim) {
+        isBlocked = true;
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      isBlocked = false;
       return false;
     }
 
@@ -112,7 +140,16 @@ public class RestrictionTrackersTest {
     }
 
     @Override
-    public SplitResult<Object> trySplit(double fractionOfRemainder) {
+    public synchronized SplitResult<Object> trySplit(double 
fractionOfRemainder) {
+      while (blockTrySplit) {
+        isBlocked = true;
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      isBlocked = false;
       return null;
     }
 
@@ -123,6 +160,19 @@ public class RestrictionTrackersTest {
     public IsBounded isBounded() {
       return IsBounded.BOUNDED;
     }
+
+    public synchronized void releaseLock() {
+      blockTrySplit = false;
+      blockTryClaim = false;
+      notifyAll();
+    }
+
+    /** Wait until RestrictionTracker becomes blocking or unblocking. */
+    public void waitUntilBlocking(boolean blocking) throws 
InterruptedException {
+      while (isBlocked != blocking) {
+        Thread.sleep(1);
+      }
+    }
   }
 
   @Test
@@ -131,4 +181,38 @@ public class RestrictionTrackersTest {
         RestrictionTrackers.observe(new RestrictionTrackerWithProgress(), 
null);
     assertThat(hasSize, instanceOf(HasProgress.class));
   }
+
+  @Test
+  public void testClaimObserversProgressNonBlockingOnTryClaim() throws 
InterruptedException {
+    RestrictionTrackerWithProgress withProgress = new 
RestrictionTrackerWithProgress(true, false);
+    RestrictionTracker<Object, Object> tracker =
+        RestrictionTrackers.observe(withProgress, new 
RestrictionTrackers.NoopClaimObserver<>());
+    Thread blocking = new Thread(() -> tracker.tryClaim(new Object()));
+    blocking.start();
+    withProgress.waitUntilBlocking(true);
+    RestrictionTracker.Progress progress =
+        ((RestrictionTrackers.RestrictionTrackerObserverWithProgress) 
tracker).getProgress(1);
+    assertEquals(RestrictionTracker.Progress.NONE, progress);
+    withProgress.releaseLock();
+    withProgress.waitUntilBlocking(false);
+    progress = ((HasProgress) tracker).getProgress();
+    assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
+  }
+
+  @Test
+  public void testClaimObserversProgressNonBlockingOnTrySplit() throws 
InterruptedException {
+    RestrictionTrackerWithProgress withProgress = new 
RestrictionTrackerWithProgress(false, true);
+    RestrictionTracker<Object, Object> tracker =
+        RestrictionTrackers.observe(withProgress, new 
RestrictionTrackers.NoopClaimObserver<>());
+    Thread blocking = new Thread(() -> tracker.trySplit(0.5));
+    blocking.start();
+    withProgress.waitUntilBlocking(true);
+    RestrictionTracker.Progress progress =
+        ((RestrictionTrackers.RestrictionTrackerObserverWithProgress) 
tracker).getProgress(1);
+    assertEquals(RestrictionTracker.Progress.NONE, progress);
+    withProgress.releaseLock();
+    withProgress.waitUntilBlocking(false);
+    progress = ((HasProgress) tracker).getProgress();
+    assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
+  }
 }

Reply via email to