This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit e6657e708b07c80132f460594a142b1bc5cd7438
Author: Mridul Muralidharan <[email protected]>
AuthorDate: Sun Oct 22 22:19:51 2023 +0800

    [CELEBORN-1071] Ensure guardedBy is satisfied, fix DCL bugs as well
    
    Ensure the appropriate lock is held when accessing or mutating state that is
    annotated with `@GuardedBy`. See the Error Prone
    [GuardedBy](https://errorprone.info/bugpattern/GuardedBy) bug pattern for details.
    
    This also fixes the observed
    [double-checked locking (DCL)](https://errorprone.info/bugpattern/DoubleCheckedLocking) bugs.
    
    Fix locking bugs identified by Error Prone.
    
    Does this PR introduce any user-facing change? No.
    
    Tested with unit tests.
    
    Closes #2018 from mridulm/fix-locking-issues-found.
    
    Authored-by: Mridul Muralidharan <mridulatgmail.com>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/worker/storage/CreditStreamManager.java | 36 ++++++++--------
 .../service/deploy/worker/storage/FileWriter.java  | 41 ++++++++++--------
 .../worker/storage/MapDataPartitionReader.java     | 50 ++++++++++++++++------
 3 files changed, 75 insertions(+), 52 deletions(-)

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index 6199307ca..bf93bbeb8 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -187,27 +187,25 @@ public class CreditStreamManager {
   }
 
   private void startRecycleThread() {
-    if (recycleThread == null) {
-      synchronized (lock) {
-        if (recycleThread == null) {
-          recycleThread =
-              new Thread(
-                  () -> {
-                    while (true) {
-                      try {
-                        DelayedStreamId delayedStreamId = 
recycleStreamIds.take();
-                        cleanResource(delayedStreamId.streamId);
-                      } catch (Throwable e) {
-                        logger.warn(e.getMessage(), e);
-                      }
+    synchronized (lock) {
+      if (recycleThread == null) {
+        recycleThread =
+            new Thread(
+                () -> {
+                  while (true) {
+                    try {
+                      DelayedStreamId delayedStreamId = 
recycleStreamIds.take();
+                      cleanResource(delayedStreamId.streamId);
+                    } catch (Throwable e) {
+                      logger.warn(e.getMessage(), e);
                     }
-                  },
-                  "recycle-thread");
-          recycleThread.setDaemon(true);
-          recycleThread.start();
+                  }
+                },
+                "recycle-thread");
+        recycleThread.setDaemon(true);
+        recycleThread.start();
 
-          logger.info("start stream recycle thread");
-        }
+        logger.info("start stream recycle thread");
       }
     }
   }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 8c67ea186..8ec6497b4 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -152,25 +152,26 @@ public abstract class FileWriter implements DeviceObserver {
     numPendingWrites.decrementAndGet();
   }
 
-  @GuardedBy("flushLock")
   protected void flush(boolean finalFlush) throws IOException {
-    // flushBuffer == null here means writer already closed
-    if (flushBuffer != null) {
-      int numBytes = flushBuffer.readableBytes();
-      if (numBytes != 0) {
-        notifier.checkException();
-        notifier.numPendingFlushes.incrementAndGet();
-        FlushTask task = null;
-        if (channel != null) {
-          task = new LocalFlushTask(flushBuffer, channel, notifier);
-        } else if (fileInfo.isHdfs()) {
-          task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), 
notifier);
-        }
-        addTask(task);
-        flushBuffer = null;
-        fileInfo.updateBytesFlushed(numBytes);
-        if (!finalFlush) {
-          takeBuffer();
+    synchronized (flushLock) {
+      // flushBuffer == null here means writer already closed
+      if (flushBuffer != null) {
+        int numBytes = flushBuffer.readableBytes();
+        if (numBytes != 0) {
+          notifier.checkException();
+          notifier.numPendingFlushes.incrementAndGet();
+          FlushTask task = null;
+          if (channel != null) {
+            task = new LocalFlushTask(flushBuffer, channel, notifier);
+          } else if (fileInfo.isHdfs()) {
+            task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), 
notifier);
+          }
+          addTask(task);
+          flushBuffer = null;
+          fileInfo.updateBytesFlushed(numBytes);
+          if (!finalFlush) {
+            takeBuffer();
+          }
         }
       }
     }
@@ -372,7 +373,9 @@ public abstract class FileWriter implements DeviceObserver {
     }
 
     // real action
-    flushBuffer = flusher.takeBuffer();
+    synchronized (flushLock) {
+      flushBuffer = flusher.takeBuffer();
+    }
 
     // metrics end
     if (source.metricsCollectCriticalEnabled()) {
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index 82b2295ca..050289631 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -193,20 +193,30 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
     }
   }
 
-  public synchronized void sendData() {
-    while (!buffersToSend.isEmpty() && credits.get() > 0) {
-      RecyclableBuffer wrappedBuffer;
-      synchronized (lock) {
-        if (!isReleased) {
-          wrappedBuffer = buffersToSend.poll();
-        } else {
-          return;
-        }
+  private RecyclableBuffer fetchBufferToSend() {
+    synchronized (lock) {
+      if (!buffersToSend.isEmpty() && credits.get() > 0 && !isReleased) {
+        return buffersToSend.poll();
+      } else {
+        return null;
       }
+    }
+  }
+
+  private int getNumBuffersToSend() {
+    synchronized (lock) {
+      return buffersToSend.size();
+    }
+  }
 
-      int backlog = buffersToSend.size();
+  public synchronized void sendData() {
+    RecyclableBuffer buffer;
+    while (null != (buffer = fetchBufferToSend())) {
+      final RecyclableBuffer wrappedBuffer = buffer;
       int readableBytes = wrappedBuffer.byteBuf.readableBytes();
-      logger.debug("send data start: {}, {}, {}", streamId, readableBytes, 
backlog);
+      if (logger.isDebugEnabled()) {
+        logger.debug("send data start: {}, {}, {}", streamId, readableBytes, 
getNumBuffersToSend());
+      }
       ReadData readData = new ReadData(streamId, wrappedBuffer.byteBuf);
       associatedChannel
           .writeAndFlush(readData)
@@ -228,7 +238,15 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
       logger.debug("stream {} credit {}", streamId, currentCredit);
     }
 
-    if (readFinished && buffersToSend.isEmpty()) {
+    boolean shouldRecycle = false;
+    synchronized (lock) {
+      if (isReleased) return;
+      if (readFinished && buffersToSend.isEmpty()) {
+        shouldRecycle = true;
+      }
+    }
+
+    if (shouldRecycle) {
       recycle();
     }
   }
@@ -358,7 +376,9 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
       return true;
     } catch (Throwable throwable) {
       logger.error("Failed to read partition file.", throwable);
-      isReleased = true;
+      synchronized (lock) {
+        isReleased = true;
+      }
       throw throwable;
     }
   }
@@ -497,6 +517,8 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
   }
 
   public boolean shouldReadData() {
-    return !isReleased && !readFinished;
+    synchronized (lock) {
+      return !isReleased && !readFinished;
+    }
   }
 }

Reply via email to