This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 94336fa0187 Make RestrictionTrackers.getProgress non-blocking until
initial progress is successfully returned (#36750)
94336fa0187 is described below
commit 94336fa01878b12bc71073b76c386ede7d10b3d7
Author: Yi Hu <[email protected]>
AuthorDate: Mon Nov 24 15:21:21 2025 -0500
Make RestrictionTrackers.getProgress non-blocking until initial progress
is successfully returned (#36750)
* Make RestrictionTrackers.getProgress unblocking
* comments
* address comments - add 1 min blocking time
* Add log
* only change behavior when initial progress has never been evaluated
* simplify tests
* changed to waitUntilBlocking
---
.../sdk/fn/splittabledofn/RestrictionTrackers.java | 94 ++++++++++++++++++----
.../fn/splittabledofn/RestrictionTrackersTest.java | 90 ++++++++++++++++++++-
2 files changed, 165 insertions(+), 19 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
index 8879392d42a..6fefc6b184a 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.fn.splittabledofn;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
/** Support utilities for interacting with {@link RestrictionTracker
RestrictionTrackers}. */
@SuppressWarnings({
@@ -45,6 +48,8 @@ public class RestrictionTrackers {
private static class RestrictionTrackerObserver<RestrictionT, PositionT>
extends RestrictionTracker<RestrictionT, PositionT> {
protected final RestrictionTracker<RestrictionT, PositionT> delegate;
+ protected ReentrantLock lock = new ReentrantLock();
+ protected volatile boolean hasInitialProgress = false;
private final ClaimObserver<PositionT> claimObserver;
protected RestrictionTrackerObserver(
@@ -55,35 +60,66 @@ public class RestrictionTrackers {
}
@Override
- public synchronized boolean tryClaim(PositionT position) {
- if (delegate.tryClaim(position)) {
- claimObserver.onClaimed(position);
- return true;
- } else {
- claimObserver.onClaimFailed(position);
- return false;
+ public boolean tryClaim(PositionT position) {
+ lock.lock();
+ try {
+ if (delegate.tryClaim(position)) {
+ claimObserver.onClaimed(position);
+ return true;
+ } else {
+ claimObserver.onClaimFailed(position);
+ return false;
+ }
+ } finally {
+ lock.unlock();
}
}
@Override
- public synchronized RestrictionT currentRestriction() {
- return delegate.currentRestriction();
+ public RestrictionT currentRestriction() {
+ lock.lock();
+ try {
+ return delegate.currentRestriction();
+ } finally {
+ lock.unlock();
+ }
}
@Override
- public synchronized SplitResult<RestrictionT> trySplit(double
fractionOfRemainder) {
- return delegate.trySplit(fractionOfRemainder);
+ public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+ lock.lock();
+ try {
+ SplitResult<RestrictionT> result =
delegate.trySplit(fractionOfRemainder);
+ return result;
+ } finally {
+ lock.unlock();
+ }
}
@Override
- public synchronized void checkDone() throws IllegalStateException {
- delegate.checkDone();
+ public void checkDone() throws IllegalStateException {
+ lock.lock();
+ try {
+ delegate.checkDone();
+ } finally {
+ lock.unlock();
+ }
}
@Override
public IsBounded isBounded() {
return delegate.isBounded();
}
+
+ /** Evaluate progress if requested. */
+ protected Progress getProgressBlocking() {
+ lock.lock();
+ try {
+ return ((HasProgress) delegate).getProgress();
+ } finally {
+ lock.unlock();
+ }
+ }
}
/**
@@ -91,8 +127,9 @@ public class RestrictionTrackers {
* RestrictionTracker}.
*/
@ThreadSafe
- private static class RestrictionTrackerObserverWithProgress<RestrictionT,
PositionT>
+ static class RestrictionTrackerObserverWithProgress<RestrictionT, PositionT>
extends RestrictionTrackerObserver<RestrictionT, PositionT> implements
HasProgress {
+ private static final int FIRST_PROGRESS_TIMEOUT_SEC = 60;
protected RestrictionTrackerObserverWithProgress(
RestrictionTracker<RestrictionT, PositionT> delegate,
@@ -101,8 +138,33 @@ public class RestrictionTrackers {
}
@Override
- public synchronized Progress getProgress() {
- return ((HasProgress) delegate).getProgress();
+ public Progress getProgress() {
+ return getProgress(FIRST_PROGRESS_TIMEOUT_SEC);
+ }
+
+ @VisibleForTesting
+ Progress getProgress(int timeOutSec) {
+ if (!hasInitialProgress) {
+ Progress progress = Progress.NONE;
+ try {
+ // lock can be held long by long-running tryClaim/trySplit. We
tolerate this scenario
+ // by returning zero progress when initial progress never evaluated
before due to lock
+ // timeout.
+ if (lock.tryLock(timeOutSec, TimeUnit.SECONDS)) {
+ try {
+ progress = getProgressBlocking();
+ hasInitialProgress = true;
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return progress;
+ } else {
+ return getProgressBlocking();
+ }
}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
index 41d8ca88b95..8f7ee9eb25d 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
@@ -24,11 +24,14 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -38,6 +41,8 @@ import org.junit.runners.JUnit4;
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
public class RestrictionTrackersTest {
+ @Rule public Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+
@Test
public void testObservingClaims() {
RestrictionTracker<String, String> observedTracker =
@@ -95,14 +100,37 @@ public class RestrictionTrackersTest {
private static class RestrictionTrackerWithProgress extends
RestrictionTracker<Object, Object>
implements HasProgress {
+ private boolean blockTryClaim;
+ private boolean blockTrySplit;
+ private boolean isBlocked;
+ public static final Progress REPORT_PROGRESS = Progress.from(2.0, 3.0);
+
+ public RestrictionTrackerWithProgress() {
+ this(false, false);
+ }
+
+ public RestrictionTrackerWithProgress(boolean blockTryClaim, boolean
blockTrySplit) {
+ this.blockTryClaim = blockTryClaim;
+ this.blockTrySplit = blockTrySplit;
+ this.isBlocked = false;
+ }
@Override
public Progress getProgress() {
- return RestrictionTracker.Progress.from(2.0, 3.0);
+ return REPORT_PROGRESS;
}
@Override
- public boolean tryClaim(Object position) {
+ public synchronized boolean tryClaim(Object position) {
+ while (blockTryClaim) {
+ isBlocked = true;
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ isBlocked = false;
return false;
}
@@ -112,7 +140,16 @@ public class RestrictionTrackersTest {
}
@Override
- public SplitResult<Object> trySplit(double fractionOfRemainder) {
+ public synchronized SplitResult<Object> trySplit(double
fractionOfRemainder) {
+ while (blockTrySplit) {
+ isBlocked = true;
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ isBlocked = false;
return null;
}
@@ -123,6 +160,19 @@ public class RestrictionTrackersTest {
public IsBounded isBounded() {
return IsBounded.BOUNDED;
}
+
+ public synchronized void releaseLock() {
+ blockTrySplit = false;
+ blockTryClaim = false;
+ notifyAll();
+ }
+
+ /** Wait until RestrictionTracker becomes blocking or unblocking. */
+ public void waitUntilBlocking(boolean blocking) throws
InterruptedException {
+ while (isBlocked != blocking) {
+ Thread.sleep(1);
+ }
+ }
}
@Test
@@ -131,4 +181,38 @@ public class RestrictionTrackersTest {
RestrictionTrackers.observe(new RestrictionTrackerWithProgress(),
null);
assertThat(hasSize, instanceOf(HasProgress.class));
}
+
+ @Test
+ public void testClaimObserversProgressNonBlockingOnTryClaim() throws
InterruptedException {
+ RestrictionTrackerWithProgress withProgress = new
RestrictionTrackerWithProgress(true, false);
+ RestrictionTracker<Object, Object> tracker =
+ RestrictionTrackers.observe(withProgress, new
RestrictionTrackers.NoopClaimObserver<>());
+ Thread blocking = new Thread(() -> tracker.tryClaim(new Object()));
+ blocking.start();
+ withProgress.waitUntilBlocking(true);
+ RestrictionTracker.Progress progress =
+ ((RestrictionTrackers.RestrictionTrackerObserverWithProgress)
tracker).getProgress(1);
+ assertEquals(RestrictionTracker.Progress.NONE, progress);
+ withProgress.releaseLock();
+ withProgress.waitUntilBlocking(false);
+ progress = ((HasProgress) tracker).getProgress();
+ assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
+ }
+
+ @Test
+ public void testClaimObserversProgressNonBlockingOnTrySplit() throws
InterruptedException {
+ RestrictionTrackerWithProgress withProgress = new
RestrictionTrackerWithProgress(false, true);
+ RestrictionTracker<Object, Object> tracker =
+ RestrictionTrackers.observe(withProgress, new
RestrictionTrackers.NoopClaimObserver<>());
+ Thread blocking = new Thread(() -> tracker.trySplit(0.5));
+ blocking.start();
+ withProgress.waitUntilBlocking(true);
+ RestrictionTracker.Progress progress =
+ ((RestrictionTrackers.RestrictionTrackerObserverWithProgress)
tracker).getProgress(1);
+ assertEquals(RestrictionTracker.Progress.NONE, progress);
+ withProgress.releaseLock();
+ withProgress.waitUntilBlocking(false);
+ progress = ((HasProgress) tracker).getProgress();
+ assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
+ }
}