This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3a64a8b7c [CELEBORN-890][BUG] PushHandler should check whether
FileWriter has closed to avoid data lost
3a64a8b7c is described below
commit 3a64a8b7cb80485444e9da922b710557b76d1f55
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Sat Aug 12 00:13:34 2023 +0800
[CELEBORN-890][BUG] PushHandler should check whether FileWriter has closed
to avoid data lost
### What changes were proposed in this pull request?
This PR fixes a bug that, in rare cases, may cause data loss.
### Why are the changes needed?
I received a bug report from one of the users that, in an extreme case, a small
amount of data loss happens. I
reproduced the bug under the following conditions:
1. Shuffle data size for one partition id is relatively large, for example
400GB
2. `celeborn.client.shuffle.partitionSplit.mode` is set to HARD
3. `celeborn.client.shuffle.batchHandleCommitPartition.enabled` is enabled
In the meantime, there are warning messages in the worker's log
```
23/08/11 17:10:04,501 WARN [push-server-6-44] PushDataHandler: Append data
failed for task(shuffle application_1691635581416_0021-0, map 746, attempt 0),
caused by AlreadyClosedException, endedAttempt -1, error message: FileWriter
has already closed!, fileName
/mnt/disk1/celeborn-worker/shuffle_data/application_1691635581416_0021/0/0-107-0
23/08/11 17:12:04,445 WARN [push-server-6-35] PushDataHandler: Append data
failed for task(shuffle application_1691635581416_0021-0, map 3016, attempt 0),
caused by AlreadyClosedException, endedAttempt -1, error message: FileWriter
has already closed!, fileName
/mnt/disk3/celeborn-worker/shuffle_data/application_1691635581416_0021/0/0-356-0
```

After digging into it, I found the reason for the data loss is as follows:
1. For some partition id in some worker, the file size exceeds
`celeborn.client.shuffle.partitionSplit.threshold`, then
`CommitManager` in `LifecycleManager` will trigger `CommitFiles` because
`batchHandleCommitPartition` is enabled
2. Before `CommitFiles` finishes, `PushDataHandler` receives `PushData` or
`PushMergedData`, finds that the partition has not been committed yet, and is
preparing to call `fileWriter.incrementPendingWrites()` and `callback.onSuccess`
3. Before `PushDataHandler` calls `fileWriter.incrementPendingWrites()`,
the `CommitFiles` finishes and the FileWriter
successfully closes.
4. Then `PushDataHandler` calls `fileWriter.incrementPendingWrites()` and
`callback.onSuccess`. From this point on,
`ShuffleClient` thinks the `PushData` succeeds. However, when
`PushDataHandler` calls `fileWriter.write()`, it
finds the writer already closed and throws the above exception. The
exception is ignored, so the data loss happens.
This PR fixes this by checking whether the FileWriter has been closed after calling
`incrementPendingWrites`. If so,
`PushDataHandler` calls `onFailure`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1808 from waitinfuture/890.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../service/deploy/worker/storage/FileWriter.java | 4 +++
.../service/deploy/worker/PushDataHandler.scala | 40 ++++++++++++++++++----
2 files changed, 38 insertions(+), 6 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 01f7d9705..7054ad108 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -249,6 +249,10 @@ public abstract class FileWriter implements DeviceObserver
{
void run() throws IOException;
}
+ public boolean isClosed() {
+ return closed;
+ }
+
protected synchronized long close(
RunnableWithIOException tryClose,
RunnableWithIOException streamClose,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 97deb7f6b..32da0ed6a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -238,6 +238,14 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
fileWriter.incrementPendingWrites()
+ if (fileWriter.isClosed) {
+ logWarning(
+ s"[handlePushData] FileWriter is already closed! File path
${fileWriter.getFileInfo.getFilePath}")
+ callbackWithTimer.onFailure(new CelebornIOException("File already
closed!"))
+ fileWriter.decrementPendingWrites()
+ return;
+ }
+
// for primary, send data to replica
if (doReplicate) {
pushData.body().retain()
@@ -376,8 +384,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
shuffleMapperAttempts.get(shuffleKey).get(mapId)
} else -1
// TODO just info log for ended attempt
- logWarning(s"Append data failed for task(shuffle $shuffleKey, map
$mapId, attempt" +
- s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
+ logError(
+ s"[handlePushData] Append data failed for task(shuffle $shuffleKey,
map $mapId, attempt" +
+ s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
case e: Exception =>
logError("Exception encountered when write.", e)
}
@@ -499,6 +508,15 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
}
fileWriters.foreach(_.incrementPendingWrites())
+ val closedFileWriter = fileWriters.find(_.isClosed)
+ if (closedFileWriter.isDefined) {
+ logWarning(
+ s"[handlePushMergedData] FileWriter is already closed! File path
${closedFileWriter.get.getFileInfo.getFilePath}")
+ callbackWithTimer.onFailure(new CelebornIOException("File already
closed!"))
+ fileWriters.foreach(_.decrementPendingWrites())
+ return
+ }
+
// for primary, send data to replica
if (doReplicate) {
pushMergedData.body().retain()
@@ -649,8 +667,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
shuffleMapperAttempts.get(shuffleKey).get(mapId)
} else -1
// TODO just info log for ended attempt
- logWarning(s"Append data failed for task(shuffle $shuffleKey, map
$mapId, attempt" +
- s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
+ logError(
+ s"[handlePushMergedData] Append data failed for task(shuffle
$shuffleKey, map $mapId, attempt" +
+ s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
case e: Exception =>
logError("Exception encountered when write.", e)
}
@@ -802,6 +821,14 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
fileWriter.incrementPendingWrites()
+ if (fileWriter.isClosed) {
+ logWarning(
+ s"[handleMapPartitionPushData] FileWriter is already closed! File path
${fileWriter.getFileInfo.getFilePath}")
+ callback.onFailure(new CelebornIOException("File already closed!"))
+ fileWriter.decrementPendingWrites()
+ return;
+ }
+
// for primary, send data to replica
if (location.hasPeer && isPrimary) {
// to do
@@ -821,8 +848,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
shuffleMapperAttempts.get(shuffleKey).get(mapId)
} else -1
// TODO just info log for ended attempt
- logWarning(s"Append data failed for task(shuffle $shuffleKey, map
$mapId, attempt" +
- s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
+ logError(
+ s"[handleMapPartitionPushData] Append data failed for task(shuffle
$shuffleKey, map $mapId, attempt" +
+ s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
case e: Exception =>
logError("Exception encountered when write.", e)
}