This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 870da8655 [CELEBORN-2021] Fix issues on regression HDFS and OSS before 
release 0.6
870da8655 is described below

commit 870da865588a20b34d6e07ab0a8629cdd793433c
Author: mingji <[email protected]>
AuthorDate: Wed Jun 4 15:37:50 2025 +0800

    [CELEBORN-2021] Fix issues on regression HDFS and OSS before release 0.6
    
    ### What changes were proposed in this pull request?
    1. Fix an NPE when reading HDFS files.
    2. Make the change partition manager generate correct storage info.
    3. Add assertions for tier writers.
    
    ### Why are the changes needed?
    These issues were found during HDFS and OSS regression testing for release 0.6.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    Cluster tests.
    
    Closes #3302 from FMX/b2021.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 7bde738e5e85da6d6804174ddf34b2547fd65815)
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/client/read/DfsPartitionReader.java   | 12 ++++++------
 .../org/apache/celeborn/client/LifecycleManager.scala     | 15 +++++++++++----
 .../service/deploy/worker/storage/StoragePolicy.scala     |  2 +-
 .../service/deploy/worker/storage/TierWriter.scala        |  5 ++++-
 .../worker/storage/PartitionDataWriterSuiteUtils.java     |  4 ++--
 5 files changed, 24 insertions(+), 14 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 58fb1aac0..e39fb7ce8 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -101,12 +101,12 @@ public class DfsPartitionReader implements 
PartitionReader {
 
     this.metricsCallback = metricsCallback;
     this.location = location;
-    if (location.getStorageInfo() != null) {
-      if (location.getStorageInfo().getType() == StorageInfo.Type.S3) {
-        this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
-      } else if (location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
-        this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
-      }
+    if (location.getStorageInfo() != null
+        && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
+      this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+    } else if (location.getStorageInfo() != null
+        && location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
+      this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
     } else {
       this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
     }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 1b44c14bb..32cee3dda 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -36,10 +36,12 @@ import scala.util.Random
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.cache.{Cache, CacheBuilder}
+import org.roaringbitmap.RoaringBitmap
 
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers}
 import org.apache.celeborn.client.listener.WorkerStatusListener
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.ACTIVE_STORAGE_TYPES
 import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
@@ -101,6 +103,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int, 
PartitionLocation]]()
   private val userIdentifier: UserIdentifier = 
IdentityProvider.instantiate(conf).provide()
   private val availableStorageTypes = conf.availableStorageTypes
+  private val storageTypes =
+    
conf.get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf).toList
   // app shuffle id -> LinkedHashMap of (app shuffle identifier, (shuffle id, 
fetch status))
   private val shuffleIdMapping = JavaUtils.newConcurrentHashMap[
     Int,
@@ -1530,8 +1534,10 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
       candidates(primaryIndex).pushPort,
       candidates(primaryIndex).fetchPort,
       candidates(primaryIndex).replicatePort,
-      PartitionLocation.Mode.PRIMARY)
-    primaryLocation.getStorageInfo.availableStorageTypes = 
availableStorageTypes
+      PartitionLocation.Mode.PRIMARY,
+      null,
+      new StorageInfo("", storageTypes.head, availableStorageTypes),
+      new RoaringBitmap())
     if (pushReplicateEnabled) {
       var replicaIndex = (primaryIndex + 1) % candidates.size
       while (pushRackAwareEnabled && isOnSameRack(primaryIndex, replicaIndex)
@@ -1551,8 +1557,9 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         candidates(replicaIndex).fetchPort,
         candidates(replicaIndex).replicatePort,
         PartitionLocation.Mode.REPLICA,
-        primaryLocation)
-      replicaLocation.getStorageInfo.availableStorageTypes = 
availableStorageTypes
+        primaryLocation,
+        new StorageInfo("", storageTypes.head, availableStorageTypes),
+        new RoaringBitmap())
       primaryLocation.setPeer(replicaLocation)
       val primaryAndReplicaPairs = 
slots.computeIfAbsent(candidates(replicaIndex), newLocationFunc)
       primaryAndReplicaPairs._2.add(replicaLocation)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index cf526ef83..9fc2e723a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -138,7 +138,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
                 partitionDataWriterContext.isPartitionSplitEnabled)
               partitionDataWriterContext.setWorkingDir(workingDir)
               val metaHandler = getPartitionMetaHandler(diskFileInfo)
-              if (storageInfoType == StorageInfo.Type.HDD || storageInfoType 
== StorageInfo.Type.SSD) {
+              if ((storageInfoType == StorageInfo.Type.HDD || storageInfoType 
== StorageInfo.Type.SSD) && location.getStorageInfo.localDiskAvailable()) {
                 new LocalTierWriter(
                   conf,
                   metaHandler,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index ecc3372e1..fda56d5d8 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -284,7 +284,7 @@ class MemoryTierWriter(
     partitionDataWriterContext.getPartitionLocation.getFileName,
     partitionDataWriterContext.getShuffleKey,
     storageManager) {
-
+  assert(storageType == StorageInfo.Type.MEMORY)
   val memoryFileStorageMaxFileSize: Long = 
conf.workerMemoryFileStorageMaxFileSize
 
   storageManager.registerMemoryPartitionWriter(
@@ -382,6 +382,7 @@ class LocalTierWriter(
     partitionDataWriterContext.getPartitionLocation.getFileName,
     partitionDataWriterContext.getShuffleKey,
     storageManager) {
+  assert(storageType == StorageInfo.Type.HDD || storageType == 
StorageInfo.Type.SSD)
   flusherBufferSize = conf.workerFlusherBufferSize
   private val flushWorkerIndex: Int = flusher.getWorkerIndex
   val userCongestionControlContext: UserCongestionControlContext =
@@ -510,6 +511,8 @@ class DfsTierWriter(
     partitionDataWriterContext.getPartitionLocation.getFileName,
     partitionDataWriterContext.getShuffleKey,
     storageManager) {
+  assert(
+    storageType == StorageInfo.Type.HDFS || storageType == 
StorageInfo.Type.OSS || storageType == StorageInfo.Type.S3)
   flusherBufferSize = conf.workerHdfsFlusherBufferSize
   private val flushWorkerIndex: Int = flusher.getWorkerIndex
   val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index dac622743..918e2e27b 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -99,7 +99,7 @@ public class PartitionDataWriterSuiteUtils {
                     flusher,
                     source,
                     fileInfo,
-                    StorageInfo.Type.MEMORY,
+                    StorageInfo.Type.HDD,
                     context,
                     storageManager))
         .when(storagePolicy)
@@ -235,7 +235,7 @@ public class PartitionDataWriterSuiteUtils {
                     flusher,
                     source,
                     fileInfo,
-                    StorageInfo.Type.MEMORY,
+                    StorageInfo.Type.HDD,
                     writerContext,
                     storageManager))
         .when(storagePolicy)

Reply via email to