scwhittle commented on code in PR #36750:
URL: https://github.com/apache/beam/pull/36750#discussion_r2555375192


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java:
##########
@@ -179,46 +181,37 @@ public void testClaimObserversMaintainBacklogInterfaces() {
     assertThat(hasSize, instanceOf(HasProgress.class));
   }
 
-  private void testBlocking(String testCase) throws InterruptedException {
-    RestrictionTrackerWithProgress withProgress;
-    if ("tryClaim".equals(testCase)) {
-      withProgress = new RestrictionTrackerWithProgress(true, false);
-    } else if ("trySplit".equals(testCase)) {
-      withProgress = new RestrictionTrackerWithProgress(false, true);
-    } else {
-      throw new IllegalArgumentException("unknown test case " + testCase);
-    }
+  @Test
+  public void testClaimObserversProgressNonBlockingOnTryClaim() throws InterruptedException {
+    RestrictionTrackerWithProgress withProgress = new RestrictionTrackerWithProgress(true, false);
     RestrictionTracker<Object, Object> tracker =
         RestrictionTrackers.observe(withProgress, new RestrictionTrackers.NoopClaimObserver<>());
-    Runnable runnable;
-    if ("tryClaim".equals(testCase)) {
-      runnable = () -> tracker.tryClaim(new Object());
-    } else {
-      runnable = () -> tracker.trySplit(0.5);
-    }
-    Thread blocking = new Thread(runnable);
+    Thread blocking = new Thread(() -> tracker.tryClaim(new Object()));
     blocking.start();
-    while (!withProgress.isBlocked()) {
-      Thread.sleep(1);
-    }
+    withProgress.waitUntil(true);

Review Comment:
   nit: it's hard to tell from reading this line alone what waitUntil(true)
means. How about either waitUntilBlocking(true) or separate
waitUntilBlocking()/waitUntilUnblocked() methods?
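
   To illustrate the second option, here is a minimal sketch of what a pair of
intention-revealing wait methods could look like. The class name BlockingState,
the setBlocked method, and the timeout parameter are all hypothetical; the PR's
actual RestrictionTrackerWithProgress helper is not shown in this hunk and may
be implemented differently.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper; not the PR's actual RestrictionTrackerWithProgress. */
final class BlockingState {
  private boolean blocked; // guarded by this object's monitor

  /** Records the new state and wakes every thread waiting on it. */
  synchronized void setBlocked(boolean value) {
    blocked = value;
    notifyAll();
  }

  /** Returns true once the state is "blocked", or false if the timeout elapses first. */
  synchronized boolean waitUntilBlocking(long timeoutMillis) throws InterruptedException {
    return await(true, timeoutMillis);
  }

  /** Returns true once the state is "unblocked", or false if the timeout elapses first. */
  synchronized boolean waitUntilUnblocked(long timeoutMillis) throws InterruptedException {
    return await(false, timeoutMillis);
  }

  // Callers already hold the monitor, so waiting on "this" is legal here.
  private boolean await(boolean expected, long timeoutMillis) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (blocked != expected) { // loop guards against spurious wakeups
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
    }
    return true;
  }
}
```

   With this shape the call site reads as waitUntilBlocking(...) or
waitUntilUnblocked(...), so the reader does not have to look up what a bare
boolean argument means. The timeout is an addition for the sketch so that a
broken test fails instead of hanging.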
   
   



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java:
##########
@@ -101,8 +138,33 @@ protected RestrictionTrackerObserverWithProgress(
     }
 
     @Override
-    public synchronized Progress getProgress() {
-      return ((HasProgress) delegate).getProgress();
+    public Progress getProgress() {
+      return getProgress(FIRST_PROGRESS_TIMEOUT_SEC);
+    }
+
+    @VisibleForTesting
+    Progress getProgress(int timeOutSec) {
+      if (!hasInitialProgress) {
+        Progress progress = Progress.NONE;
+        try {
+          // lock can be held long by long-running tryClaim/trySplit. We tolerate this scenario

Review Comment:
   I wondered whether ReentrantLock is less performant than the synchronized
keyword, but after some searching that doesn't seem to be a concern.
   
   Keeping as is seems good.
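
   For context on the trade-off: one thing ReentrantLock offers that
synchronized does not is a timed tryLock, which lets a reader give up after a
bound instead of waiting behind a long-running lock holder. The sketch below
shows that general pattern only. The class TimedProgressReader and its methods
are hypothetical, and since the hunk above is cut off, it is an assumption that
the PR's getProgress(int timeOutSec) follows this shape.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration of a timed lock acquisition with a fallback value. */
final class TimedProgressReader {
  private final ReentrantLock lock = new ReentrantLock();
  private double lastProgress = 0.0; // guarded by lock

  /** Stores a new progress value while holding the lock. */
  void update(double value) {
    lock.lock();
    try {
      lastProgress = value;
    } finally {
      lock.unlock();
    }
  }

  /** Runs an action while holding the lock; stands in for a long tryClaim/trySplit. */
  void runLocked(Runnable action) {
    lock.lock();
    try {
      action.run();
    } finally {
      lock.unlock();
    }
  }

  /** Returns the stored progress, or the fallback if the lock is not acquired in time. */
  double read(long timeout, TimeUnit unit, double fallback) throws InterruptedException {
    if (!lock.tryLock(timeout, unit)) {
      return fallback; // lock holder is still busy; do not block the caller further
    }
    try {
      return lastProgress;
    } finally {
      lock.unlock();
    }
  }
}
```

   A synchronized method has no equivalent of the bounded wait in read(), so
a caller would block for as long as the holder keeps the monitor.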


