This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 870da8655 [CELEBORN-2021] Fix issues on regression HDFS and OSS before
release 0.6
870da8655 is described below
commit 870da865588a20b34d6e07ab0a8629cdd793433c
Author: mingji <[email protected]>
AuthorDate: Wed Jun 4 15:37:50 2025 +0800
[CELEBORN-2021] Fix issues on regression HDFS and OSS before release 0.6
### What changes were proposed in this pull request?
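1. Fix an NPE when reading HDFS files: `DfsPartitionReader` left `hadoopFs` unassigned when the location's storage info was non-null but its type was neither S3 nor OSS.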
2. Make the change partition manager generate correct storage info.
3. Add assertions for tier writers.
### Why are the changes needed?
These issues were found during HDFS and OSS regression testing before release 0.6.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster tests.
Closes #3302 from FMX/b2021.
Authored-by: mingji <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 7bde738e5e85da6d6804174ddf34b2547fd65815)
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/client/read/DfsPartitionReader.java | 12 ++++++------
.../org/apache/celeborn/client/LifecycleManager.scala | 15 +++++++++++----
.../service/deploy/worker/storage/StoragePolicy.scala | 2 +-
.../service/deploy/worker/storage/TierWriter.scala | 5 ++++-
.../worker/storage/PartitionDataWriterSuiteUtils.java | 4 ++--
5 files changed, 24 insertions(+), 14 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 58fb1aac0..e39fb7ce8 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -101,12 +101,12 @@ public class DfsPartitionReader implements
PartitionReader {
this.metricsCallback = metricsCallback;
this.location = location;
- if (location.getStorageInfo() != null) {
- if (location.getStorageInfo().getType() == StorageInfo.Type.S3) {
- this.hadoopFs =
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
- } else if (location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
- this.hadoopFs =
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
- }
+ if (location.getStorageInfo() != null
+ && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
+ this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+ } else if (location.getStorageInfo() != null
+ && location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
+ this.hadoopFs =
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
} else {
this.hadoopFs =
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 1b44c14bb..32cee3dda 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -36,10 +36,12 @@ import scala.util.Random
import com.google.common.annotations.VisibleForTesting
import com.google.common.cache.{Cache, CacheBuilder}
+import org.roaringbitmap.RoaringBitmap
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers}
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.ACTIVE_STORAGE_TYPES
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
@@ -101,6 +103,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int,
PartitionLocation]]()
private val userIdentifier: UserIdentifier =
IdentityProvider.instantiate(conf).provide()
private val availableStorageTypes = conf.availableStorageTypes
+ private val storageTypes =
+
conf.get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf).toList
// app shuffle id -> LinkedHashMap of (app shuffle identifier, (shuffle id,
fetch status))
private val shuffleIdMapping = JavaUtils.newConcurrentHashMap[
Int,
@@ -1530,8 +1534,10 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
candidates(primaryIndex).pushPort,
candidates(primaryIndex).fetchPort,
candidates(primaryIndex).replicatePort,
- PartitionLocation.Mode.PRIMARY)
- primaryLocation.getStorageInfo.availableStorageTypes =
availableStorageTypes
+ PartitionLocation.Mode.PRIMARY,
+ null,
+ new StorageInfo("", storageTypes.head, availableStorageTypes),
+ new RoaringBitmap())
if (pushReplicateEnabled) {
var replicaIndex = (primaryIndex + 1) % candidates.size
while (pushRackAwareEnabled && isOnSameRack(primaryIndex, replicaIndex)
@@ -1551,8 +1557,9 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
candidates(replicaIndex).fetchPort,
candidates(replicaIndex).replicatePort,
PartitionLocation.Mode.REPLICA,
- primaryLocation)
- replicaLocation.getStorageInfo.availableStorageTypes =
availableStorageTypes
+ primaryLocation,
+ new StorageInfo("", storageTypes.head, availableStorageTypes),
+ new RoaringBitmap())
primaryLocation.setPeer(replicaLocation)
val primaryAndReplicaPairs =
slots.computeIfAbsent(candidates(replicaIndex), newLocationFunc)
primaryAndReplicaPairs._2.add(replicaLocation)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index cf526ef83..9fc2e723a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -138,7 +138,7 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
partitionDataWriterContext.isPartitionSplitEnabled)
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
- if (storageInfoType == StorageInfo.Type.HDD || storageInfoType
== StorageInfo.Type.SSD) {
+ if ((storageInfoType == StorageInfo.Type.HDD || storageInfoType
== StorageInfo.Type.SSD) && location.getStorageInfo.localDiskAvailable()) {
new LocalTierWriter(
conf,
metaHandler,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index ecc3372e1..fda56d5d8 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -284,7 +284,7 @@ class MemoryTierWriter(
partitionDataWriterContext.getPartitionLocation.getFileName,
partitionDataWriterContext.getShuffleKey,
storageManager) {
-
+ assert(storageType == StorageInfo.Type.MEMORY)
val memoryFileStorageMaxFileSize: Long =
conf.workerMemoryFileStorageMaxFileSize
storageManager.registerMemoryPartitionWriter(
@@ -382,6 +382,7 @@ class LocalTierWriter(
partitionDataWriterContext.getPartitionLocation.getFileName,
partitionDataWriterContext.getShuffleKey,
storageManager) {
+ assert(storageType == StorageInfo.Type.HDD || storageType ==
StorageInfo.Type.SSD)
flusherBufferSize = conf.workerFlusherBufferSize
private val flushWorkerIndex: Int = flusher.getWorkerIndex
val userCongestionControlContext: UserCongestionControlContext =
@@ -510,6 +511,8 @@ class DfsTierWriter(
partitionDataWriterContext.getPartitionLocation.getFileName,
partitionDataWriterContext.getShuffleKey,
storageManager) {
+ assert(
+ storageType == StorageInfo.Type.HDFS || storageType ==
StorageInfo.Type.OSS || storageType == StorageInfo.Type.S3)
flusherBufferSize = conf.workerHdfsFlusherBufferSize
private val flushWorkerIndex: Int = flusher.getWorkerIndex
val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index dac622743..918e2e27b 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -99,7 +99,7 @@ public class PartitionDataWriterSuiteUtils {
flusher,
source,
fileInfo,
- StorageInfo.Type.MEMORY,
+ StorageInfo.Type.HDD,
context,
storageManager))
.when(storagePolicy)
@@ -235,7 +235,7 @@ public class PartitionDataWriterSuiteUtils {
flusher,
source,
fileInfo,
- StorageInfo.Type.MEMORY,
+ StorageInfo.Type.HDD,
writerContext,
storageManager))
.when(storagePolicy)