This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 113311df3 [CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate 
all slots to disk
113311df3 is described below

commit 113311df3e99ce58715aaf6feedb62dd96f1e8b3
Author: mingji <[email protected]>
AuthorDate: Tue Nov 28 11:26:00 2023 +0800

    [CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate all slots to disk
    
    ### What changes were proposed in this pull request?
    1. Remove UNKNOWN_DISK from StorageInfo.
    2. Enable load-aware slots allocation when there is HDFS.
    
    ### Why are the changes needed?
    To support the application's config about available storage types.
    
    ### Does this PR introduce _any_ user-facing change?
    no.
    
    ### How was this patch tested?
    GA and Cluster.
    
    Closes #2098 from FMX/B1081-1.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/common/protocol/StorageInfo.java      |  50 +++----
 common/src/main/proto/TransportMessages.proto      |   1 +
 .../apache/celeborn/common/meta/DeviceInfo.scala   |  21 ++-
 .../apache/celeborn/common/meta/WorkerInfo.scala   |  10 +-
 .../apache/celeborn/common/util/PbSerDeUtils.scala |   8 +-
 .../common/protocol/PartitionLocationSuiteJ.java   |   8 +-
 .../celeborn/common/meta/WorkerInfoSuite.scala     |   6 +-
 docs/developers/slotsallocation.md                 |  75 +++++++++++
 .../service/deploy/master/SlotsAllocator.java      | 148 ++++++++++++++++-----
 .../deploy/master/clustermeta/MetaUtil.java        |   5 +-
 master/src/main/proto/Resource.proto               |   1 +
 .../celeborn/service/deploy/master/Master.scala    |   2 +-
 .../deploy/master/SlotsAllocatorSuiteJ.java        | 121 ++++++++++++++---
 .../service/deploy/worker/storage/FileWriter.java  |   3 +-
 .../celeborn/service/deploy/worker/Worker.scala    |   2 +-
 .../deploy/worker/storage/StorageManager.scala     |   8 +-
 16 files changed, 375 insertions(+), 94 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java 
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index d3e1bf95b..5b59a10ed 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -38,7 +38,6 @@ public class StorageInfo implements Serializable {
     }
   }
 
-  @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
   public static Map<Integer, Type> typesMap = new HashMap<>();
   public static Set<String> typeNames = new HashSet<>();
 
@@ -57,7 +56,7 @@ public class StorageInfo implements Serializable {
 
   // Default storage Type is MEMORY.
   private Type type = Type.MEMORY;
-  private String mountPoint = UNKNOWN_DISK;
+  private String mountPoint = "";
   // if a file is committed, field "finalResult" will be true
   private boolean finalResult = false;
   private String filePath;
@@ -72,27 +71,10 @@ public class StorageInfo implements Serializable {
     this.filePath = filePath;
   }
 
-  public StorageInfo(String mountPoint, int availableStorageTypes) {
+  public StorageInfo(String mountPoint, StorageInfo.Type type, int 
availableStorageTypes) {
     this.mountPoint = mountPoint;
-    this.availableStorageTypes = availableStorageTypes;
-  }
-
-  public StorageInfo(Type type, String mountPoint) {
-    this.type = type;
-    this.mountPoint = mountPoint;
-  }
-
-  public StorageInfo(Type type, String mountPoint, boolean finalResult) {
-    this.type = type;
-    this.mountPoint = mountPoint;
-    this.finalResult = finalResult;
-  }
-
-  public StorageInfo(Type type, String mountPoint, boolean finalResult, String 
filePath) {
     this.type = type;
-    this.mountPoint = mountPoint;
-    this.finalResult = finalResult;
-    this.filePath = filePath;
+    this.availableStorageTypes = availableStorageTypes;
   }
 
   public StorageInfo(
@@ -147,21 +129,41 @@ public class StorageInfo implements Serializable {
         + '}';
   }
 
-  public boolean localDiskAvailable() {
+  public static boolean localDiskAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & LOCAL_DISK_MASK) > 0;
   }
 
-  public boolean HDFSAvailable() {
+  public boolean localDiskAvailable() {
+    return StorageInfo.localDiskAvailable(availableStorageTypes);
+  }
+
+  public static boolean HDFSAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & HDFS_MASK) > 0;
   }
 
-  public boolean OSSAvailable() {
+  public boolean HDFSAvailable() {
+    return StorageInfo.HDFSAvailable(availableStorageTypes);
+  }
+
+  public static boolean HDFSOnly(int availableStorageTypes) {
+    return availableStorageTypes == HDFS_MASK;
+  }
+
+  public boolean HDFSOnly() {
+    return StorageInfo.HDFSOnly(availableStorageTypes);
+  }
+
+  public static boolean OSSAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & OSS_MASK) > 0;
   }
 
+  public boolean OSSAvailable() {
+    return StorageInfo.OSSAvailable(availableStorageTypes);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index a595a785d..b33271e0e 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -137,6 +137,7 @@ message PbDiskInfo {
   int64 usedSlots = 4;
   int32 status = 5;
   int64 avgFetchTime = 6;
+  int32 storageType = 7;
 }
 
 message PbWorkerInfo {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index 56933a2cf..990a2b127 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -49,6 +49,17 @@ class DiskInfo(
     this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots, 
List.empty, null)
   }
 
+  def this(
+      mountPoint: String,
+      usableSpace: Long,
+      avgFlushTime: Long,
+      avgFetchTime: Long,
+      activeSlots: Long,
+      storageType: StorageInfo.Type) = {
+    this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots, 
List.empty, null)
+    this.storageType = storageType
+  }
+
   def this(
       mountPoint: String,
       dirs: List[File],
@@ -70,10 +81,14 @@ class DiskInfo(
   var status: DiskStatus = DiskStatus.HEALTHY
   var threadCount = 1
   var configuredUsableSpace = 0L
-  var storageType: StorageInfo.Type = _
+  var storageType: StorageInfo.Type = StorageInfo.Type.SSD
   var maxSlots: Long = 0
   lazy val shuffleAllocations = new util.HashMap[String, Integer]()
 
+  def setStorageType(storageType: StorageInfo.Type) = {
+    this.storageType = storageType
+  }
+
   def setStatus(status: DiskStatus): this.type = this.synchronized {
     this.status = status
     this
@@ -145,9 +160,11 @@ class DiskInfo(
       s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
       s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
       s" avgFetchTime: ${Utils.nanoDurationToString(avgFetchTime)}," +
-      s" activeSlots: $activeSlots)" +
+      s" activeSlots: $activeSlots," +
+      s" storageType: ${storageType})" +
       s" status: $status" +
       s" dirs ${dirs.mkString("\t")}"
+
   }
 }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 65dc22918..05ea35d63 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.StorageInfo
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc.RpcEndpointRef
 import org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef
@@ -156,12 +157,12 @@ class WorkerInfo(
         curDisk.activeSlots = newDisk.activeSlots
         curDisk.avgFlushTime = newDisk.avgFlushTime
         curDisk.avgFetchTime = newDisk.avgFetchTime
-        if (estimatedPartitionSize.nonEmpty) {
+        if (estimatedPartitionSize.nonEmpty && curDisk.storageType != 
StorageInfo.Type.HDFS) {
           curDisk.maxSlots = curDisk.actualUsableSpace / 
estimatedPartitionSize.get
         }
         curDisk.setStatus(newDisk.status)
       } else {
-        if (estimatedPartitionSize.nonEmpty) {
+        if (estimatedPartitionSize.nonEmpty && newDisk.storageType != 
StorageInfo.Type.HDFS) {
           newDisk.maxSlots = newDisk.actualUsableSpace / 
estimatedPartitionSize.get
         }
         diskInfos.put(mountPoint, newDisk)
@@ -239,6 +240,11 @@ class WorkerInfo(
     result = 31 * result + replicatePort.hashCode()
     result
   }
+
+  def haveDisk(): Boolean = {
+    diskInfos.values().asScala.exists(p =>
+      p.storageType == StorageInfo.Type.SSD || p.storageType == 
StorageInfo.Type.HDD)
+  }
 }
 
 object WorkerInfo {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 2748d6cf3..d681152f7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -63,14 +63,17 @@ object PbSerDeUtils {
       .setMinor(minor)
       .build.toByteArray
 
-  def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo =
-    new DiskInfo(
+  def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo = {
+    val diskInfo = new DiskInfo(
       pbDiskInfo.getMountPoint,
       pbDiskInfo.getUsableSpace,
       pbDiskInfo.getAvgFlushTime,
       pbDiskInfo.getAvgFetchTime,
       pbDiskInfo.getUsedSlots)
       .setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus))
+    
diskInfo.setStorageType(StorageInfo.typesMap.get(pbDiskInfo.getStorageType))
+    diskInfo
+  }
 
   def toPbDiskInfo(diskInfo: DiskInfo): PbDiskInfo =
     PbDiskInfo.newBuilder
@@ -80,6 +83,7 @@ object PbSerDeUtils {
       .setAvgFetchTime(diskInfo.avgFetchTime)
       .setUsedSlots(diskInfo.activeSlots)
       .setStatus(diskInfo.status.getValue)
+      .setStorageType(diskInfo.storageType.getValue)
       .build
 
   def fromPbFileInfo(pbFileInfo: PbFileInfo): FileInfo =
diff --git 
a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
index 927b3e9c1..e4200c913 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
@@ -178,7 +178,9 @@ public class PartitionLocationSuiteJ {
     PartitionLocation location2 =
         new PartitionLocation(
             partitionId, epoch, host, rpcPort, pushPort, fetchPort, 
replicatePort, mode, peer);
-    StorageInfo storageInfo = new StorageInfo(StorageInfo.Type.MEMORY, 
"/mnt/disk/0");
+    StorageInfo storageInfo =
+        new StorageInfo(
+            "/mnt/disk/0", StorageInfo.Type.MEMORY, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
     RoaringBitmap bitmap = new RoaringBitmap();
     bitmap.add(1);
     bitmap.add(2);
@@ -205,7 +207,7 @@ public class PartitionLocationSuiteJ {
             + "  
host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
             + "  mode:PRIMARY\n"
             + "  peer:(empty)\n"
-            + "  storage hint:StorageInfo{type=MEMORY, 
mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+            + "  storage hint:StorageInfo{type=MEMORY, mountPoint='', 
finalResult=false, filePath=null}\n"
             + "  mapIdBitMap:{}]";
     String exp2 =
         "PartitionLocation[\n"
@@ -213,7 +215,7 @@ public class PartitionLocationSuiteJ {
             + "  
host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
             + "  mode:PRIMARY\n"
             + "  
peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n"
-            + "  storage hint:StorageInfo{type=MEMORY, 
mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+            + "  storage hint:StorageInfo{type=MEMORY, mountPoint='', 
finalResult=false, filePath=null}\n"
             + "  mapIdBitMap:{}]";
     String exp3 =
         "PartitionLocation[\n"
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index b42862924..ebd83682e 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -284,9 +284,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
            |SlotsUsed: 60
            |LastHeartbeat: 0
            |Disks: $placeholder
-           |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, 
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs 
$placeholder
-           |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, 
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs 
$placeholder
-           |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, 
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs 
$placeholder
+           |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, 
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) 
status: HEALTHY dirs $placeholder
+           |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, 
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) 
status: HEALTHY dirs $placeholder
+           |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, 
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) 
status: HEALTHY dirs $placeholder
            |UserResourceConsumption: $placeholder
            |  UserIdentifier: `tenant1`.`name1`, ResourceConsumption: 
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, 
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
            |WorkerRef: null
diff --git a/docs/developers/slotsallocation.md 
b/docs/developers/slotsallocation.md
index e69de29bb..78644b16c 100644
--- a/docs/developers/slotsallocation.md
+++ b/docs/developers/slotsallocation.md
@@ -0,0 +1,75 @@
+---
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+# Slots allocation
+
+This article describes the detailed design of Celeborn workers' slots 
allocation.
+Slots allocation is the core component of how Celeborn distributes workload 
among workers.
+We have implemented two approaches to slots allocation.
+
+## Principle
+Allocate slots to local disks unless explicitly assigned to HDFS.
+
+## LoadAware
+### Related configs
+```properties
+celeborn.master.slot.assign.policy LOADAWARE
+celeborn.master.slot.assign.loadAware.numDiskGroups 5
+celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1
+celeborn.master.slot.assign.loadAware.flushTimeWeight 0
+celeborn.master.slot.assign.loadAware.fetchTimeWeight 0
+[spark.client.]celeborn.storage.availableTypes HDD,SSD
+```
+### Detail
+Load-aware slots allocation will take following elements into consideration.
+- disk's fetch time 
+- disk's flush time 
+- disk's usable space
+- disk's used slot 
+
+Slots allocator will find out all workers involved in this allocation and sort 
their disks by 
+`disk's average flush time * flush time weight + disk's average fetch time * 
fetch time weight`.
+After getting the sorted disks list, Celeborn will split the disks into
+`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number 
to be placed into a disk group 
+is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` 
which means that a group's 
+allocated slots number will be 
(1+`celeborn.master.slot.assign.loadAware.diskGroupGradient`) 
+times that of the group slower than it.
+For example, there are 5 groups: G1, G2, G3, G4 and G5. If G5 is allocated 
100 slots.
+Other groups will be G4:110, G3:121, G2:133, G1:146.
+
+After Celeborn has decided the slots number of a disk group, slots will be 
distributed in disks of a disk group.
+Each disk has a usableSlots which is calculated by `(disk's usable 
space)/(average partition size)-usedSlots`. 
+The slots number to allocate in a disk is calculated by ` slots of this disk 
group * ( current disk's usableSlots / the sum of all disks' usableSlots in 
this group)`.
+For example, G5 need to allocate 100 slots and have 3 disks D1 with usable 
slots 100, D2 with usable slots 50, D3 with usable slots 20.
+The distribution will be D1:59, D2: 29, D3: 12.
+
+If all slots can be placed in disk groups, the slots allocation process is 
done. 
+
+If requested slots are more than all usable slots, the remaining slots can not 
be placed into disks.
+Celeborn will need to allocate these slots to workers with local disks one by 
one.
+
+## RoundRobin
+### Detail
+Roundrobin slots allocation will distribute all slots into all registered 
workers with disks. Celeborn will treat 
+all workers as an array and place 1 slot in a worker until all slots are 
allocated. 
+If a worker has multiple disks, the chosen disk index is `(monotone increasing 
disk index +1)  % disk count`.  
+
+## Celeborn Worker's Behavior
+1. When reserving slots, a Celeborn worker will decide whether a slot should 
be placed on local disks or HDFS.
+2. If a partition is evicted from memory, the partition might be placed in 
HDFS.
+3. If a slot is explicitly assigned to HDFS, worker will put the slot in HDFS. 
\ No newline at end of file
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index e964e57e5..e54ba8763 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -18,6 +18,7 @@
 package org.apache.celeborn.service.deploy.master;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import scala.Double;
 import scala.Option;
@@ -64,22 +65,30 @@ public class SlotsAllocator {
     if (workers.size() < 2 && shouldReplicate) {
       return new HashMap<>();
     }
-    Map<WorkerInfo, List<UsableDiskInfo>> restrictions = new HashMap<>();
+    Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions = new HashMap<>();
     for (WorkerInfo worker : workers) {
       List<UsableDiskInfo> usableDisks =
-          restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
+          slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
       for (Map.Entry<String, DiskInfo> diskInfoEntry : 
worker.diskInfos().entrySet()) {
         if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
-          usableDisks.add(
-              new UsableDiskInfo(
-                  diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+          if (StorageInfo.localDiskAvailable(availableStorageTypes)
+              && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.HDFS) {
+            usableDisks.add(
+                new UsableDiskInfo(
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+          } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
+              && diskInfoEntry.getValue().storageType() == 
StorageInfo.Type.HDFS) {
+            usableDisks.add(
+                new UsableDiskInfo(
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+          }
         }
       }
     }
     return locateSlots(
         partitionIds,
         workers,
-        restrictions,
+        slotsRestrictions,
         shouldReplicate,
         shouldRackAware,
         availableStorageTypes);
@@ -109,6 +118,10 @@ public class SlotsAllocator {
     if (workers.size() < 2 && shouldReplicate) {
       return new HashMap<>();
     }
+    if (StorageInfo.HDFSOnly(availableStorageTypes)) {
+      return offerSlotsRoundRobin(
+          workers, partitionIds, shouldReplicate, shouldRackAware, 
availableStorageTypes);
+    }
 
     List<DiskInfo> usableDisks = new ArrayList<>();
     Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>();
@@ -126,7 +139,8 @@ public class SlotsAllocator {
                                   diskReserveRatio.isEmpty()
                                       ? Option.empty()
                                       : Option.apply(diskReserveRatio.get()))
-                          && diskInfo.status().equals(DiskStatus.HEALTHY)) {
+                          && diskInfo.status().equals(DiskStatus.HEALTHY)
+                          && diskInfo.storageType() != StorageInfo.Type.HDFS) {
                         usableDisks.add(diskInfo);
                       }
                     }));
@@ -151,15 +165,15 @@ public class SlotsAllocator {
       initLoadAwareAlgorithm(diskGroupCount, diskGroupGradient);
     }
 
-    Map<WorkerInfo, List<UsableDiskInfo>> restrictions =
-        getRestriction(
+    Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions =
+        getSlotsRestrictionsByLoadAwareAlgorithm(
             placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, 
fetchTimeWeight),
             diskToWorkerMap,
             shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
     return locateSlots(
         partitionIds,
         workers,
-        restrictions,
+        slotsRestrictions,
         shouldReplicate,
         shouldRackAware,
         availableStorageTypes);
@@ -172,16 +186,43 @@ public class SlotsAllocator {
       Map<WorkerInfo, Integer> workerDiskIndex,
       int availableStorageTypes) {
     WorkerInfo selectedWorker = workers.get(workerIndex);
-    List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+    StorageInfo storageInfo;
     int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
-    while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
-      diskIndex = (diskIndex + 1) % usableDiskInfos.size();
+    if (restrictions != null) {
+      List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+      while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
+        diskIndex = (diskIndex + 1) % usableDiskInfos.size();
+      }
+      usableDiskInfos.get(diskIndex).usableSlots--;
+      DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
+      if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, 
availableStorageTypes);
+      } else {
+        storageInfo =
+            new StorageInfo(
+                selectedDiskInfo.mountPoint(),
+                selectedDiskInfo.storageType(),
+                availableStorageTypes);
+        workerDiskIndex.put(selectedWorker, (diskIndex + 1) % 
usableDiskInfos.size());
+      }
+    } else {
+      if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+        DiskInfo[] diskInfos =
+            selectedWorker.diskInfos().values().stream()
+                .filter(p -> p.storageType() != StorageInfo.Type.HDFS)
+                .collect(Collectors.toList())
+                .toArray(new DiskInfo[0]);
+        storageInfo =
+            new StorageInfo(
+                diskInfos[diskIndex].mountPoint(),
+                diskInfos[diskIndex].storageType(),
+                availableStorageTypes);
+        diskIndex = (diskIndex + 1) % diskInfos.length;
+        workerDiskIndex.put(selectedWorker, (diskIndex + 1) % 
diskInfos.length);
+      } else {
+        storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, 
availableStorageTypes);
+      }
     }
-    usableDiskInfos.get(diskIndex).usableSlots--;
-    StorageInfo storageInfo =
-        new StorageInfo(
-            usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), 
availableStorageTypes);
-    workerDiskIndex.put(selectedWorker, (diskIndex + 1) % 
usableDiskInfos.size());
     return storageInfo;
   }
 
@@ -195,10 +236,10 @@ public class SlotsAllocator {
       locateSlots(
           List<Integer> partitionIds,
           List<WorkerInfo> workers,
-          Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
+          Map<WorkerInfo, List<UsableDiskInfo>> slotRestrictions,
           boolean shouldReplicate,
           boolean shouldRackAware,
-          int activeStorageTypes) {
+          int availableStorageTypes) {
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
         new HashMap<>();
 
@@ -206,18 +247,24 @@ public class SlotsAllocator {
         roundRobin(
             slots,
             partitionIds,
-            new LinkedList<>(restrictions.keySet()),
-            restrictions,
+            new LinkedList<>(slotRestrictions.keySet()),
+            slotRestrictions,
             shouldReplicate,
             shouldRackAware,
-            activeStorageTypes);
+            availableStorageTypes);
     if (!remain.isEmpty()) {
       remain =
           roundRobin(
-              slots, remain, workers, null, shouldReplicate, shouldRackAware, 
activeStorageTypes);
+              slots,
+              remain,
+              workers,
+              null,
+              shouldReplicate,
+              shouldRackAware,
+              availableStorageTypes);
     }
     if (!remain.isEmpty()) {
-      roundRobin(slots, remain, workers, null, shouldReplicate, false, 
activeStorageTypes);
+      roundRobin(slots, remain, workers, null, shouldReplicate, false, 
availableStorageTypes);
     }
     return slots;
   }
@@ -226,7 +273,7 @@ public class SlotsAllocator {
       Map<WorkerInfo, Tuple2<List<PartitionLocation>, 
List<PartitionLocation>>> slots,
       List<Integer> partitionIds,
       List<WorkerInfo> workers,
-      Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
+      Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
       boolean shouldReplicate,
       boolean shouldRackAware,
       int availableStorageTypes) {
@@ -241,9 +288,10 @@ public class SlotsAllocator {
       int nextPrimaryInd = primaryIndex;
 
       int partitionId = iter.next();
-      StorageInfo storageInfo = new StorageInfo();
-      if (restrictions != null) {
-        while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) {
+      StorageInfo storageInfo;
+      if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
+        // this means that we'll select a mount point
+        while (!haveUsableSlots(slotsRestrictions, workers, nextPrimaryInd)) {
           nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
           if (nextPrimaryInd == primaryIndex) {
             break outer;
@@ -253,17 +301,29 @@ public class SlotsAllocator {
             getStorageInfo(
                 workers,
                 nextPrimaryInd,
-                restrictions,
+                slotsRestrictions,
                 workerDiskIndexForPrimary,
                 availableStorageTypes);
+      } else {
+        if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+          while (!workers.get(nextPrimaryInd).haveDisk()) {
+            nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
+            if (nextPrimaryInd == primaryIndex) {
+              break outer;
+            }
+          }
+        }
+        storageInfo =
+            getStorageInfo(
+                workers, nextPrimaryInd, null, workerDiskIndexForPrimary, 
availableStorageTypes);
       }
       PartitionLocation primaryPartition =
           createLocation(partitionId, workers.get(nextPrimaryInd), null, 
storageInfo, true);
 
       if (shouldReplicate) {
         int nextReplicaInd = (nextPrimaryInd + 1) % workers.size();
-        if (restrictions != null) {
-          while (!haveUsableSlots(restrictions, workers, nextReplicaInd)
+        if (slotsRestrictions != null) {
+          while (!haveUsableSlots(slotsRestrictions, workers, nextReplicaInd)
               || !satisfyRackAware(shouldRackAware, workers, nextPrimaryInd, 
nextReplicaInd)) {
             nextReplicaInd = (nextReplicaInd + 1) % workers.size();
             if (nextReplicaInd == nextPrimaryInd) {
@@ -274,7 +334,7 @@ public class SlotsAllocator {
               getStorageInfo(
                   workers,
                   nextReplicaInd,
-                  restrictions,
+                  slotsRestrictions,
                   workerDiskIndexForReplica,
                   availableStorageTypes);
         } else if (shouldRackAware) {
@@ -284,6 +344,18 @@ public class SlotsAllocator {
               break outer;
             }
           }
+        } else {
+          if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+            while (!workers.get(nextPrimaryInd).haveDisk()) {
+              nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
+              if (nextPrimaryInd == primaryIndex) {
+                break outer;
+              }
+            }
+          }
+          storageInfo =
+              getStorageInfo(
+                  workers, nextReplicaInd, null, workerDiskIndexForReplica, 
availableStorageTypes);
         }
         PartitionLocation replicaPartition =
             createLocation(
@@ -369,7 +441,11 @@ public class SlotsAllocator {
     return diskGroups;
   }
 
-  private static Map<WorkerInfo, List<UsableDiskInfo>> getRestriction(
+  /**
+   * This method implement the load aware slots allocation algorithm. See 
details at
+   * /docs/developers/slotsallocation.md
+   */
+  private static Map<WorkerInfo, List<UsableDiskInfo>> 
getSlotsRestrictionsByLoadAwareAlgorithm(
       List<List<DiskInfo>> groups, Map<DiskInfo, WorkerInfo> diskWorkerMap, 
int partitionCnt) {
     int groupSize = groups.size();
     long[] groupAllocations = new long[groupSize];
@@ -505,8 +581,8 @@ public class SlotsAllocator {
       jointLocations.addAll(slots.get(worker)._2);
       for (PartitionLocation location : jointLocations) {
         String mountPoint = location.getStorageInfo().getMountPoint();
-        // ignore slots for UNKNOWN_DISK
-        if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
+        // skip non local disks slots
+        if (!mountPoint.isEmpty()) {
           if (slotsPerDisk.containsKey(mountPoint)) {
             slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
           } else {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
index ec3094fc2..03e1d21e3 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
@@ -24,6 +24,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.identity.UserIdentifier$;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.util.Utils;
 
@@ -61,7 +62,8 @@ public class MetaUtil {
                   v.getUsableSpace(),
                   v.getAvgFlushTime(),
                   v.getAvgFetchTime(),
-                  v.getUsedSlots());
+                  v.getUsedSlots(),
+                  StorageInfo.typesMap.get(v.getStorageType()));
           diskInfo.setStatus(Utils.toDiskStatus(v.getStatus()));
           map.put(k, diskInfo);
         });
@@ -81,6 +83,7 @@ public class MetaUtil {
                     .setAvgFlushTime(v.avgFlushTime())
                     .setAvgFetchTime(v.avgFetchTime())
                     .setUsedSlots(v.activeSlots())
+                    .setStorageType(v.storageType().getValue())
                     .setStatus(v.status().getValue())
                     .build()));
     return map;
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index c7cde94dc..78dc477bd 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -66,6 +66,7 @@ message DiskInfo {
   required int64 usedSlots = 4;
   required int32 status = 5;
   required int64 avgFetchTime = 6;
+  required int32 storageType =7;
 }
 
 message RequestSlotsRequest {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1c03903a7..e17bb46bf 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -687,7 +687,7 @@ private[celeborn] class Master(
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, 
s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
-          if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && 
!hasHDFSStorage) {
+          if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
             SlotsAllocator.offerSlotsLoadAware(
               selectedWorkers,
               requestSlots.partitionIdList,
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 92f2ecfbd..fd724cd70 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -17,15 +17,7 @@
 
 package org.apache.celeborn.service.deploy.master;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 
 import scala.Option;
 import scala.Tuple2;
@@ -294,16 +286,34 @@ public class SlotsAllocatorSuiteJ {
       List<WorkerInfo> workers,
       List<Integer> partitionIds,
       boolean shouldReplicate,
-      boolean expectSuccess) {
+      boolean expectSuccess,
+      boolean roundrobin) {
     String shuffleKey = "appId-1";
     CelebornConf conf = new CelebornConf();
     conf.set("celeborn.active.storage.levels", "HDFS");
-    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        SlotsAllocator.offerSlotsRoundRobin(
-            workers, partitionIds, shouldReplicate, false, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
-
+    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots;
+    if (roundrobin) {
+      slots =
+          SlotsAllocator.offerSlotsRoundRobin(
+              workers, partitionIds, shouldReplicate, false, 
StorageInfo.HDFS_MASK);
+    } else {
+      slots =
+          SlotsAllocator.offerSlotsLoadAware(
+              workers,
+              partitionIds,
+              shouldReplicate,
+              false,
+              1000_000_000,
+              Option.empty(),
+              3,
+              0.1,
+              0,
+              1,
+              StorageInfo.LOCAL_DISK_MASK | StorageInfo.HDFS_MASK);
+    }
     int allocatedPartitionCount = 0;
-
+    Map<WorkerInfo, Map<String, Integer>> slotsDistribution =
+        SlotsAllocator.slotsToDiskAllocations(slots);
     for (Map.Entry<WorkerInfo, Tuple2<List<PartitionLocation>, 
List<PartitionLocation>>>
         workerToPartitions : slots.entrySet()) {
       WorkerInfo workerInfo = workerToPartitions.getKey();
@@ -332,7 +342,86 @@ public class SlotsAllocatorSuiteJ {
       partitionIds.add(i);
     }
     final boolean shouldReplicate = true;
-    checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true);
+    checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true);
+  }
+
+  @Test
+  public void testLocalDisksAndHDFSOnRoundRobin() {
+    final List<WorkerInfo> workers = prepareWorkers(true);
+    DiskInfo hdfs1 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    DiskInfo hdfs2 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    DiskInfo hdfs3 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    hdfs1.maxSlots_$eq(Long.MAX_VALUE);
+    hdfs2.maxSlots_$eq(Long.MAX_VALUE);
+    hdfs3.maxSlots_$eq(Long.MAX_VALUE);
+    workers.get(0).diskInfos().put("HDFS", hdfs1);
+    workers.get(1).diskInfos().put("HDFS", hdfs2);
+    workers.get(2).diskInfos().put("HDFS", hdfs3);
+    final List<Integer> partitionIds = new ArrayList<>();
+    for (int i = 0; i < 3000; i++) {
+      partitionIds.add(i);
+    }
+    final boolean shouldReplicate = true;
+    checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true);
+  }
+
+  @Test
+  public void testLocalDisksAndHDFSOnLoadAware() {
+    final List<WorkerInfo> workers = prepareWorkers(true);
+    DiskInfo hdfs1 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    DiskInfo hdfs2 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    //    DiskInfo hdfs3 = new DiskInfo("HDFS", Long.MAX_VALUE, 999999, 
999999, Integer.MAX_VALUE,
+    // StorageInfo.Type.HDFS);
+    hdfs1.maxSlots_$eq(Long.MAX_VALUE);
+    hdfs2.maxSlots_$eq(Long.MAX_VALUE);
+    //    hdfs3.maxSlots_$eq(Long.MAX_VALUE);
+    workers.get(0).diskInfos().put("HDFS", hdfs1);
+    workers.get(1).diskInfos().put("HDFS", hdfs2);
+    //    workers.get(2).diskInfos().put("HDFS", hdfs3);
+    final List<Integer> partitionIds = new ArrayList<>();
+    for (int i = 0; i < 3000; i++) {
+      partitionIds.add(i);
+    }
+    final boolean shouldReplicate = true;
+    checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false);
+  }
+
+  @Test
+  public void testLocalDisksAndHDFSOnLoadAwareWithInsufficientSlots() {
+    final List<WorkerInfo> workers = prepareWorkers(true);
+    DiskInfo hdfs1 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    DiskInfo hdfs2 =
+        new DiskInfo(
+            "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.HDFS);
+    //    DiskInfo hdfs3 = new DiskInfo("HDFS", Long.MAX_VALUE, 999999, 
999999, Integer.MAX_VALUE,
+    // StorageInfo.Type.HDFS);
+    hdfs1.maxSlots_$eq(Long.MAX_VALUE);
+    hdfs2.maxSlots_$eq(Long.MAX_VALUE);
+    //    hdfs3.maxSlots_$eq(Long.MAX_VALUE);
+    workers.get(0).diskInfos().put("HDFS", hdfs1);
+    workers.get(1).diskInfos().put("HDFS", hdfs2);
+    for (Map.Entry<String, DiskInfo> diskEntry : 
workers.get(2).diskInfos().entrySet()) {
+      diskEntry.getValue().maxSlots_$eq(100);
+    }
+    //    workers.get(2).diskInfos().put("HDFS", hdfs3);
+    final List<Integer> partitionIds = new ArrayList<>();
+    for (int i = 0; i < 3000; i++) {
+      partitionIds.add(i);
+    }
+    final boolean shouldReplicate = true;
+    checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false);
   }
 
   @Test
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 8ec6497b4..ca38a49c4 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -234,7 +234,8 @@ public abstract class FileWriter implements DeviceObserver {
   public StorageInfo getStorageInfo() {
     if (flusher instanceof LocalFlusher) {
       LocalFlusher localFlusher = (LocalFlusher) flusher;
-      return new StorageInfo(localFlusher.diskType(), 
localFlusher.mountPoint(), true);
+      // do not write file path to reduce rpc size
+      return new StorageInfo(localFlusher.diskType(), true, "");
     } else {
       if (deleted) {
         return null;
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 80e207309..d8c7049e5 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -343,7 +343,7 @@ private[celeborn] class Worker(
     val diskInfos =
       workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { 
disk =>
         disk.mountPoint -> disk
-      }.toMap.asJava).values().asScala.toSeq
+      }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo
     val response = masterClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
         host,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b1239b497..f3c4e99e0 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -40,7 +40,7 @@ import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus, 
FileInfo, TimeWindow}
 import org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
-import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType}
+import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
@@ -72,6 +72,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf)
   }
   val mountPoints = new util.HashSet[String](diskInfos.keySet())
+  val hdfsDiskInfo =
+    if (conf.hasHDFSStorage)
+      Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.HDFS))
+    else None
 
   def disksSnapshot(): List[DiskInfo] = {
     diskInfos.synchronized {
@@ -362,7 +366,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         throw new IOException(s"No available disks! suggested mountPoint 
$suggestedMountPoint")
       }
       val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
-      if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
+      if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) || 
location.getStorageInfo.HDFSOnly()) {
         val shuffleDir =
           new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
         val fileInfo =


Reply via email to