Copilot commented on code in PR #3397:
URL: https://github.com/apache/celeborn/pull/3397#discussion_r2239499337


##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
   }
 
   public void addBatch(int batchId, int batchBytesSize, String hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }
+      lock.notifyAll();
     }
   }
 
   public void removeBatch(int batchId, String hostAndPushPort) {
-    Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
-    if (batchIdSet != null) {
-      batchIdSet.remove(batchId);
-    } else {
-      logger.info("Batches of {} in flight is null.", hostAndPushPort);
-    }
-    totalInflightReqs.decrement();
-    if (maxInFlightBytesSizeEnabled) {
-      int inflightBatchBytesSize =
-          -Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
-      LongAdder inflightBytesSize = inflightBytesSizePerAddress.get(hostAndPushPort);
-      if (inflightBytesSize != null) {
-        inflightBytesSize.add(inflightBatchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
+      if (batchIdSet != null) {
+        batchIdSet.remove(batchId);
+      } else {
+        logger.info("Batches of {} in flight is null.", hostAndPushPort);
+      }
+      totalInflightReqs.decrement();
+      if (maxInFlightBytesSizeEnabled) {
+        int inflightBatchBytesSize =
+            -Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
+        LongAdder inflightBytesSize = inflightBytesSizePerAddress.get(hostAndPushPort);
+        if (inflightBytesSize != null) {
+          inflightBytesSize.add(inflightBatchBytesSize);
+        }
+        totalInflightBytes.add(inflightBatchBytesSize);
       }
-      totalInflightBytes.add(inflightBatchBytesSize);
+      lock.notifyAll();

Review Comment:
   The notifyAll() call is placed outside the condition checks that determine 
if a batch was actually removed. This will wake up waiting threads even when no 
batch was removed (when batchIdSet is null or doesn't contain the batchId). 
Consider moving notifyAll() inside the conditional blocks where actual removal 
occurs.
   ```suggestion
             lock.notifyAll(); // Notify only if inflightBytesSize was updated
           }
           totalInflightBytes.add(inflightBatchBytesSize);
         }
   ```



##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
   }
 
   public void addBatch(int batchId, int batchBytesSize, String hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }
+      lock.notifyAll();

Review Comment:
   The notifyAll() call is placed outside the condition check that determines 
if a batch was actually added. This will wake up waiting threads even when no 
batch was added (when batchIdSetPerPair.add(batchId) returns false). Consider 
moving notifyAll() inside the if block at line 88 to only notify when state 
actually changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to