This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit e6657e708b07c80132f460594a142b1bc5cd7438 Author: Mridul Muralidharan <[email protected]> AuthorDate: Sun Oct 22 22:19:51 2023 +0800 [CELEBORN-1071] Ensure guardedBy is satisfied, fix DCL bugs as well. Ensure appropriate lock is held when accessing/mutating state - as marked with `GuardedBy`. More [here](https://errorprone.info/bugpattern/GuardedBy). This also fixes [DCL](https://errorprone.info/bugpattern/DoubleCheckedLocking) bugs observed. Fix bug with locking as identified by error-prone. No unit tests. Closes #2018 from mridulm/fix-locking-issues-found. Authored-by: Mridul Muralidharan <mridulatgmail.com> Signed-off-by: zky.zhoukeyong <[email protected]> --- .../deploy/worker/storage/CreditStreamManager.java | 36 ++++++++-------- .../service/deploy/worker/storage/FileWriter.java | 41 ++++++++++-------- .../worker/storage/MapDataPartitionReader.java | 50 ++++++++++++++++------ 3 files changed, 75 insertions(+), 52 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java index 6199307ca..bf93bbeb8 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java @@ -187,27 +187,25 @@ public class CreditStreamManager { } private void startRecycleThread() { - if (recycleThread == null) { - synchronized (lock) { - if (recycleThread == null) { - recycleThread = - new Thread( - () -> { - while (true) { - try { - DelayedStreamId delayedStreamId = recycleStreamIds.take(); - cleanResource(delayedStreamId.streamId); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } + synchronized (lock) { + if (recycleThread == null) { + recycleThread = + new Thread( + () -> { + while (true) { + try { + DelayedStreamId delayedStreamId = recycleStreamIds.take(); 
cleanResource(delayedStreamId.streamId); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); } - }, - "recycle-thread"); - recycleThread.setDaemon(true); - recycleThread.start(); + } + }, + "recycle-thread"); + recycleThread.setDaemon(true); + recycleThread.start(); - logger.info("start stream recycle thread"); - } + logger.info("start stream recycle thread"); } } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java index 8c67ea186..8ec6497b4 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java @@ -152,25 +152,26 @@ public abstract class FileWriter implements DeviceObserver { numPendingWrites.decrementAndGet(); } - @GuardedBy("flushLock") protected void flush(boolean finalFlush) throws IOException { - // flushBuffer == null here means writer already closed - if (flushBuffer != null) { - int numBytes = flushBuffer.readableBytes(); - if (numBytes != 0) { - notifier.checkException(); - notifier.numPendingFlushes.incrementAndGet(); - FlushTask task = null; - if (channel != null) { - task = new LocalFlushTask(flushBuffer, channel, notifier); - } else if (fileInfo.isHdfs()) { - task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier); - } - addTask(task); - flushBuffer = null; - fileInfo.updateBytesFlushed(numBytes); - if (!finalFlush) { - takeBuffer(); + synchronized (flushLock) { + // flushBuffer == null here means writer already closed + if (flushBuffer != null) { + int numBytes = flushBuffer.readableBytes(); + if (numBytes != 0) { + notifier.checkException(); + notifier.numPendingFlushes.incrementAndGet(); + FlushTask task = null; + if (channel != null) { + task = new LocalFlushTask(flushBuffer, channel, notifier); + } else if (fileInfo.isHdfs()) { + task = new 
HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier); + } + addTask(task); + flushBuffer = null; + fileInfo.updateBytesFlushed(numBytes); + if (!finalFlush) { + takeBuffer(); + } } } } @@ -372,7 +373,9 @@ public abstract class FileWriter implements DeviceObserver { } // real action - flushBuffer = flusher.takeBuffer(); + synchronized (flushLock) { + flushBuffer = flusher.takeBuffer(); + } // metrics end if (source.metricsCollectCriticalEnabled()) { diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java index 82b2295ca..050289631 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java @@ -193,20 +193,30 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader } } - public synchronized void sendData() { - while (!buffersToSend.isEmpty() && credits.get() > 0) { - RecyclableBuffer wrappedBuffer; - synchronized (lock) { - if (!isReleased) { - wrappedBuffer = buffersToSend.poll(); - } else { - return; - } + private RecyclableBuffer fetchBufferToSend() { + synchronized (lock) { + if (!buffersToSend.isEmpty() && credits.get() > 0 && !isReleased) { + return buffersToSend.poll(); + } else { + return null; } + } + } + + private int getNumBuffersToSend() { + synchronized (lock) { + return buffersToSend.size(); + } + } - int backlog = buffersToSend.size(); + public synchronized void sendData() { + RecyclableBuffer buffer; + while (null != (buffer = fetchBufferToSend())) { + final RecyclableBuffer wrappedBuffer = buffer; int readableBytes = wrappedBuffer.byteBuf.readableBytes(); - logger.debug("send data start: {}, {}, {}", streamId, readableBytes, backlog); + if (logger.isDebugEnabled()) { + logger.debug("send data start: {}, {}, 
{}", streamId, readableBytes, getNumBuffersToSend()); + } ReadData readData = new ReadData(streamId, wrappedBuffer.byteBuf); associatedChannel .writeAndFlush(readData) @@ -228,7 +238,15 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader logger.debug("stream {} credit {}", streamId, currentCredit); } - if (readFinished && buffersToSend.isEmpty()) { + boolean shouldRecycle = false; + synchronized (lock) { + if (isReleased) return; + if (readFinished && buffersToSend.isEmpty()) { + shouldRecycle = true; + } + } + + if (shouldRecycle) { recycle(); } } @@ -358,7 +376,9 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader return true; } catch (Throwable throwable) { logger.error("Failed to read partition file.", throwable); - isReleased = true; + synchronized (lock) { + isReleased = true; + } throw throwable; } } @@ -497,6 +517,8 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader } public boolean shouldReadData() { - return !isReleased && !readFinished; + synchronized (lock) { + return !isReleased && !readFinished; + } } }
