This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e1bebb9e5 [CELEBORN-1668] Fix NPE when handle closed file writers
e1bebb9e5 is described below

commit e1bebb9e5bf95f813460ab61de30766b9defa057
Author: mingji <[email protected]>
AuthorDate: Thu Oct 24 17:50:16 2024 +0800

    [CELEBORN-1668] Fix NPE when handle closed file writers
    
    ### What changes were proposed in this pull request?
    To fix an NPE when handling the closed file writers.
    
    ### Why are the changes needed?
    If a file writer stores its shuffle data in memory, the disk file info
    object will be null, causing an NPE.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #2846 from FMX/b1688.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/common/meta/DiskFileInfo.java  |  1 +
 .../org/apache/celeborn/common/meta/FileInfo.java  |  2 ++
 .../celeborn/common/meta/MemoryFileInfo.java       |  5 +++++
 .../deploy/worker/storage/PartitionDataWriter.java |  2 +-
 .../service/deploy/worker/PushDataHandler.scala    | 25 +++++++++++-----------
 5 files changed, 21 insertions(+), 14 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index 51978ac2e..bcef84a6e 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -84,6 +84,7 @@ public class DiskFileInfo extends FileInfo {
     return new File(filePath);
   }
 
+  @Override
   public String getFilePath() {
     return filePath;
   }
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 412c1fe85..e81b72936 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -106,4 +106,6 @@ public abstract class FileInfo {
       return streams.isEmpty();
     }
   }
+
+  public abstract String getFilePath();
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
index f75826b84..9b933ea6c 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
@@ -80,4 +80,9 @@ public class MemoryFileInfo extends FileInfo {
     logger.info("Memory File Info {} expire, removed {}", this, bufferSize);
     return bufferSize;
   }
+
+  @Override
+  public String getFilePath() {
+    return "";
+  }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 194796c08..542ccd249 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -541,7 +541,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     }
   }
 
-  protected FileInfo getCurrentFileInfo() {
+  public FileInfo getCurrentFileInfo() {
     if (!isMemoryShuffleFile.get()) {
       return diskFileInfo;
     } else {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 2652b14cf..592e93f09 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -264,10 +264,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     fileWriter.incrementPendingWrites()
 
     if (fileWriter.isClosed) {
-      val diskFileInfo = fileWriter.getDiskFileInfo
+      val fileInfo = fileWriter.getCurrentFileInfo
       logWarning(
-        s"[handlePushData] FileWriter is already closed! File path 
${diskFileInfo.getFilePath} " +
-          s"length ${diskFileInfo.getFileLength}")
+        s"[handlePushData] FileWriter is already closed! File path 
${fileInfo.getFilePath} " +
+          s"length ${fileInfo.getFileLength}")
       callbackWithTimer.onFailure(new CelebornIOException("File already 
closed!"))
       fileWriter.decrementPendingWrites()
       return
@@ -546,10 +546,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
 
     val closedFileWriter = fileWriters.find(_.isClosed)
     if (closedFileWriter.isDefined) {
-      val diskFileInfo = closedFileWriter.get.getDiskFileInfo
+      val fileInfo = closedFileWriter.get.getCurrentFileInfo
       logWarning(
-        s"[handlePushMergedData] FileWriter is already closed! File path 
${diskFileInfo.getFilePath} " +
-          s"length ${diskFileInfo.getFileLength}")
+        s"[handlePushMergedData] FileWriter is already closed! File path 
${fileInfo.getFilePath} " +
+          s"length ${fileInfo.getFileLength}")
       callbackWithTimer.onFailure(new CelebornIOException("File already 
closed!"))
       fileWriters.foreach(_.decrementPendingWrites())
       return
@@ -824,10 +824,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     fileWriter.incrementPendingWrites()
 
     if (fileWriter.isClosed) {
-      val diskFileInfo = fileWriter.getDiskFileInfo
+      val fileInfo = fileWriter.getCurrentFileInfo
       logWarning(
-        s"[handleMapPartitionPushData] FileWriter is already closed! File path 
${diskFileInfo.getFilePath} " +
-          s"length ${diskFileInfo.getFileLength}")
+        s"[handleMapPartitionPushData] FileWriter is already closed! File path 
${fileInfo.getFilePath} " +
+          s"length ${fileInfo.getFileLength}")
       callback.onFailure(new CelebornIOException("File already closed!"))
       fileWriter.decrementPendingWrites()
       return
@@ -1029,8 +1029,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
 
     // During worker shutdown, worker will return HARD_SPLIT for all existed 
partition.
     // This should before return exception to make current push request revive 
and retry.
-    val isPartitionSplitEnabled = fileWriter.asInstanceOf[
-      MapPartitionDataWriter].getDiskFileInfo.isPartitionSplitEnabled
+    val isPartitionSplitEnabled = 
fileWriter.getCurrentFileInfo.isPartitionSplitEnabled
 
     if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
         Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
@@ -1248,8 +1247,8 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
          |diskFull:$diskFull,
          |partitionSplitMinimumSize:$partitionSplitMinimumSize,
          |splitThreshold:${fileWriter.getSplitThreshold},
-         |fileLength:${fileWriter.getDiskFileInfo.getFileLength}
-         |fileName:${fileWriter.getDiskFileInfo.getFilePath}
+         |fileLength:${fileWriter.getCurrentFileInfo.getFileLength}
+         |fileName:${fileWriter.getCurrentFileInfo.getFilePath}
          |""".stripMargin)
     if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
       return true

Reply via email to