Copilot commented on code in PR #3397:
URL: https://github.com/apache/celeborn/pull/3397#discussion_r2238491493


##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState 
pushState) {
   }
 
   public void addBatch(int batchId, int batchBytesSize, String 
hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> 
new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> 
new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }

Review Comment:
   The notifyAll() call should be placed outside the condition block at line 
94. Currently, it's only called when a batch is successfully added, but threads 
waiting on the lock should be notified regardless of whether the batch was 
added or already existed.
   ```suggestion
         }
       }
       synchronized (lock) {
   ```



##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState 
pushState) {
   }
 
   public void addBatch(int batchId, int batchBytesSize, String 
hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> 
new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> 
new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }
+      lock.notifyAll();
     }
   }
 
   public void removeBatch(int batchId, String hostAndPushPort) {
     Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
-    if (batchIdSet != null) {
-      batchIdSet.remove(batchId);
-    } else {
-      logger.info("Batches of {} in flight is null.", hostAndPushPort);
-    }
-    totalInflightReqs.decrement();
-    if (maxInFlightBytesSizeEnabled) {
-      int inflightBatchBytesSize =
-          
-Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
-      LongAdder inflightBytesSize = 
inflightBytesSizePerAddress.get(hostAndPushPort);
-      if (inflightBytesSize != null) {
-        inflightBytesSize.add(inflightBatchBytesSize);
+    synchronized (lock) {
+      if (batchIdSet != null) {
+        batchIdSet.remove(batchId);
+      } else {
+        logger.info("Batches of {} in flight is null.", hostAndPushPort);
+      }
+      totalInflightReqs.decrement();
+      if (maxInFlightBytesSizeEnabled) {
+        int inflightBatchBytesSize =
+            
-Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
+        LongAdder inflightBytesSize = 
inflightBytesSizePerAddress.get(hostAndPushPort);
+        if (inflightBytesSize != null) {
+          inflightBytesSize.add(inflightBatchBytesSize);
+        }
+        totalInflightBytes.add(inflightBatchBytesSize);
       }

Review Comment:
   The notifyAll() call should be placed outside the condition block at line 
116. Currently, it's only called when a batch is successfully removed, but 
threads waiting on the lock should be notified regardless of whether the batch 
existed or not.
   ```suggestion
         }
       }
       synchronized (lock) {
   ```



##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState 
pushState) {
   }
 
   public void addBatch(int batchId, int batchBytesSize, String 
hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> 
new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> 
new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }
+      lock.notifyAll();
     }
   }
 
   public void removeBatch(int batchId, String hostAndPushPort) {
     Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
-    if (batchIdSet != null) {
-      batchIdSet.remove(batchId);
-    } else {
-      logger.info("Batches of {} in flight is null.", hostAndPushPort);
-    }
-    totalInflightReqs.decrement();
-    if (maxInFlightBytesSizeEnabled) {
-      int inflightBatchBytesSize =
-          
-Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
-      LongAdder inflightBytesSize = 
inflightBytesSizePerAddress.get(hostAndPushPort);
-      if (inflightBytesSize != null) {
-        inflightBytesSize.add(inflightBatchBytesSize);
+    synchronized (lock) {

Review Comment:
   The synchronized block should start earlier, before accessing 
`inflightBatchesPerAddress.get(hostAndPushPort)` at line 98. The current 
implementation has a race condition where the batch set could be modified 
between the get() call and the synchronized block.
   ```suggestion
       synchronized (lock) {
         Set<Integer> batchIdSet = 
inflightBatchesPerAddress.get(hostAndPushPort);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to