This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a64a8b7c [CELEBORN-890][BUG] PushHandler should check whether 
FileWriter has closed to avoid data lost
3a64a8b7c is described below

commit 3a64a8b7cb80485444e9da922b710557b76d1f55
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Sat Aug 12 00:13:34 2023 +0800

    [CELEBORN-890][BUG] PushHandler should check whether FileWriter has closed 
to avoid data lost
    
    ### What changes were proposed in this pull request?
    This PR fixes a bug that in rare cases may cause data loss.
    
    ### Why are the changes needed?
    I received a bug report from one of the users that in an extreme case a small
    amount of data loss happens. I reproduced the bug under the following conditions:
    1. Shuffle data size for one partition id is relatively large, for example 
400GB
    2. `celeborn.client.shuffle.partitionSplit.mode` is set to HARD
    3. `celeborn.client.shuffle.batchHandleCommitPartition.enabled` is enabled
    
    In the meantime, there are warning messages in the worker's log
    ```
    23/08/11 17:10:04,501 WARN [push-server-6-44] PushDataHandler: Append data 
failed for task(shuffle application_1691635581416_0021-0, map 746, attempt 0), 
caused by AlreadyClosedException, endedAttempt -1, error message: FileWriter 
has already closed!, fileName 
/mnt/disk1/celeborn-worker/shuffle_data/application_1691635581416_0021/0/0-107-0
    23/08/11 17:12:04,445 WARN [push-server-6-35] PushDataHandler: Append data 
failed for task(shuffle application_1691635581416_0021-0, map 3016, attempt 0), 
caused by AlreadyClosedException, endedAttempt -1, error message: FileWriter 
has already closed!, fileName 
/mnt/disk3/celeborn-worker/shuffle_data/application_1691635581416_0021/0/0-356-0
    ```
    
    
![image](https://github.com/apache/incubator-celeborn/assets/948245/c05f25ba-4b24-4483-8baf-96915e40da17)
    
    After digging into it, I found that the reason for the data loss is as follows:
    1. For some partition id in some worker, the file size exceeds 
`celeborn.client.shuffle.partitionSplit.threshold`, then
       `CommitManager` in `LifecycleManager` will trigger `CommitFiles` because 
`batchHandleCommitPartition` is enabled
    2. Before `CommitFile` finishes, `PushDataHandler` receives `PushData` or 
`PushMergedData`, it finds that the partition has not committed yet, and is 
preparing to call `fileWriter.incrementPendingWrites()` and `callback.onSuccess`
    3. Before `PushDataHandler` calls `fileWriter.incrementPendingWrites()`, 
the `CommitFiles` finishes and the FileWriter
        successfully closes.
    4. Then `PushDataHandler` calls `fileWriter.incrementPendingWrites()` and  
`callback.onSuccess`. After this time,
`ShuffleClient` thinks the `PushData` succeeds. However, when
`PushDataHandler` calls `fileWriter.write()`, it
        finds the writer already closed and throws the above exception. Since the
exception is ignored, the data loss happens.
    
    This PR fixes this by checking whether the FileWriter has been closed after
    calling `incrementPendingWrites`. If so, `PushDataHandler` calls `onFailure`.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1808 from waitinfuture/890.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../service/deploy/worker/storage/FileWriter.java  |  4 +++
 .../service/deploy/worker/PushDataHandler.scala    | 40 ++++++++++++++++++----
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 01f7d9705..7054ad108 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -249,6 +249,10 @@ public abstract class FileWriter implements DeviceObserver 
{
     void run() throws IOException;
   }
 
+  public boolean isClosed() {
+    return closed;
+  }
+
   protected synchronized long close(
       RunnableWithIOException tryClose,
       RunnableWithIOException streamClose,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 97deb7f6b..32da0ed6a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -238,6 +238,14 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
 
     fileWriter.incrementPendingWrites()
 
+    if (fileWriter.isClosed) {
+      logWarning(
+        s"[handlePushData] FileWriter is already closed! File path 
${fileWriter.getFileInfo.getFilePath}")
+      callbackWithTimer.onFailure(new CelebornIOException("File already 
closed!"))
+      fileWriter.decrementPendingWrites()
+      return;
+    }
+
     // for primary, send data to replica
     if (doReplicate) {
       pushData.body().retain()
@@ -376,8 +384,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             shuffleMapperAttempts.get(shuffleKey).get(mapId)
           } else -1
         // TODO just info log for ended attempt
-        logWarning(s"Append data failed for task(shuffle $shuffleKey, map 
$mapId, attempt" +
-          s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
+        logError(
+          s"[handlePushData] Append data failed for task(shuffle $shuffleKey, 
map $mapId, attempt" +
+            s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
       case e: Exception =>
         logError("Exception encountered when write.", e)
     }
@@ -499,6 +508,15 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     }
     fileWriters.foreach(_.incrementPendingWrites())
 
+    val closedFileWriter = fileWriters.find(_.isClosed)
+    if (closedFileWriter.isDefined) {
+      logWarning(
+        s"[handlePushMergedData] FileWriter is already closed! File path 
${closedFileWriter.get.getFileInfo.getFilePath}")
+      callbackWithTimer.onFailure(new CelebornIOException("File already 
closed!"))
+      fileWriters.foreach(_.decrementPendingWrites())
+      return
+    }
+
     // for primary, send data to replica
     if (doReplicate) {
       pushMergedData.body().retain()
@@ -649,8 +667,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
               shuffleMapperAttempts.get(shuffleKey).get(mapId)
             } else -1
           // TODO just info log for ended attempt
-          logWarning(s"Append data failed for task(shuffle $shuffleKey, map 
$mapId, attempt" +
-            s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
+          logError(
+            s"[handlePushMergedData] Append data failed for task(shuffle 
$shuffleKey, map $mapId, attempt" +
+              s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
         case e: Exception =>
           logError("Exception encountered when write.", e)
       }
@@ -802,6 +821,14 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
 
     fileWriter.incrementPendingWrites()
 
+    if (fileWriter.isClosed) {
+      logWarning(
+        s"[handleMapPartitionPushData] FileWriter is already closed! File path 
${fileWriter.getFileInfo.getFilePath}")
+      callback.onFailure(new CelebornIOException("File already closed!"))
+      fileWriter.decrementPendingWrites()
+      return;
+    }
+
     // for primary, send data to replica
     if (location.hasPeer && isPrimary) {
       // to do
@@ -821,8 +848,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             shuffleMapperAttempts.get(shuffleKey).get(mapId)
           } else -1
         // TODO just info log for ended attempt
-        logWarning(s"Append data failed for task(shuffle $shuffleKey, map 
$mapId, attempt" +
-          s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
+        logError(
+          s"[handleMapPartitionPushData] Append data failed for task(shuffle 
$shuffleKey, map $mapId, attempt" +
+            s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
       case e: Exception =>
         logError("Exception encountered when write.", e)
     }

Reply via email to