This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 7ae1fcc1c [CELEBORN-1558] Fix the incorrect decrement of pendingWrites
in handlePushMergeData
7ae1fcc1c is described below
commit 7ae1fcc1c49e89bd17ceea908a9df6a8faac77ef
Author: Shuang <[email protected]>
AuthorDate: Mon Aug 12 15:18:17 2024 +0800
[CELEBORN-1558] Fix the incorrect decrement of pendingWrites in
handlePushMergeData
### What changes were proposed in this pull request?
1. Fix the incorrect decrement of pendingWrites for FileWriter
2. Improve the hardSplit log and the exception/warning log messages
### Why are the changes needed?
handlePushMergeData writes data through multiple file writers. If the
previous FileWriter has already been closed, the next
decrementPendingWrites call is applied to the wrong FileWriter, which
causes a timeout in commitFiles:
> java.io.IOException: Wait pending actions timeout, counter 1
at
org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriter.waitOnNoPending(PartitionDataWriter.java)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed GA and manual testing.
Closes #2677 from RexXiong/CELEBORN-1558.
Authored-by: Shuang <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 6b24f5d9bd4f5693493e25ac5abb64bef4cea8d2)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/worker/storage/PartitionDataWriter.java | 2 +-
.../celeborn/service/deploy/worker/PushDataHandler.scala | 16 +++++++++++-----
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index f40f8ee66..b8cf6300f 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -553,7 +553,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
waitTime -= WAIT_INTERVAL_MS;
}
if (counter.get() > 0) {
- IOException ioe = new IOException("Wait pending actions timeout.");
+ IOException ioe = new IOException("Wait pending actions timeout,
Counter: " + counter.get());
notifier.setException(ioe);
throw ioe;
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 0fe387f58..895e029f4 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -265,8 +265,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
fileWriter.incrementPendingWrites()
if (fileWriter.isClosed) {
+ val diskFileInfo = fileWriter.getDiskFileInfo
logWarning(
- s"[handlePushData] FileWriter is already closed! File path
${fileWriter.getDiskFileInfo.getFilePath}")
+ s"[handlePushData] FileWriter is already closed! File path
${diskFileInfo.getFilePath} " +
+ s"length ${diskFileInfo.getFileLength}")
callbackWithTimer.onFailure(new CelebornIOException("File already
closed!"))
fileWriter.decrementPendingWrites()
return
@@ -536,8 +538,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
val closedFileWriter = fileWriters.find(_.isClosed)
if (closedFileWriter.isDefined) {
+ val diskFileInfo = closedFileWriter.get.getDiskFileInfo
logWarning(
- s"[handlePushMergedData] FileWriter is already closed! File path
${closedFileWriter.get.getDiskFileInfo.getFilePath}")
+ s"[handlePushMergedData] FileWriter is already closed! File path
${diskFileInfo.getFilePath} " +
+ s"length ${diskFileInfo.getFileLength}")
callbackWithTimer.onFailure(new CelebornIOException("File already
closed!"))
fileWriters.foreach(_.decrementPendingWrites())
return
@@ -812,8 +816,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
fileWriter.incrementPendingWrites()
if (fileWriter.isClosed) {
+ val diskFileInfo = fileWriter.getDiskFileInfo
logWarning(
- s"[handleMapPartitionPushData] FileWriter is already closed! File path
${fileWriter.getDiskFileInfo.getFilePath}")
+ s"[handleMapPartitionPushData] FileWriter is already closed! File path
${diskFileInfo.getFilePath} " +
+ s"length ${diskFileInfo.getFileLength}")
callback.onFailure(new CelebornIOException("File already closed!"))
fileWriter.decrementPendingWrites()
return
@@ -1223,7 +1229,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
} else {
workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
- logTrace(
+ logInfo(
s"""
|CheckDiskFullAndSplit hardSplit
|diskFull:$diskFull,
@@ -1287,8 +1293,8 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
var index = 0
var fileWriter: PartitionDataWriter = null
while (index < fileWriters.length) {
+ fileWriter = fileWriters(index)
if (!writePromise.isCompleted) {
- fileWriter = fileWriters(index)
val offset = body.readerIndex() + batchOffsets(index)
val length =
if (index == fileWriters.length - 1) {