This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4adfeaf09 [CELEBORN-1071] Ensure guardedBy is satisfied, fix DCL bugs
as well
4adfeaf09 is described below
commit 4adfeaf0993d699f78080bf0f185d3a9ea9641b7
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Sun Oct 22 22:19:51 2023 +0800
[CELEBORN-1071] Ensure guardedBy is satisfied, fix DCL bugs as well
### What changes were proposed in this pull request?
Ensure the appropriate lock is held when accessing or mutating state that is
annotated with `@GuardedBy`.
More [here](https://errorprone.info/bugpattern/GuardedBy).
This also fixes the double-checked locking
([DCL](https://errorprone.info/bugpattern/DoubleCheckedLocking)) bugs that were observed.
### Why are the changes needed?
Fix locking bugs identified by Error Prone.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Closes #2018 from mridulm/fix-locking-issues-found.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/worker/storage/CreditStreamManager.java | 36 ++++++++--------
.../service/deploy/worker/storage/FileWriter.java | 41 ++++++++++--------
.../worker/storage/MapDataPartitionReader.java | 50 ++++++++++++++++------
3 files changed, 75 insertions(+), 52 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index 6199307ca..bf93bbeb8 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -187,27 +187,25 @@ public class CreditStreamManager {
}
private void startRecycleThread() {
- if (recycleThread == null) {
- synchronized (lock) {
- if (recycleThread == null) {
- recycleThread =
- new Thread(
- () -> {
- while (true) {
- try {
- DelayedStreamId delayedStreamId =
recycleStreamIds.take();
- cleanResource(delayedStreamId.streamId);
- } catch (Throwable e) {
- logger.warn(e.getMessage(), e);
- }
+ synchronized (lock) {
+ if (recycleThread == null) {
+ recycleThread =
+ new Thread(
+ () -> {
+ while (true) {
+ try {
+ DelayedStreamId delayedStreamId =
recycleStreamIds.take();
+ cleanResource(delayedStreamId.streamId);
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
}
- },
- "recycle-thread");
- recycleThread.setDaemon(true);
- recycleThread.start();
+ }
+ },
+ "recycle-thread");
+ recycleThread.setDaemon(true);
+ recycleThread.start();
- logger.info("start stream recycle thread");
- }
+ logger.info("start stream recycle thread");
}
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 8c67ea186..8ec6497b4 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -152,25 +152,26 @@ public abstract class FileWriter implements
DeviceObserver {
numPendingWrites.decrementAndGet();
}
- @GuardedBy("flushLock")
protected void flush(boolean finalFlush) throws IOException {
- // flushBuffer == null here means writer already closed
- if (flushBuffer != null) {
- int numBytes = flushBuffer.readableBytes();
- if (numBytes != 0) {
- notifier.checkException();
- notifier.numPendingFlushes.incrementAndGet();
- FlushTask task = null;
- if (channel != null) {
- task = new LocalFlushTask(flushBuffer, channel, notifier);
- } else if (fileInfo.isHdfs()) {
- task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(),
notifier);
- }
- addTask(task);
- flushBuffer = null;
- fileInfo.updateBytesFlushed(numBytes);
- if (!finalFlush) {
- takeBuffer();
+ synchronized (flushLock) {
+ // flushBuffer == null here means writer already closed
+ if (flushBuffer != null) {
+ int numBytes = flushBuffer.readableBytes();
+ if (numBytes != 0) {
+ notifier.checkException();
+ notifier.numPendingFlushes.incrementAndGet();
+ FlushTask task = null;
+ if (channel != null) {
+ task = new LocalFlushTask(flushBuffer, channel, notifier);
+ } else if (fileInfo.isHdfs()) {
+ task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(),
notifier);
+ }
+ addTask(task);
+ flushBuffer = null;
+ fileInfo.updateBytesFlushed(numBytes);
+ if (!finalFlush) {
+ takeBuffer();
+ }
}
}
}
@@ -372,7 +373,9 @@ public abstract class FileWriter implements DeviceObserver {
}
// real action
- flushBuffer = flusher.takeBuffer();
+ synchronized (flushLock) {
+ flushBuffer = flusher.takeBuffer();
+ }
// metrics end
if (source.metricsCollectCriticalEnabled()) {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index 82b2295ca..050289631 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -193,20 +193,30 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
}
}
- public synchronized void sendData() {
- while (!buffersToSend.isEmpty() && credits.get() > 0) {
- RecyclableBuffer wrappedBuffer;
- synchronized (lock) {
- if (!isReleased) {
- wrappedBuffer = buffersToSend.poll();
- } else {
- return;
- }
+ private RecyclableBuffer fetchBufferToSend() {
+ synchronized (lock) {
+ if (!buffersToSend.isEmpty() && credits.get() > 0 && !isReleased) {
+ return buffersToSend.poll();
+ } else {
+ return null;
}
+ }
+ }
+
+ private int getNumBuffersToSend() {
+ synchronized (lock) {
+ return buffersToSend.size();
+ }
+ }
- int backlog = buffersToSend.size();
+ public synchronized void sendData() {
+ RecyclableBuffer buffer;
+ while (null != (buffer = fetchBufferToSend())) {
+ final RecyclableBuffer wrappedBuffer = buffer;
int readableBytes = wrappedBuffer.byteBuf.readableBytes();
- logger.debug("send data start: {}, {}, {}", streamId, readableBytes,
backlog);
+ if (logger.isDebugEnabled()) {
+ logger.debug("send data start: {}, {}, {}", streamId, readableBytes,
getNumBuffersToSend());
+ }
ReadData readData = new ReadData(streamId, wrappedBuffer.byteBuf);
associatedChannel
.writeAndFlush(readData)
@@ -228,7 +238,15 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
logger.debug("stream {} credit {}", streamId, currentCredit);
}
- if (readFinished && buffersToSend.isEmpty()) {
+ boolean shouldRecycle = false;
+ synchronized (lock) {
+ if (isReleased) return;
+ if (readFinished && buffersToSend.isEmpty()) {
+ shouldRecycle = true;
+ }
+ }
+
+ if (shouldRecycle) {
recycle();
}
}
@@ -358,7 +376,9 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
return true;
} catch (Throwable throwable) {
logger.error("Failed to read partition file.", throwable);
- isReleased = true;
+ synchronized (lock) {
+ isReleased = true;
+ }
throw throwable;
}
}
@@ -497,6 +517,8 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
}
public boolean shouldReadData() {
- return !isReleased && !readFinished;
+ synchronized (lock) {
+ return !isReleased && !readFinished;
+ }
}
}