gemini-code-assist[bot] commented on code in PR #36750:
URL: https://github.com/apache/beam/pull/36750#discussion_r2500969843


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java:
##########
@@ -123,6 +160,16 @@ public void checkDone() throws IllegalStateException {}
     public IsBounded isBounded() {
       return IsBounded.BOUNDED;
     }
+
+    public synchronized void releaseLock() {
+      blockTrySplit = false;
+      blockTryClaim = false;
+      notifyAll();
+    }
+
+    public boolean isBlocked() {
+      return isBlocked;
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `isBlocked` field is accessed by multiple threads. It is written in 
`synchronized` methods (`tryClaim`, `trySplit`), but read here in a 
non-`synchronized` method. This can lead to a data race where the reading 
thread does not see the most recent value of `isBlocked`, potentially causing 
test flakiness or hangs.
   
   To ensure visibility of changes to `isBlocked` across threads, you should 
make this `isBlocked()` method `synchronized`. This establishes a 
happens-before relationship and guarantees that the reading thread sees the 
latest value. An alternative would be to declare the `isBlocked` field as 
`volatile`.
   
   ```suggestion
       public synchronized boolean isBlocked() {
         return isBlocked;
       }
   ```
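   
   For illustration only, here is a minimal, self-contained sketch of the 
   visibility concern. `BlockingTrackerSketch` and its `demo()` helper are 
   hypothetical stand-ins, not the classes in this PR. It assumes the test 
   tracker blocks inside `tryClaim` by calling `wait()`, which the `notifyAll()` 
   in `releaseLock()` suggests but which this hunk does not show.
   
   ```java
   /** Hypothetical stand-in for the test tracker; not the Beam class. */
   class BlockingTrackerSketch {
     private boolean blockTryClaim = true;
     private boolean isBlocked = false;

     /** Parks the caller until releaseLock() is invoked. */
     synchronized boolean tryClaim() throws InterruptedException {
       while (blockTryClaim) {
         isBlocked = true;
         wait(); // releases the monitor while waiting, so readers are not locked out
       }
       isBlocked = false;
       return true;
     }

     synchronized void releaseLock() {
       blockTryClaim = false;
       notifyAll();
     }

     /** Reads under the same monitor as the writes, so the latest value is visible. */
     synchronized boolean isBlocked() {
       return isBlocked;
     }

     /** Returns true if the blocked state was observed and later cleared. */
     static boolean demo() {
       BlockingTrackerSketch tracker = new BlockingTrackerSketch();
       Thread claimer =
           new Thread(
               () -> {
                 try {
                   tracker.tryClaim();
                 } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                 }
               });
       claimer.start();
       try {
         // Poll with a deadline so a bug cannot hang the caller forever.
         long deadline = System.nanoTime() + 5_000_000_000L;
         while (!tracker.isBlocked()) {
           if (System.nanoTime() > deadline) {
             claimer.interrupt();
             return false;
           }
           Thread.sleep(1);
         }
         tracker.releaseLock();
         claimer.join();
         return !tracker.isBlocked();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       }
     }
   }
   ```
   
   Because `wait()` gives up the monitor, a `synchronized` getter does not 
   deadlock against the parked `tryClaim` call. Declaring the field `volatile` 
   and leaving the getter unsynchronized would give the same visibility 
   guarantee for this single read.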



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java:
##########
@@ -55,35 +59,71 @@ protected RestrictionTrackerObserver(
     }
 
     @Override
-    public synchronized boolean tryClaim(PositionT position) {
-      if (delegate.tryClaim(position)) {
-        claimObserver.onClaimed(position);
-        return true;
-      } else {
-        claimObserver.onClaimFailed(position);
-        return false;
+    public boolean tryClaim(PositionT position) {
+      lock.lock();
+      try {
+        if (delegate.tryClaim(position)) {
+          claimObserver.onClaimed(position);
+          evalueteProgress();
+          return true;
+        } else {
+          claimObserver.onClaimFailed(position);
+          return false;
+        }
+      } finally {
+        lock.unlock();
       }
     }
 
     @Override
-    public synchronized RestrictionT currentRestriction() {
-      return delegate.currentRestriction();
+    public RestrictionT currentRestriction() {
+      lock.lock();
+      try {
+        return delegate.currentRestriction();
+      } finally {
+        lock.unlock();
+      }
     }
 
     @Override
-    public synchronized SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
-      return delegate.trySplit(fractionOfRemainder);
+    public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+      lock.lock();
+      try {
+        SplitResult<RestrictionT> result = delegate.trySplit(fractionOfRemainder);
+        evalueteProgress();
+        return result;
+      } finally {
+        lock.unlock();
+      }
     }
 
     @Override
-    public synchronized void checkDone() throws IllegalStateException {
-      delegate.checkDone();
+    public void checkDone() throws IllegalStateException {
+      lock.lock();
+      try {
+        delegate.checkDone();
+      } finally {
+        lock.unlock();
+      }
     }
 
     @Override
     public IsBounded isBounded() {
       return delegate.isBounded();
     }
+
+    /** Evaluate progress if requested. */
+    protected void evalueteProgress() {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There's a typo in the method name: `evalueteProgress` should be 
`evaluateProgress`. Please correct this here and at all call sites (lines 67, 
93, and 148).
   
   Also, consider making this method `private` to improve encapsulation as it's 
an internal helper.
   
   ```suggestion
       private void evaluateProgress() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

