This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 9c87bd45d [CELEBORN-1081] Client support 
`celeborn.storage.activeTypes` config
9c87bd45d is described below

commit 9c87bd45dee702cc734368bf2da989a7b4ab5f0b
Author: mingji <[email protected]>
AuthorDate: Fri Nov 3 20:03:11 2023 +0800

    [CELEBORN-1081] Client support `celeborn.storage.activeTypes` config
    
    1. Support `celeborn.storage.activeTypes` in the client.
    2. The master ignores slots on "UNKNOWN_DISK" when counting slots per disk.
    
    Why: enable client applications to select which storage types to use.
    
    User-facing change: yes.
    
    Tested via GA (GitHub Actions) and on a cluster.
    
    Closes #2045 from FMX/B1081.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  |  4 +-
 .../common/protocol/PartitionLocation.java         |  1 -
 .../celeborn/common/protocol/StorageInfo.java      | 94 ++++++++++++++++++++--
 common/src/main/proto/TransportMessages.proto      |  5 +-
 .../org/apache/celeborn/common/CelebornConf.scala  | 16 ++--
 .../common/protocol/message/ControlMessages.scala  |  4 +
 .../apache/celeborn/common/CelebornConfSuite.scala | 22 ++++-
 .../celeborn/common/util/PbSerDeUtilsTest.scala    | 40 ++++++++-
 docs/configuration/client.md                       |  1 +
 docs/configuration/master.md                       |  2 +-
 docs/configuration/worker.md                       |  2 +-
 .../service/deploy/master/SlotsAllocator.java      | 72 ++++++++++++-----
 .../celeborn/service/deploy/master/Master.scala    |  8 +-
 .../master/SlotsAllocatorRackAwareSuiteJ.java      |  7 +-
 .../deploy/master/SlotsAllocatorSuiteJ.java        | 12 ++-
 .../deploy/worker/storage/StorageManager.scala     |  6 +-
 16 files changed, 243 insertions(+), 53 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ebc892815..4c30b92a5 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -75,6 +75,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   val latestPartitionLocation =
     JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int, 
PartitionLocation]]()
   private val userIdentifier: UserIdentifier = 
IdentityProvider.instantiate(conf).provide()
+  private val availableStorageTypes = conf.availableStorageTypes
 
   @VisibleForTesting
   def workerSnapshots(shuffleId: Int): util.Map[WorkerInfo, 
ShufflePartitionLocationInfo] =
@@ -1025,7 +1026,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         pushReplicateEnabled,
         pushRackAwareEnabled,
         userIdentifier,
-        slotsAssignMaxWorkers)
+        slotsAssignMaxWorkers,
+        availableStorageTypes)
     val res = requestMasterRequestSlots(req)
     if (res.status != StatusCode.SUCCESS) {
       requestMasterRequestSlots(req)
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
index 08e97c0ff..8e76f3488 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
@@ -64,7 +64,6 @@ public class PartitionLocation implements Serializable {
   private StorageInfo storageInfo;
   private RoaringBitmap mapIdBitMap;
   private transient String _hostPushPort;
-
   private transient String _hostFetchPort;
 
   public PartitionLocation(PartitionLocation loc) {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java 
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index b7bc0e878..d3e1bf95b 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -38,7 +38,7 @@ public class StorageInfo implements Serializable {
     }
   }
 
-  public static String UNKNOWN_DISK = "UNKNOWN_DISK";
+  @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
   public static Map<Integer, Type> typesMap = new HashMap<>();
   public static Set<String> typeNames = new HashSet<>();
 
@@ -49,6 +49,12 @@ public class StorageInfo implements Serializable {
     }
   }
 
+  public static final int MEMORY_MASK = 0b1;
+  public static final int LOCAL_DISK_MASK = 0b10;
+  public static final int HDFS_MASK = 0b100;
+  public static final int OSS_MASK = 0b1000;
+  public static final int ALL_TYPES_AVAILABLE_MASK = 0;
+
   // Default storage Type is MEMORY.
   private Type type = Type.MEMORY;
   private String mountPoint = UNKNOWN_DISK;
@@ -56,6 +62,8 @@ public class StorageInfo implements Serializable {
   private boolean finalResult = false;
   private String filePath;
 
+  public int availableStorageTypes = 0;
+
   public StorageInfo() {}
 
   public StorageInfo(Type type, boolean isFinal, String filePath) {
@@ -64,8 +72,9 @@ public class StorageInfo implements Serializable {
     this.filePath = filePath;
   }
 
-  public StorageInfo(String mountPoint) {
+  public StorageInfo(String mountPoint, int availableStorageTypes) {
     this.mountPoint = mountPoint;
+    this.availableStorageTypes = availableStorageTypes;
   }
 
   public StorageInfo(Type type, String mountPoint) {
@@ -86,6 +95,19 @@ public class StorageInfo implements Serializable {
     this.filePath = filePath;
   }
 
+  public StorageInfo(
+      Type type,
+      String mountPoint,
+      boolean finalResult,
+      String filePath,
+      int availableStorageTypes) {
+    this.type = type;
+    this.mountPoint = mountPoint;
+    this.finalResult = finalResult;
+    this.filePath = filePath;
+    this.availableStorageTypes = availableStorageTypes;
+  }
+
   public boolean isFinalResult() {
     return finalResult;
   }
@@ -125,13 +147,50 @@ public class StorageInfo implements Serializable {
         + '}';
   }
 
+  public boolean localDiskAvailable() {
+    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+        || (availableStorageTypes & LOCAL_DISK_MASK) > 0;
+  }
+
+  public boolean HDFSAvailable() {
+    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+        || (availableStorageTypes & HDFS_MASK) > 0;
+  }
+
+  public boolean OSSAvailable() {
+    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+        || (availableStorageTypes & OSS_MASK) > 0;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    StorageInfo that = (StorageInfo) o;
+    return finalResult == that.finalResult
+        && availableStorageTypes == that.availableStorageTypes
+        && type == that.type
+        && Objects.equals(mountPoint, that.mountPoint)
+        && Objects.equals(filePath, that.filePath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, mountPoint, finalResult, filePath, 
availableStorageTypes);
+  }
+
+  public static final boolean validate(String typeStr) {
+    return typeNames.contains(typeStr);
+  }
+
   public static PbStorageInfo toPb(StorageInfo storageInfo) {
     String filePath = storageInfo.getFilePath();
     PbStorageInfo.Builder builder = PbStorageInfo.newBuilder();
     builder
         .setType(storageInfo.type.value)
         .setFinalResult(storageInfo.finalResult)
-        .setMountPoint(storageInfo.mountPoint);
+        .setMountPoint(storageInfo.mountPoint)
+        .setAvailableStorageTypes(storageInfo.availableStorageTypes);
     if (filePath != null) {
       builder.setFilePath(filePath);
     }
@@ -143,10 +202,29 @@ public class StorageInfo implements Serializable {
         typesMap.get(pbStorageInfo.getType()),
         pbStorageInfo.getMountPoint(),
         pbStorageInfo.getFinalResult(),
-        pbStorageInfo.getFilePath());
-  }
-
-  public static boolean validateStorageType(String str) {
-    return typeNames.contains(str);
+        pbStorageInfo.getFilePath(),
+        pbStorageInfo.getAvailableStorageTypes());
+  }
+
+  public static int getAvailableTypes(List<Type> types) {
+    int ava = 0;
+    for (Type type : types) {
+      switch (type) {
+        case MEMORY:
+          ava = ava | MEMORY_MASK;
+          break;
+        case HDD:
+        case SSD:
+          ava = ava | LOCAL_DISK_MASK;
+          break;
+        case HDFS:
+          ava = ava | HDFS_MASK;
+          break;
+        case OSS:
+          ava = ava | OSS_MASK;
+          break;
+      }
+    }
+    return ava;
   }
 }
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index fa91bd234..8fae4fb5c 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -89,6 +89,7 @@ message PbStorageInfo {
   string mountPoint = 2;
   bool finalResult = 3;
   string filePath = 4;
+  int32 availableStorageTypes = 5;
 }
 
 message PbPartitionLocation {
@@ -198,6 +199,7 @@ message PbRequestSlots {
   PbUserIdentifier userIdentifier = 8;
   bool shouldRackAware = 9;
   int32 maxWorkers = 10;
+  int32 availableStorageTypes = 11;
 }
 
 message PbSlotInfo {
@@ -367,6 +369,7 @@ message PbReserveSlots {
   PbUserIdentifier userIdentifier = 9;
   int64 pushDataTimeout = 10;
   bool partitionSplitEnabled = 11;
+  int32 availableStorageTypes = 12;
 }
 
 message PbReserveSlotsResponse {
@@ -496,7 +499,7 @@ message PbSnapshotMetaInfo {
   int64 partitionTotalWritten = 8;
   int64 partitionTotalFileCount = 9;
   repeated PbAppDiskUsageSnapshot appDiskUsageMetricSnapshots = 10;
-  optional PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
+  PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
   map<string, int64> lostWorkers = 12;
   repeated PbWorkerInfo shutdownWorkers = 13;
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 968386e29..2f11559b7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,7 +514,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   // //////////////////////////////////////////////////////
   def masterSlotAssignPolicy: SlotsAssignPolicy =
     SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY))
-
+  def availableStorageTypes: Int = {
+    val types = 
get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf(_)).toList
+    StorageInfo.getAvailableTypes(types.asJava)
+  }
   def hasHDFSStorage: Boolean =
     get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && 
get(HDFS_DIR).isDefined
   def masterSlotAssignLoadAwareDiskGroupNum: Int = 
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
@@ -3909,13 +3912,16 @@ object CelebornConf extends Logging {
       .createWithDefault(true)
 
   val ACTIVE_STORAGE_TYPES: ConfigEntry[String] =
-    buildConf("celeborn.storage.activeTypes")
-      .categories("master", "worker")
+    buildConf("celeborn.storage.availableTypes")
+      .withAlternative("celeborn.storage.activeTypes")
+      .categories("master", "worker", "client")
       .version("0.3.0")
-      .doc("Enabled storage levels. Available options: HDD,SSD,HDFS. ")
+      .doc(
+        "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD 
and SSD would be treated as identical.")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
-      .createWithDefault("HDD,SSD")
+      .checkValue(p => p.split(",").map(StorageInfo.validate(_)).reduce(_ && 
_), "")
+      .createWithDefault("HDD")
 
   val READ_LOCAL_SHUFFLE_FILE: ConfigEntry[Boolean] =
     buildConf("celeborn.client.readLocalShuffleFile.enabled")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index f5d11b0cf..e28508c43 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -170,6 +170,7 @@ object ControlMessages extends Logging {
       shouldRackAware: Boolean,
       userIdentifier: UserIdentifier,
       maxWorkers: Int,
+      availableStorageTypes: Int,
       override var requestId: String = ZERO_UUID)
     extends MasterRequestMessage
 
@@ -526,6 +527,7 @@ object ControlMessages extends Logging {
           shouldRackAware,
           userIdentifier,
           maxWorkers,
+          availableStorageTypes,
           requestId) =>
       val payload = PbRequestSlots.newBuilder()
         .setApplicationId(applicationId)
@@ -536,6 +538,7 @@ object ControlMessages extends Logging {
         .setShouldRackAware(shouldRackAware)
         .setMaxWorkers(maxWorkers)
         .setRequestId(requestId)
+        .setAvailableStorageTypes(availableStorageTypes)
         .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
         .build().toByteArray
       new TransportMessage(MessageType.REQUEST_SLOTS, payload)
@@ -922,6 +925,7 @@ object ControlMessages extends Logging {
           pbRequestSlots.getShouldRackAware,
           userIdentifier,
           pbRequestSlots.getMaxWorkers,
+          pbRequestSlots.getAvailableStorageTypes,
           pbRequestSlots.getRequestId)
 
       case REQUEST_SLOTS_RESPONSE_VALUE =>
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 00ed10730..0da75aebd 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf._
+import org.apache.celeborn.common.protocol.StorageInfo
 import org.apache.celeborn.common.util.Utils
 
 class CelebornConfSuite extends CelebornFunSuite {
@@ -193,11 +194,11 @@ class CelebornConfSuite extends CelebornFunSuite {
     conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
     assert(conf.workerBaseDirs.isEmpty)
 
-    conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS")
+    conf.set("celeborn.storage.activeTypes", "SSD,HDD,HDFS")
     conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
     assert(conf.workerBaseDirs.isEmpty)
 
-    conf.set("celeborn.storage.activeTypes", "SDD,HDD")
+    conf.set("celeborn.storage.activeTypes", "SSD,HDD")
     assert(!conf.workerBaseDirs.isEmpty)
   }
 
@@ -207,10 +208,25 @@ class CelebornConfSuite extends CelebornFunSuite {
     conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
     assert(conf.workerCommitThreads === 128)
 
-    conf.set("celeborn.storage.activeTypes", "SDD,HDD")
+    conf.set("celeborn.storage.activeTypes", "SSD,HDD")
     assert(conf.workerCommitThreads === 32)
   }
 
+  test("Test available storage types") {
+    val conf = new CelebornConf()
+
+    assert(conf.availableStorageTypes == StorageInfo.LOCAL_DISK_MASK)
+
+    conf.set("celeborn.storage.availableTypes", "HDD,MEMORY")
+    assert(conf.availableStorageTypes == Integer.parseInt("11", 2))
+
+    conf.set("celeborn.storage.availableTypes", "HDD,HDFS")
+    assert(conf.availableStorageTypes == (StorageInfo.HDFS_MASK | 
StorageInfo.LOCAL_DISK_MASK))
+
+    conf.set("celeborn.storage.availableTypes", "HDFS")
+    assert(conf.availableStorageTypes == StorageInfo.HDFS_MASK)
+  }
+
   test("Test role rpcDispatcherNumThreads") {
     val availableCores = 5
     val conf = new CelebornConf()
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 903af02c9..64b6f7e86 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, FileInfo, 
WorkerInfo}
-import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
 import org.apache.celeborn.common.quota.ResourceConsumption
 
@@ -51,8 +51,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
   val userIdentifier1 = UserIdentifier("tenant-a", "user-a")
   val userIdentifier2 = UserIdentifier("tenant-b", "user-b")
 
-  val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000, 2000, 3000)
-  val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000, 4000, 6000)
+  val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
+  val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)
 
   val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1)
   val fileInfo2 = new FileInfo("/tmp/2", chunkOffsets2, userIdentifier2)
@@ -77,6 +77,27 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
   val partitionLocation2 =
     new PartitionLocation(1, 1, "host2", 20, 19, 18, 24, 
PartitionLocation.Mode.REPLICA)
 
+  val partitionLocation3 =
+    new PartitionLocation(2, 2, "host3", 30, 29, 28, 27, 
PartitionLocation.Mode.PRIMARY)
+  val partitionLocation4 =
+    new PartitionLocation(
+      3,
+      3,
+      "host4",
+      40,
+      39,
+      38,
+      37,
+      PartitionLocation.Mode.REPLICA,
+      partitionLocation3,
+      new StorageInfo(
+        StorageInfo.Type.HDD,
+        "mountPoint",
+        false,
+        "filePath",
+        StorageInfo.LOCAL_DISK_MASK),
+      null)
+
   val workerResource = new WorkerResource()
   workerResource.put(
     workerInfo1,
@@ -187,4 +208,17 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
 
     assert(restoredWorkerResource.equals(workerResource))
   }
+
+  test("testPbStorageInfo") {
+    val pbPartitionLocation3 = 
PbSerDeUtils.toPbPartitionLocation(partitionLocation3)
+    val pbPartitionLocation4 = 
PbSerDeUtils.toPbPartitionLocation(partitionLocation4)
+
+    val restoredPartitionLocation3 = 
PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation3)
+    val restoredPartitionLocation4 = 
PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation4)
+
+    assert(restoredPartitionLocation3.equals(partitionLocation3))
+    assert(restoredPartitionLocation4.equals(partitionLocation4))
+    
assert(restoredPartitionLocation4.getStorageInfo.equals(partitionLocation4.getStorageInfo))
+  }
+
 }
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index b5d3bbf94..24c3f27cc 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -102,5 +102,6 @@ license: |
 | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 500000 
| Celeborn will only accept shuffle of partition number lower than this 
configuration value. | 0.3.0 | 
 | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. | 
0.3.0 | 
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master 
nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 | 
+| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
 <!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 69c93774b..28515e124 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -36,6 +36,6 @@ license: |
 | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to 
assign slots, Celeborn supports two types of policy: roundrobin and loadaware. 
Loadaware policy will be ignored when `HDFS` is enabled in 
`celeborn.storage.activeTypes` | 0.3.0 | 
 | celeborn.master.userResourceConsumption.update.interval | 30s | Time length 
for a window about compute user resource consumption. | 0.3.0 | 
 | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker 
unavailable info would be cleared when the retention period is expired | 0.3.1 
| 
-| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available 
options: HDD,SSD,HDFS.  | 0.3.0 | 
+| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
 <!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 194d13d00..15c3cf5da 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -22,7 +22,7 @@ license: |
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master 
nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 | 
 | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size 
smaller than this configuration of partition size for estimation. | 0.3.0 | 
 | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged 
shuffle data. For example, if a reducer's shuffle data is 128M and the data 
will need 16 fetch chunk requests to fetch. | 0.2.0 | 
-| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available 
options: HDD,SSD,HDFS.  | 0.3.0 | 
+| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | If the number of 
active connections on a worker exceeds this configuration value, the worker 
will be marked as high-load in the heartbeat report, and the master will not 
include that node in the response of RequestSlots. | 0.3.1 | 
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for 
read buffer per mount point. | 0.3.0 | 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 10fb31824..3c83abad6 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -53,7 +53,8 @@ public class SlotsAllocator {
           List<WorkerInfo> workers,
           List<Integer> partitionIds,
           boolean shouldReplicate,
-          boolean shouldRackAware) {
+          boolean shouldRackAware,
+          int availableStorageTypes) {
     if (partitionIds.isEmpty()) {
       return new HashMap<>();
     }
@@ -72,7 +73,13 @@ public class SlotsAllocator {
         }
       }
     }
-    return locateSlots(partitionIds, workers, restrictions, shouldReplicate, 
shouldRackAware);
+    return locateSlots(
+        partitionIds,
+        workers,
+        restrictions,
+        shouldReplicate,
+        shouldRackAware,
+        availableStorageTypes);
   }
 
   /**
@@ -90,7 +97,8 @@ public class SlotsAllocator {
           int diskGroupCount,
           double diskGroupGradient,
           double flushTimeWeight,
-          double fetchTimeWeight) {
+          double fetchTimeWeight,
+          int availableStorageTypes) {
     if (partitionIds.isEmpty()) {
       return new HashMap<>();
     }
@@ -121,7 +129,8 @@ public class SlotsAllocator {
       logger.warn(
           "offer slots for {} fallback to roundrobin because there is no 
usable disks",
           StringUtils.join(partitionIds, ','));
-      return offerSlotsRoundRobin(workers, partitionIds, shouldReplicate, 
shouldRackAware);
+      return offerSlotsRoundRobin(
+          workers, partitionIds, shouldReplicate, shouldRackAware, 
availableStorageTypes);
     }
 
     if (!initialized) {
@@ -133,14 +142,21 @@ public class SlotsAllocator {
             placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, 
fetchTimeWeight),
             diskToWorkerMap,
             shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
-    return locateSlots(partitionIds, workers, restrictions, shouldReplicate, 
shouldRackAware);
+    return locateSlots(
+        partitionIds,
+        workers,
+        restrictions,
+        shouldReplicate,
+        shouldRackAware,
+        availableStorageTypes);
   }
 
   private static StorageInfo getStorageInfo(
       List<WorkerInfo> workers,
       int workerIndex,
       Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
-      Map<WorkerInfo, Integer> workerDiskIndex) {
+      Map<WorkerInfo, Integer> workerDiskIndex,
+      int availableStorageTypes) {
     WorkerInfo selectedWorker = workers.get(workerIndex);
     List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
     int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
@@ -148,7 +164,9 @@ public class SlotsAllocator {
       diskIndex = (diskIndex + 1) % usableDiskInfos.size();
     }
     usableDiskInfos.get(diskIndex).usableSlots--;
-    StorageInfo storageInfo = new 
StorageInfo(usableDiskInfos.get(diskIndex).diskInfo.mountPoint());
+    StorageInfo storageInfo =
+        new StorageInfo(
+            usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), 
availableStorageTypes);
     workerDiskIndex.put(selectedWorker, (diskIndex + 1) % 
usableDiskInfos.size());
     return storageInfo;
   }
@@ -165,7 +183,8 @@ public class SlotsAllocator {
           List<WorkerInfo> workers,
           Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
           boolean shouldReplicate,
-          boolean shouldRackAware) {
+          boolean shouldRackAware,
+          int activeStorageTypes) {
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
         new HashMap<>();
 
@@ -176,12 +195,15 @@ public class SlotsAllocator {
             new LinkedList<>(restrictions.keySet()),
             restrictions,
             shouldReplicate,
-            shouldRackAware);
+            shouldRackAware,
+            activeStorageTypes);
     if (!remain.isEmpty()) {
-      remain = roundRobin(slots, remain, workers, null, shouldReplicate, 
shouldRackAware);
+      remain =
+          roundRobin(
+              slots, remain, workers, null, shouldReplicate, shouldRackAware, 
activeStorageTypes);
     }
     if (!remain.isEmpty()) {
-      roundRobin(slots, remain, workers, null, shouldReplicate, false);
+      roundRobin(slots, remain, workers, null, shouldReplicate, false, 
activeStorageTypes);
     }
     return slots;
   }
@@ -192,7 +214,8 @@ public class SlotsAllocator {
       List<WorkerInfo> workers,
       Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
       boolean shouldReplicate,
-      boolean shouldRackAware) {
+      boolean shouldRackAware,
+      int availableStorageTypes) {
     // workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
     Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
     Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
@@ -213,7 +236,12 @@ public class SlotsAllocator {
           }
         }
         storageInfo =
-            getStorageInfo(workers, nextPrimaryInd, restrictions, 
workerDiskIndexForPrimary);
+            getStorageInfo(
+                workers,
+                nextPrimaryInd,
+                restrictions,
+                workerDiskIndexForPrimary,
+                availableStorageTypes);
       }
       PartitionLocation primaryPartition =
           createLocation(partitionId, workers.get(nextPrimaryInd), null, 
storageInfo, true);
@@ -229,7 +257,12 @@ public class SlotsAllocator {
             }
           }
           storageInfo =
-              getStorageInfo(workers, nextReplicaInd, restrictions, 
workerDiskIndexForReplica);
+              getStorageInfo(
+                  workers,
+                  nextReplicaInd,
+                  restrictions,
+                  workerDiskIndexForReplica,
+                  availableStorageTypes);
         } else if (shouldRackAware) {
           while (!satisfyRackAware(true, workers, nextPrimaryInd, 
nextReplicaInd)) {
             nextReplicaInd = (nextReplicaInd + 1) % workers.size();
@@ -458,10 +491,13 @@ public class SlotsAllocator {
       jointLocations.addAll(slots.get(worker)._2);
       for (PartitionLocation location : jointLocations) {
         String mountPoint = location.getStorageInfo().getMountPoint();
-        if (slotsPerDisk.containsKey(mountPoint)) {
-          slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
-        } else {
-          slotsPerDisk.put(mountPoint, 1);
+        // ignore slots for UNKNOWN_DISK
+        if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
+          if (slotsPerDisk.containsKey(mountPoint)) {
+            slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
+          } else {
+            slotsPerDisk.put(mountPoint, 1);
+          }
         }
       }
     }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 51b03174c..1285c8ff3 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -353,7 +353,7 @@ private[celeborn] class Master(
       // keep it for compatible reason
       context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
 
-    case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
+    case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _) =>
       logTrace(s"Received RequestSlots request $requestSlots.")
       executeWithLeaderChecker(context, handleRequestSlots(context, 
requestSlots))
 
@@ -669,13 +669,15 @@ private[celeborn] class Master(
               slotsAssignLoadAwareDiskGroupNum,
               slotsAssignLoadAwareDiskGroupGradient,
               loadAwareFlushTimeWeight,
-              loadAwareFetchTimeWeight)
+              loadAwareFetchTimeWeight,
+              requestSlots.availableStorageTypes)
           } else {
             SlotsAllocator.offerSlotsRoundRobin(
               selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
-              requestSlots.shouldRackAware)
+              requestSlots.shouldRackAware,
+              requestSlots.availableStorageTypes)
           }
         }
       }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index bc7b54b33..165e25148 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver;
 
 public class SlotsAllocatorRackAwareSuiteJ {
@@ -67,7 +68,8 @@ public class SlotsAllocatorRackAwareSuiteJ {
     List<WorkerInfo> workers = prepareWorkers(resolver);
 
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, true, true);
+        SlotsAllocator.offerSlotsRoundRobin(
+            workers, partitionIds, true, true, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
 
     Consumer<PartitionLocation> assertCustomer =
         new Consumer<PartitionLocation>() {
@@ -102,7 +104,8 @@ public class SlotsAllocatorRackAwareSuiteJ {
     List<WorkerInfo> workers = prepareWorkers(resolver);
 
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, true, true);
+        SlotsAllocator.offerSlotsRoundRobin(
+            workers, partitionIds, true, true, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
 
     Consumer<PartitionLocation> assertConsumer =
         new Consumer<PartitionLocation>() {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index ccdeb5b2d..e82742bab 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -36,6 +36,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.protocol.StorageInfo;
 
 public class SlotsAllocatorSuiteJ {
   private List<WorkerInfo> prepareWorkers(boolean hasDisks) {
@@ -239,7 +240,8 @@ public class SlotsAllocatorSuiteJ {
             conf.masterSlotAssignLoadAwareDiskGroupNum(),
             conf.masterSlotAssignLoadAwareDiskGroupGradient(),
             conf.masterSlotAssignLoadAwareFlushTimeWeight(),
-            conf.masterSlotAssignLoadAwareFetchTimeWeight());
+            conf.masterSlotAssignLoadAwareFetchTimeWeight(),
+            StorageInfo.ALL_TYPES_AVAILABLE_MASK);
     if (expectSuccess) {
       if (shouldReplicate) {
         slots.forEach(
@@ -275,10 +277,11 @@ public class SlotsAllocatorSuiteJ {
         allocateToDiskSlots += worker.usedSlots();
       }
       if (shouldReplicate) {
-        Assert.assertEquals(partitionIds.size() * 2, unknownDiskSlots + 
allocateToDiskSlots);
+        Assert.assertTrue(partitionIds.size() * 2 >= unknownDiskSlots + 
allocateToDiskSlots);
       } else {
-        Assert.assertEquals(partitionIds.size(), unknownDiskSlots + 
allocateToDiskSlots);
+        Assert.assertTrue(partitionIds.size() >= unknownDiskSlots + 
allocateToDiskSlots);
       }
+      Assert.assertEquals(0, unknownDiskSlots);
     } else {
       assert slots.isEmpty()
           : "Expect to fail to offer slots, but return " + slots.size() + " 
slots.";
@@ -294,7 +297,8 @@ public class SlotsAllocatorSuiteJ {
     CelebornConf conf = new CelebornConf();
     conf.set("celeborn.active.storage.levels", "HDFS");
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, 
shouldReplicate, false);
+        SlotsAllocator.offerSlotsRoundRobin(
+            workers, partitionIds, shouldReplicate, false, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
 
     int allocatedPartitionCount = 0;
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 27c225378..2daa76409 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -359,7 +359,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         throw new IOException(s"No available disks! suggested mountPoint 
$suggestedMountPoint")
       }
       val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
-      if (dirs.isEmpty) {
+      if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
         val shuffleDir =
           new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
         val fileInfo =
@@ -397,7 +397,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         }
         hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
         return hdfsWriter
-      } else {
+      } else if (dirs.nonEmpty && 
location.getStorageInfo.localDiskAvailable()) {
         val dir = dirs(getNextIndex() % dirs.size)
         val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, 
mountPoints)
         val shuffleDir = new File(dir, s"$appId/$shuffleId")
@@ -468,6 +468,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
               exception,
               DiskStatus.READ_OR_WRITE_FAILURE)
         }
+      } else {
+        exception = new IOException("No storage available for location:" + 
location.toString)
       }
       retryCount += 1
     }


Reply via email to