This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e1bebb9e5 [CELEBORN-1668] Fix NPE when handle closed file writers
e1bebb9e5 is described below
commit e1bebb9e5bf95f813460ab61de30766b9defa057
Author: mingji <[email protected]>
AuthorDate: Thu Oct 24 17:50:16 2024 +0800
[CELEBORN-1668] Fix NPE when handle closed file writers
### What changes were proposed in this pull request?
Fix an NPE that occurs when handling closed file writers.
### Why are the changes needed?
If a file writer stores its shuffle data in memory, its disk file info
object is null, so dereferencing it when handling a closed writer causes an NPE.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA (GitHub Actions CI).
Closes #2846 from FMX/b1688.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/common/meta/DiskFileInfo.java | 1 +
.../org/apache/celeborn/common/meta/FileInfo.java | 2 ++
.../celeborn/common/meta/MemoryFileInfo.java | 5 +++++
.../deploy/worker/storage/PartitionDataWriter.java | 2 +-
.../service/deploy/worker/PushDataHandler.scala | 25 +++++++++++-----------
5 files changed, 21 insertions(+), 14 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index 51978ac2e..bcef84a6e 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -84,6 +84,7 @@ public class DiskFileInfo extends FileInfo {
return new File(filePath);
}
+ @Override
public String getFilePath() {
return filePath;
}
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 412c1fe85..e81b72936 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -106,4 +106,6 @@ public abstract class FileInfo {
return streams.isEmpty();
}
}
+
+ public abstract String getFilePath();
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
index f75826b84..9b933ea6c 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java
@@ -80,4 +80,9 @@ public class MemoryFileInfo extends FileInfo {
logger.info("Memory File Info {} expire, removed {}", this, bufferSize);
return bufferSize;
}
+
+ @Override
+ public String getFilePath() {
+ return "";
+ }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 194796c08..542ccd249 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -541,7 +541,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
}
- protected FileInfo getCurrentFileInfo() {
+ public FileInfo getCurrentFileInfo() {
if (!isMemoryShuffleFile.get()) {
return diskFileInfo;
} else {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 2652b14cf..592e93f09 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -264,10 +264,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
fileWriter.incrementPendingWrites()
if (fileWriter.isClosed) {
- val diskFileInfo = fileWriter.getDiskFileInfo
+ val fileInfo = fileWriter.getCurrentFileInfo
logWarning(
- s"[handlePushData] FileWriter is already closed! File path
${diskFileInfo.getFilePath} " +
- s"length ${diskFileInfo.getFileLength}")
+ s"[handlePushData] FileWriter is already closed! File path
${fileInfo.getFilePath} " +
+ s"length ${fileInfo.getFileLength}")
callbackWithTimer.onFailure(new CelebornIOException("File already
closed!"))
fileWriter.decrementPendingWrites()
return
@@ -546,10 +546,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
val closedFileWriter = fileWriters.find(_.isClosed)
if (closedFileWriter.isDefined) {
- val diskFileInfo = closedFileWriter.get.getDiskFileInfo
+ val fileInfo = closedFileWriter.get.getCurrentFileInfo
logWarning(
- s"[handlePushMergedData] FileWriter is already closed! File path
${diskFileInfo.getFilePath} " +
- s"length ${diskFileInfo.getFileLength}")
+ s"[handlePushMergedData] FileWriter is already closed! File path
${fileInfo.getFilePath} " +
+ s"length ${fileInfo.getFileLength}")
callbackWithTimer.onFailure(new CelebornIOException("File already
closed!"))
fileWriters.foreach(_.decrementPendingWrites())
return
@@ -824,10 +824,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
fileWriter.incrementPendingWrites()
if (fileWriter.isClosed) {
- val diskFileInfo = fileWriter.getDiskFileInfo
+ val fileInfo = fileWriter.getCurrentFileInfo
logWarning(
- s"[handleMapPartitionPushData] FileWriter is already closed! File path
${diskFileInfo.getFilePath} " +
- s"length ${diskFileInfo.getFileLength}")
+ s"[handleMapPartitionPushData] FileWriter is already closed! File path
${fileInfo.getFilePath} " +
+ s"length ${fileInfo.getFileLength}")
callback.onFailure(new CelebornIOException("File already closed!"))
fileWriter.decrementPendingWrites()
return
@@ -1029,8 +1029,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
// During worker shutdown, worker will return HARD_SPLIT for all existed
partition.
// This should before return exception to make current push request revive
and retry.
- val isPartitionSplitEnabled = fileWriter.asInstanceOf[
- MapPartitionDataWriter].getDiskFileInfo.isPartitionSplitEnabled
+ val isPartitionSplitEnabled =
fileWriter.getCurrentFileInfo.isPartitionSplitEnabled
if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
@@ -1248,8 +1247,8 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
|diskFull:$diskFull,
|partitionSplitMinimumSize:$partitionSplitMinimumSize,
|splitThreshold:${fileWriter.getSplitThreshold},
- |fileLength:${fileWriter.getDiskFileInfo.getFileLength}
- |fileName:${fileWriter.getDiskFileInfo.getFilePath}
+ |fileLength:${fileWriter.getCurrentFileInfo.getFileLength}
+ |fileName:${fileWriter.getCurrentFileInfo.getFilePath}
|""".stripMargin)
if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
return true