This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 19a8bac4 [ISSUE-448][Feature] shuffle server report storage info (#449)
19a8bac4 is described below

commit 19a8bac456feb3b5be2000027e4719396300b4a3
Author: advancedxy <[email protected]>
AuthorDate: Wed Jan 11 10:45:44 2023 +0800

    [ISSUE-448][Feature] shuffle server report storage info (#449)
    
    ### What changes were proposed in this pull request?
    ShuffleServer reports local storage info about itself.
    This PR also defines a general message definition that can be extended 
to cover remote/distributed storage info.
    
    
    ### Why are the changes needed?
    To enable better shuffle assignments and gain more insight into the 
shuffle servers.
    This addresses #448.
    
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UTs.
---
 common/pom.xml                                     |   6 +
 .../apache/uniffle/common/storage/StorageInfo.java | 137 +++++++++++++++++++++
 .../uniffle/common/storage/StorageInfoUtils.java   |  50 ++++++++
 .../uniffle/common/storage/StorageMedia.java       |  67 ++++++++++
 .../uniffle/common/storage/StorageStatus.java      |  66 ++++++++++
 .../common/storage/StorageInfoUtilsTest.java       |  78 ++++++++++++
 .../apache/uniffle/common/util/RssUtilsTest.java   |   6 +-
 .../coordinator/CoordinatorGrpcService.java        |   4 +-
 .../org/apache/uniffle/coordinator/ServerNode.java |  29 ++++-
 .../apache/uniffle/coordinator/ServerNodeTest.java |  24 ++++
 docs/server_guide.md                               |   5 +
 .../apache/uniffle/test/CoordinatorGrpcTest.java   |  19 +++
 .../client/impl/grpc/CoordinatorGrpcClient.java    |   9 +-
 .../client/request/RssSendHeartBeatRequest.java    |  13 +-
 proto/src/main/proto/Rss.proto                     |  29 +++++
 .../apache/uniffle/server/RegisterHeartBeat.java   |  11 +-
 .../apache/uniffle/server/ShuffleServerConf.java   |   6 +
 .../uniffle/server/storage/HdfsStorageManager.java |   7 ++
 .../server/storage/LocalStorageManager.java        |  52 ++++++++
 .../server/storage/MultiStorageManager.java        |   9 ++
 .../uniffle/server/storage/StorageManager.java     |  10 ++
 .../storage/StorageMediaFromEnvProvider.java       |  81 ++++++++++++
 ...che.uniffle.storage.common.StorageMediaProvider |  19 +++
 .../LocalSingleStorageTypeFromEnvProviderTest.java | 103 ++++++++++++++++
 .../server/storage/LocalStorageManagerTest.java    |  58 ++++++++-
 .../common/DefaultStorageMediaProvider.java        |  87 +++++++++++++
 .../uniffle/storage/common/LocalStorage.java       |  22 ++++
 .../storage/common/StorageMediaProvider.java       |  40 ++++++
 .../common/DefaultStorageMediaProviderTest.java    |  43 +++++++
 .../uniffle/storage/common/LocalStorageTest.java   |  34 +++++
 30 files changed, 1112 insertions(+), 12 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index e12db315..70ab0c6c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -83,6 +83,12 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git 
a/common/src/main/java/org/apache/uniffle/common/storage/StorageInfo.java 
b/common/src/main/java/org/apache/uniffle/common/storage/StorageInfo.java
new file mode 100644
index 00000000..9dca09d4
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/storage/StorageInfo.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import java.util.Objects;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class StorageInfo {
+  private String mountPoint;
+  private StorageMedia type;
+  private long capacity;
+  private long usedBytes;
+  // -1 indicates these field is not used and shall not be serialized to proto.
+  private long writingSpeed1M;
+  private long writingSpeed5M;
+  private long writingSpeed1H;
+  private long numberOfWritingFailures;
+  private StorageStatus status;
+
+  public StorageInfo(
+      String mountPoint,
+      StorageMedia type,
+      long capacity,
+      long usedBytes,
+      StorageStatus status) {
+    this.mountPoint = mountPoint;
+    this.type = type;
+    this.capacity = capacity;
+    this.usedBytes = usedBytes;
+    this.writingSpeed1M = -1;
+    this.writingSpeed5M = -1;
+    this.writingSpeed1H = -1;
+    this.numberOfWritingFailures = -1;
+    this.status = status;
+  }
+
+  public StorageInfo(
+      String mountPoint,
+      StorageMedia type,
+      long capacity,
+      long usedBytes,
+      long writingSpeed1M,
+      long writingSpeed5M,
+      long writingSpeed1H,
+      long numberOfWritingFailures,
+      StorageStatus status) {
+    this.mountPoint = mountPoint;
+    this.type = type;
+    this.capacity = capacity;
+    this.usedBytes = usedBytes;
+    this.writingSpeed1M = writingSpeed1M;
+    this.writingSpeed5M = writingSpeed5M;
+    this.writingSpeed1H = writingSpeed1H;
+    this.numberOfWritingFailures = numberOfWritingFailures;
+    this.status = status;
+  }
+
+
+  public RssProtos.StorageInfo toProto() {
+    RssProtos.StorageInfo.Builder builder = RssProtos.StorageInfo.newBuilder()
+        .setMountPoint(mountPoint)
+        .setStorageMedia(type.toProto())
+        .setCapacity(capacity)
+        .setUsedBytes(usedBytes)
+        .setStatus(status.toProto());
+    if (writingSpeed1M >= 0) {
+      builder.setWritingSpeed1M(writingSpeed1M);
+      builder.setWritingSpeed5M(writingSpeed5M);
+      builder.setWritingSpeed1H(writingSpeed1H);
+    }
+
+    if (numberOfWritingFailures >= 0) {
+      builder.setNumOfWritingFailures(numberOfWritingFailures);
+    }
+
+    return builder.build();
+  }
+
+  public StorageStatus getStatus() {
+    return status;
+  }
+
+  public StorageMedia getType() {
+    return type;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StorageInfo that = (StorageInfo) o;
+    return Objects.equals(mountPoint, that.mountPoint) && type == that.type
+        && capacity == that.capacity
+        && usedBytes == that.usedBytes
+        && writingSpeed1M == that.writingSpeed1M
+        && writingSpeed5M == that.writingSpeed5M
+        && writingSpeed1H == that.writingSpeed1H
+        && numberOfWritingFailures == that.numberOfWritingFailures
+        && status == that.status;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 41;
+    hash = (37 * hash) + Objects.hashCode(mountPoint);
+    hash = (19 * hash) + Objects.hashCode(type);
+    hash = (37 * hash) + (int) capacity;
+    hash = (37 * hash) + (int) usedBytes;
+    hash = (37 * hash) + (int) writingSpeed1M;
+    hash = (37 * hash) + (int) writingSpeed5M;
+    hash = (37 * hash) + (int) writingSpeed1H;
+    hash = (37 * hash) + (int) numberOfWritingFailures;
+    hash = (19 * hash) + Objects.hashCode(status);
+    return hash;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/storage/StorageInfoUtils.java 
b/common/src/main/java/org/apache/uniffle/common/storage/StorageInfoUtils.java
new file mode 100644
index 00000000..a2c7f6e6
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/storage/StorageInfoUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import org.apache.uniffle.proto.RssProtos;
+
+
+public class StorageInfoUtils {
+  public static Map<String, RssProtos.StorageInfo> toProto(
+      Map<String, StorageInfo> info) {
+    Map<String, RssProtos.StorageInfo> result = 
Maps.newHashMapWithExpectedSize(info.size());
+    info.forEach((k, v) -> result.put(k, v.toProto()));
+    return result;
+  }
+
+  public static Map<String, StorageInfo> fromProto(Map<String, 
RssProtos.StorageInfo> info) {
+    Map<String, StorageInfo> result = 
Maps.newHashMapWithExpectedSize(info.size());
+    for (Map.Entry<String, RssProtos.StorageInfo> entry : info.entrySet()) {
+      String key = entry.getKey();
+      RssProtos.StorageInfo val = entry.getValue();
+      StorageInfo storageInfo = new StorageInfo(
+          val.getMountPoint(),
+          StorageMedia.fromProto(val.getStorageMedia()),
+          val.getCapacity(),
+          val.getUsedBytes(),
+          StorageStatus.fromProto(val.getStatus()));
+      result.put(key, storageInfo);
+    }
+    return result;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/storage/StorageMedia.java 
b/common/src/main/java/org/apache/uniffle/common/storage/StorageMedia.java
new file mode 100644
index 00000000..85f550f2
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/storage/StorageMedia.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import org.apache.uniffle.proto.RssProtos.StorageInfo;
+
+public enum StorageMedia {
+  UNKNOWN(0),
+  HDD(1),
+  SSD(2),
+  HDFS(3),
+  OBJECT_STORE(4);
+
+  private final byte val;
+
+  StorageMedia(int code) {
+    assert (code >= -1 && code < 256);
+    this.val = (byte) code;
+  }
+
+  public StorageInfo.StorageMedia toProto() {
+    switch (this) {
+      case UNKNOWN:
+        return StorageInfo.StorageMedia.STORAGE_TYPE_UNKNOWN;
+      case HDD:
+        return StorageInfo.StorageMedia.HDD;
+      case SSD:
+        return StorageInfo.StorageMedia.SSD;
+      case HDFS:
+        return StorageInfo.StorageMedia.HDFS;
+      case OBJECT_STORE:
+        return StorageInfo.StorageMedia.OBJECT_STORE;
+      default:
+        return StorageInfo.StorageMedia.UNRECOGNIZED;
+    }
+  }
+
+  public static StorageMedia fromProto(StorageInfo.StorageMedia storageMedia) {
+    switch (storageMedia) {
+      case HDD:
+        return StorageMedia.HDD;
+      case SSD:
+        return StorageMedia.SSD;
+      case HDFS:
+        return StorageMedia.HDFS;
+      case OBJECT_STORE:
+        return StorageMedia.OBJECT_STORE;
+      default:
+        return StorageMedia.UNKNOWN;
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/storage/StorageStatus.java 
b/common/src/main/java/org/apache/uniffle/common/storage/StorageStatus.java
new file mode 100644
index 00000000..e69a768e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/storage/StorageStatus.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import org.apache.uniffle.proto.RssProtos.StorageInfo;
+
+public enum StorageStatus {
+  UNKNOWN(0),
+  NORMAL(1),
+  UNHEALTHY(2),
+  OVERUSED(3);
+
+  private final byte val;
+
+  StorageStatus(int code) {
+    assert (code >= -1 && code < 256);
+    this.val = (byte) code;
+  }
+
+  public final byte getCode() {
+    return val;
+  }
+
+  public StorageInfo.StorageStatus toProto() {
+    switch (this) {
+      case UNKNOWN:
+        return StorageInfo.StorageStatus.STORAGE_STATUS_UNKNOWN;
+      case NORMAL:
+        return StorageInfo.StorageStatus.NORMAL;
+      case UNHEALTHY:
+        return StorageInfo.StorageStatus.UNHEALTHY;
+      case OVERUSED:
+        return StorageInfo.StorageStatus.OVERUSED;
+      default:
+        return StorageInfo.StorageStatus.UNRECOGNIZED;
+    }
+  }
+
+  public static StorageStatus fromProto(StorageInfo.StorageStatus status) {
+    switch (status) {
+      case NORMAL:
+        return StorageStatus.NORMAL;
+      case UNHEALTHY:
+        return StorageStatus.UNHEALTHY;
+      case OVERUSED:
+        return StorageStatus.OVERUSED;
+      default:
+        return StorageStatus.UNKNOWN;
+    }
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/storage/StorageInfoUtilsTest.java
 
b/common/src/test/java/org/apache/uniffle/common/storage/StorageInfoUtilsTest.java
new file mode 100644
index 00000000..f905d8cc
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/storage/StorageInfoUtilsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.proto.RssProtos;
+
+import static org.apache.uniffle.common.storage.StorageInfoUtils.fromProto;
+import static org.apache.uniffle.common.storage.StorageInfoUtils.toProto;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class StorageInfoUtilsTest {
+  @Test
+  public void testFromProto() {
+    // empty map should return empty result
+    assertEquals(0, fromProto(Maps.newHashMap()).size());
+    RssProtos.StorageInfo info = RssProtos.StorageInfo.newBuilder()
+        .setMountPoint("/mnt")
+        .setStorageMedia(RssProtos.StorageInfo.StorageMedia.HDD)
+        .setCapacity(100)
+        .setUsedBytes(95)
+        .setStatus(RssProtos.StorageInfo.StorageStatus.NORMAL)
+        .build();
+    Map<String, RssProtos.StorageInfo> tmp = Maps.newHashMap();
+    tmp.put(info.getMountPoint(), info);
+    Map<String, StorageInfo> result = fromProto(tmp);
+    assertEquals(1, result.size());
+    assertNotNull(result.get(info.getMountPoint()));
+    StorageInfo storageInfo = result.get(info.getMountPoint());
+    StorageInfo expected = new StorageInfo(
+        "/mnt",
+        StorageMedia.HDD,
+        100,
+        95,
+        StorageStatus.NORMAL
+    );
+    assertEquals(expected, storageInfo);
+  }
+
+  @Test
+  public void testToProto() {
+    // empty input
+    assertEquals(0, toProto(Maps.newHashMap()).size());
+    StorageInfo info = new StorageInfo(
+        "/mnt",
+        StorageMedia.HDD,
+        100,
+        95,
+        StorageStatus.NORMAL
+    );
+    Map<String, StorageInfo> tmp = Maps.newHashMap();
+    tmp.put("/mnt", info);
+    Map<String, RssProtos.StorageInfo> result = toProto(tmp);
+    assertEquals(1, result.size());
+    assertEquals(info.toProto(), result.get("/mnt"));
+  }
+
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 6ed70798..d054337f 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -209,7 +209,11 @@ public class RssUtilsTest {
       Field field = cl.getDeclaredField("m");
       field.setAccessible(true);
       Map<String, String> writableEnv = (Map<String, String>) field.get(env);
-      writableEnv.put(key, value);
+      if (value != null) {
+        writableEnv.put(key, value);
+      } else {
+        writableEnv.remove(key);
+      }
     } catch (Exception e) {
       throw new IllegalStateException("Failed to set environment variable", e);
     }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 415f31e2..8e2b49d1 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
 import 
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
@@ -374,6 +375,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         request.getAvailableMemory(),
         request.getEventNumInFlush(),
         Sets.newHashSet(request.getTagsList()),
-        isHealthy);
+        isHealthy,
+        StorageInfoUtils.fromProto(request.getStorageInfoMap()));
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index 9f18c330..069e5c79 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -17,8 +17,12 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
+
+import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
 
 public class ServerNode implements Comparable<ServerNode> {
@@ -33,6 +37,7 @@ public class ServerNode implements Comparable<ServerNode> {
   private long timestamp;
   private Set<String> tags;
   private boolean isHealthy;
+  private Map<String, StorageInfo> storageInfo;
 
   public ServerNode(
       String id,
@@ -44,6 +49,21 @@ public class ServerNode implements Comparable<ServerNode> {
       int eventNumInFlush,
       Set<String> tags,
       boolean isHealthy) {
+    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags, isHealthy,
+        Maps.newHashMap());
+  }
+
+  public ServerNode(
+      String id,
+      String ip,
+      int port,
+      long usedMemory,
+      long preAllocatedMemory,
+      long availableMemory,
+      int eventNumInFlush,
+      Set<String> tags,
+      boolean isHealthy,
+      Map<String, StorageInfo> storageInfoMap) {
     this.id = id;
     this.ip = ip;
     this.port = port;
@@ -54,6 +74,7 @@ public class ServerNode implements Comparable<ServerNode> {
     this.timestamp = System.currentTimeMillis();
     this.tags = tags;
     this.isHealthy = isHealthy;
+    this.storageInfo = storageInfoMap;
   }
 
   public ShuffleServerId convertToGrpcProto() {
@@ -100,6 +121,10 @@ public class ServerNode implements Comparable<ServerNode> {
     return isHealthy;
   }
 
+  public Map<String, StorageInfo> getStorageInfo() {
+    return storageInfo;
+  }
+
   @Override
   public String toString() {
     return "ServerNode with id[" + id
@@ -111,7 +136,9 @@ public class ServerNode implements Comparable<ServerNode> {
         + "], eventNumInFlush[" + eventNumInFlush
         + "], timestamp[" + timestamp
         + "], tags" + tags.toString() + ""
-        + ", healthy[" + isHealthy + "]";
+        + ", healthy[" + isHealthy
+        + "], storages[num=" + storageInfo.size() + "]";
+
   }
 
   /**
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
index c70d58f1..9c6e7a38 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
@@ -19,12 +19,18 @@ package org.apache.uniffle.coordinator;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.common.storage.StorageMedia;
+import org.apache.uniffle.common.storage.StorageStatus;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ServerNodeTest {
@@ -44,4 +50,22 @@ public class ServerNodeTest {
     assertEquals("sn1", nodes.get(1).getId());
     assertEquals("sn3", nodes.get(2).getId());
   }
+
+  @Test
+  public void testStorageInfoOfServerNode() {
+    Set<String> tags = Sets.newHashSet("tag");
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags, 
true);
+    // default constructor creates ServerNode with zero size of LocalStorage
+    assertEquals(0, sn1.getStorageInfo().size());
+    Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
+    StorageInfo info = new StorageInfo(
+        "/mnt",
+        StorageMedia.SSD,
+        100L,
+        60L,
+        StorageStatus.NORMAL);
+    localStorageInfo.put("/mnt", info);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags, 
true, localStorageInfo);
+    assertEquals(1, sn2.getStorageInfo().size());
+  }
 }
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 357aa94d..0e0b08cd 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -87,6 +87,11 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 |rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency 
of single partition writer, the data partition file number is equal to this 
value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.|
 |rss.metrics.reporter.class|-|The class of metrics reporter.|
 
+### Advanced Configurations
+|Property Name|Default| Description                                            
                                                                                
                                                     |
+|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.server.storageMediaProvider.from.env.key|-| Sometimes, the local storage 
type/media info is provided by external system. RSS would read the env key 
defined by this configuration and get info about the storage media of its 
basePaths |
+
 
 ### PrometheusPushGatewayMetricReporter settings
 PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, 
which will allow user pushes metrics to a [Prometheus 
Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped 
by Prometheus.
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 4b20b40c..3bae1739 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -36,7 +36,11 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleRegisterInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.common.storage.StorageMedia;
+import org.apache.uniffle.common.storage.StorageStatus;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RssUtilsTest;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.ServerNode;
 import org.apache.uniffle.coordinator.SimpleClusterManager;
@@ -231,16 +235,31 @@ public class CoordinatorGrpcTest extends 
CoordinatorTestBase {
     List<ServerNode> nodes = 
scm.getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
     assertEquals(1, nodes.size());
     ServerNode node = nodes.get(0);
+    assertEquals(1, node.getStorageInfo().size());
+    StorageInfo infoHead = node.getStorageInfo().values().iterator().next();
+    assertEquals(StorageMedia.HDD, infoHead.getType());
+    assertEquals(StorageStatus.NORMAL, infoHead.getStatus());
     assertTrue(node.getTags().contains(Constants.SHUFFLE_SERVER_VERSION));
     
assertTrue(scm.getTagToNodes().get(Constants.SHUFFLE_SERVER_VERSION).contains(node));
     ShuffleServerConf shuffleServerConf = 
shuffleServers.get(0).getShuffleServerConf();
     shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 
2);
     shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
+    shuffleServerConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY, 
"RSS_ENV_KEY");
+    String baseDir = 
shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0);
+    String storageTypeJsonSource = String.format("{\"%s\": \"ssd\"}", baseDir);
+    RssUtilsTest.setEnv("RSS_ENV_KEY", storageTypeJsonSource);
+    // set this server's tag to ssd
+    shuffleServerConf.set(ShuffleServerConf.TAGS, Lists.newArrayList("SSD"));
     ShuffleServer ss = new ShuffleServer(shuffleServerConf);
     ss.start();
     shuffleServers.set(0, ss);
     Thread.sleep(3000);
     assertEquals(2, coordinators.get(0).getClusterManager().getNodesNum());
+    nodes = 
scm.getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION, "SSD"));
+    assertEquals(1, nodes.size());
+    ServerNode ssdNode = nodes.get(0);
+    infoHead = ssdNode.getStorageInfo().values().iterator().next();
+    assertEquals(StorageMedia.SSD, infoHead.getType());
     scm.close();
   }
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 4f85bdae..38f90dc1 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -53,6 +53,8 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.proto.CoordinatorServerGrpc;
 import 
org.apache.uniffle.proto.CoordinatorServerGrpc.CoordinatorServerBlockingStub;
 import org.apache.uniffle.proto.RssProtos;
@@ -115,7 +117,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       int eventNumInFlush,
       long timeout,
       Set<String> tags,
-      boolean isHealthy) {
+      boolean isHealthy,
+      Map<String, StorageInfo> storageInfo) {
     ShuffleServerId serverId =
         ShuffleServerId.newBuilder().setId(id).setIp(ip).setPort(port).build();
     ShuffleServerHeartBeatRequest request =
@@ -127,6 +130,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             .setEventNumInFlush(eventNumInFlush)
             .addAllTags(tags)
             .setIsHealthy(BoolValue.newBuilder().setValue(isHealthy).build())
+            .putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
             .build();
 
     StatusCode status;
@@ -190,7 +194,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
         request.getEventNumInFlush(),
         request.getTimeout(),
         request.getTags(),
-        request.isHealthy());
+        request.isHealthy(),
+        request.getStorageInfo());
 
     RssSendHeartBeatResponse response;
     StatusCode statusCode = rpcResponse.getStatus();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index 76a4033e..12a20f5e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -17,8 +17,12 @@
 
 package org.apache.uniffle.client.request;
 
+
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.uniffle.common.storage.StorageInfo;
+
 public class RssSendHeartBeatRequest {
 
   private final String shuffleServerId;
@@ -31,6 +35,7 @@ public class RssSendHeartBeatRequest {
   private final Set<String> tags;
   private final long timeout;
   private final boolean isHealthy;
+  private final Map<String, StorageInfo> storageInfo;
 
   public RssSendHeartBeatRequest(
       String shuffleServerId,
@@ -42,7 +47,8 @@ public class RssSendHeartBeatRequest {
       int eventNumInFlush,
       long timeout,
       Set<String> tags,
-      boolean isHealthy) {
+      boolean isHealthy,
+      Map<String, StorageInfo> storageInfo) {
     this.shuffleServerId = shuffleServerId;
     this.shuffleServerIp = shuffleServerIp;
     this.shuffleServerPort = shuffleServerPort;
@@ -53,6 +59,7 @@ public class RssSendHeartBeatRequest {
     this.tags = tags;
     this.timeout = timeout;
     this.isHealthy = isHealthy;
+    this.storageInfo = storageInfo;
   }
 
   public String getShuffleServerId() {
@@ -94,4 +101,8 @@ public class RssSendHeartBeatRequest {
   public boolean isHealthy() {
     return isHealthy;
   }
+
+  public Map<String, StorageInfo> getStorageInfo() {
+    return storageInfo;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index db3c6a07..3308cdcb 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -239,6 +239,7 @@ message ShuffleServerHeartBeatRequest {
   int32 eventNumInFlush = 5;
   repeated string tags = 6;
   google.protobuf.BoolValue isHealthy = 7;
+  map<string, StorageInfo> storageInfo = 21; // mount point to storage info 
mapping.
 }
 
 message ShuffleServerHeartBeatResponse {
@@ -271,6 +272,34 @@ enum StatusCode {
   // add more status
 }
 
+message StorageInfo {
+  enum StorageMedia {
+    STORAGE_TYPE_UNKNOWN = 0;
+    HDD = 1;
+    SSD = 2;
+    HDFS = 3;
+    OBJECT_STORE = 4;
+    // possible other types, such as cloud-ssd.
+  }
+
+  enum StorageStatus {
+    STORAGE_STATUS_UNKNOWN = 0;
+    NORMAL = 1;
+    UNHEALTHY = 2;
+    OVERUSED = 3; // indicate current disk/storage is overused.
+  }
+
+  string mountPoint = 1;
+  StorageMedia storageMedia = 2;
+  int64 capacity = 3;
+  int64 usedBytes = 4;
+  int64 writingSpeed1M = 5; // writing speed of last minute
+  int64 writingSpeed5M = 6; // writing speed of last 5 minutes
+  int64 writingSpeed1H = 7; // writing speed of last hour
+  int64 numOfWritingFailures = 8; // number of writing failures since start up.
+  StorageStatus status = 9;
+}
+
 service CoordinatorServer {
   // Get Shuffle Server list
   rpc getShuffleServerList(google.protobuf.Empty) returns 
(GetShuffleServerListResponse);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java 
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index f815a02e..42352560 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,6 +36,7 @@ import 
org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
+import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.util.ThreadUtils;
 
 public class RegisterHeartBeat {
@@ -80,7 +82,8 @@ public class RegisterHeartBeat {
             shuffleServer.getAvailableMemory(),
             shuffleServer.getEventNumInFlush(),
             shuffleServer.getTags(),
-            shuffleServer.isHealthy());
+            shuffleServer.isHealthy(),
+            shuffleServer.getStorageManager().getStorageInfo());
       } catch (Exception e) {
         LOG.warn("Error happened when send heart beat to coordinator");
       }
@@ -98,7 +101,8 @@ public class RegisterHeartBeat {
       long availableMemory,
       int eventNumInFlush,
       Set<String> tags,
-      boolean isHealthy) {
+      boolean isHealthy,
+      Map<String, StorageInfo> localStorageInfo) {
     boolean sendSuccessfully = false;
     RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
         id,
@@ -110,7 +114,8 @@ public class RegisterHeartBeat {
         eventNumInFlush,
         heartBeatTimeout,
         tags,
-        isHealthy);
+        isHealthy,
+        localStorageInfo);
     List<Future<RssSendHeartBeatResponse>> respFutures = coordinatorClients
         .stream()
         .map(client -> heartBeatExecutorService.submit(() -> 
client.sendHeartBeat(request)))
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 8b2d0561..b40b19db 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -336,6 +336,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("Threshold when flushing shuffle data to persistent 
storage, recommend value would be 256K, "
           + "512K, or even 1M");
 
+  public static final ConfigOption<String> STORAGE_MEDIA_PROVIDER_ENV_KEY = 
ConfigOptions
+      .key("rss.server.storageMediaProvider.from.env.key")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The env key to get json source of local storage media 
provider");
+
   public ShuffleServerConf() {
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 516391cd..3864f774 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -141,6 +142,12 @@ public class HdfsStorageManager extends 
SingleStorageManager {
   public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
   }
 
+  @Override
+  public Map<String, StorageInfo> getStorageInfo() {
+    // todo: report remote storage info
+    return Maps.newHashMap();
+  }
+
   public HdfsStorage getStorageByAppId(String appId) {
     if (!appIdToStorages.containsKey(appId)) {
       synchronized (this) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 091a32d9..e8801b7f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -36,6 +37,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -45,6 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.UnionKey;
+import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.common.storage.StorageMedia;
+import org.apache.uniffle.common.storage.StorageStatus;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.LocalStorageChecker;
@@ -57,6 +62,7 @@ import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.common.StorageMediaProvider;
 import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
 import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
@@ -74,6 +80,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
   private final LocalStorageChecker checker;
 
   private final Map<String, LocalStorage> partitionsOfStorage;
+  private final List<StorageMediaProvider> typeProviders = 
Lists.newArrayList();
 
   @VisibleForTesting
   LocalStorageManager(ShuffleServerConf conf) {
@@ -94,6 +101,11 @@ public class LocalStorageManager extends 
SingleStorageManager {
     // We must make sure the order of `storageBasePaths` and `localStorages` 
is same, or some unit test may be fail
     CountDownLatch countDownLatch = new 
CountDownLatch(storageBasePaths.size());
     AtomicInteger successCount = new AtomicInteger();
+    ServiceLoader<StorageMediaProvider> loader = 
ServiceLoader.load(StorageMediaProvider.class);
+    for (StorageMediaProvider provider : loader) {
+      provider.init(conf);
+      typeProviders.add(provider);
+    }
     ExecutorService executorService = Executors.newCachedThreadPool();
     LocalStorage[] localStorageArray = new 
LocalStorage[storageBasePaths.size()];
     for (int i = 0; i < storageBasePaths.size(); i++) {
@@ -101,12 +113,14 @@ public class LocalStorageManager extends 
SingleStorageManager {
       String storagePath = storageBasePaths.get(i);
       executorService.submit(() -> {
         try {
+          StorageMedia storageType = getStorageTypeForBasePath(storagePath);
           localStorageArray[idx] = LocalStorage.newBuilder()
               .basePath(storagePath)
               .capacity(capacity)
               .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
               .highWaterMarkOfWrite(highWaterMarkOfWrite)
               .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
+              .localStorageMedia(storageType)
               .build();
           successCount.incrementAndGet();
         } catch (Exception e) {
@@ -141,6 +155,16 @@ public class LocalStorageManager extends 
SingleStorageManager {
     this.checker = new LocalStorageChecker(conf, localStorages);
   }
 
+  private StorageMedia getStorageTypeForBasePath(String basePath) {
+    for (StorageMediaProvider provider : this.typeProviders) {
+      StorageMedia result = provider.getStorageMediaFor(basePath);
+      if (result != StorageMedia.UNKNOWN) {
+        return result;
+      }
+    }
+    return StorageMedia.UNKNOWN;
+  }
+
   @Override
   public Storage selectStorage(ShuffleDataFlushEvent event) {
     String appId = event.getAppId();
@@ -305,6 +329,34 @@ public class LocalStorageManager extends 
SingleStorageManager {
     }
   }
 
+  @Override
+  public Map<String, StorageInfo> getStorageInfo() {
+    Map<String, StorageInfo> result = Maps.newHashMap();
+    for (LocalStorage storage : localStorages) {
+      String mountPoint = storage.getMountPoint();
+      long capacity = storage.getCapacity();
+      long wroteBytes = storage.getDiskSize();
+      StorageStatus status = StorageStatus.NORMAL;
+      if (storage.isCorrupted()) {
+        status = StorageStatus.UNHEALTHY;
+      } else if (!storage.canWrite()) {
+        status = StorageStatus.OVERUSED;
+      }
+      StorageMedia media = storage.getStorageMedia();
+      if (media == null) {
+        media = StorageMedia.UNKNOWN;
+      }
+      StorageInfo info = new StorageInfo(
+          mountPoint,
+          media,
+          capacity,
+          wroteBytes,
+          status);
+      result.put(mountPoint, info);
+    }
+    return result;
+  }
+
   public List<LocalStorage> getStorages() {
     return localStorages;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 53039279..ed01de8f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.storage;
 
 import java.lang.reflect.Constructor;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.cache.Cache;
@@ -27,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
@@ -144,6 +146,13 @@ public class MultiStorageManager implements StorageManager 
{
     warmStorageManager.checkAndClearLeakedShuffleData(appIds);
   }
 
+  @Override
+  public Map<String, StorageInfo> getStorageInfo() {
+    Map<String, StorageInfo> localStorageInfo = 
warmStorageManager.getStorageInfo();
+    localStorageInfo.putAll(coldStorageManager.getStorageInfo());
+    return localStorageInfo;
+  }
+
   public void removeResources(PurgeEvent event) {
     LOG.info("Start to remove resource of {}", event);
     warmStorageManager.removeResources(event);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 2a487535..3a889089 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -18,8 +18,10 @@
 package org.apache.uniffle.server.storage;
 
 import java.util.Collection;
+import java.util.Map;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
@@ -54,4 +56,12 @@ public interface StorageManager {
   // todo: add an interface that check storage isHealthy
 
   void checkAndClearLeakedShuffleData(Collection<String> appIds);
+
+  /**
+   * Report a map of storage mount point -> storage info mapping. For local 
storages, the mount point will be the device
+   * name that the base dir belongs to. For remote storage, the mount point 
would be the base dir with its protocol scheme,
+   * such as hdfs://path/to/some/base/dir.
+   * @return a map of storage mount point -> storage info.
+   */
+  Map<String, StorageInfo> getStorageInfo();
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/StorageMediaFromEnvProvider.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageMediaFromEnvProvider.java
new file mode 100644
index 00000000..758a68b3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageMediaFromEnvProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.io.File;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.storage.StorageMedia;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.common.StorageMediaProvider;
+
+public class StorageMediaFromEnvProvider implements StorageMediaProvider {
+  private static Logger logger = 
LoggerFactory.getLogger(StorageMediaFromEnvProvider.class);
+  private Map<String, StorageMedia> localStorageTypes = Maps.newHashMap();
+
+  /**
+   * Returns current storage type for baseDir.
+   *
+   * @param baseDir the storage dir to check for
+   * @return the StorageMedia for baseDir if it can be determined; 
StorageMedia.UNKNOWN otherwise.
+   */
+  @Override
+  public StorageMedia getStorageMediaFor(String baseDir) {
+    File f = new File(baseDir);
+    while (f != null) {
+      if (localStorageTypes.containsKey(f.getPath())) {
+        return localStorageTypes.get(f.getPath());
+      }
+      f = f.getParentFile();
+    }
+    return StorageMedia.UNKNOWN;
+  }
+
+  @Override
+  public void init(RssConf conf) {
+    String envKey = conf.get(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY);
+    String jsonSource = "{}";
+    if (envKey != null && System.getenv(envKey) != null) {
+      jsonSource = System.getenv(envKey);
+    }
+    ObjectMapper om = new ObjectMapper();
+    try {
+      Map<String, String> mappings = om.readValue(jsonSource, Map.class);
+      localStorageTypes.clear();
+      for (Map.Entry<String, String> entry : mappings.entrySet()) {
+        String basePath = entry.getKey();
+        String storageType = entry.getValue();
+        try {
+          StorageMedia type = 
StorageMedia.valueOf(storageType.trim().toUpperCase());
+          localStorageTypes.put(basePath, type);
+        } catch (IllegalArgumentException i) {
+          logger.warn("cannot get storage type from {}, ignoring", 
storageType);
+        }
+      }
+    } catch (JsonProcessingException e) {
+      logger.warn("parse json from env failed with exception", e);
+    }
+  }
+}
diff --git 
a/server/src/main/resources/META-INF/services/org.apache.uniffle.storage.common.StorageMediaProvider
 
b/server/src/main/resources/META-INF/services/org.apache.uniffle.storage.common.StorageMediaProvider
new file mode 100644
index 00000000..333ef634
--- /dev/null
+++ 
b/server/src/main/resources/META-INF/services/org.apache.uniffle.storage.common.StorageMediaProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.uniffle.server.storage.StorageMediaFromEnvProvider
+org.apache.uniffle.storage.common.DefaultStorageMediaProvider
diff --git 
a/server/src/test/java/org/apache/uniffle/server/LocalSingleStorageTypeFromEnvProviderTest.java
 
b/server/src/test/java/org/apache/uniffle/server/LocalSingleStorageTypeFromEnvProviderTest.java
new file mode 100644
index 00000000..f12ad45b
--- /dev/null
+++ 
b/server/src/test/java/org/apache/uniffle/server/LocalSingleStorageTypeFromEnvProviderTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.storage.StorageMedia;
+import org.apache.uniffle.common.util.RssUtilsTest;
+import org.apache.uniffle.server.storage.StorageMediaFromEnvProvider;
+import org.apache.uniffle.storage.common.StorageMediaProvider;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LocalSingleStorageTypeFromEnvProviderTest {
+  private static String STORAGE_TYPE_ENV_KEY = "RSS_LOCAL_STORAGE_TYPES";
+  private static String defaultStorageTypeSource;
+
+  private ShuffleServerConf rssConf;
+  private StorageMediaProvider provider;
+
+  @BeforeAll
+  static void setup() {
+    defaultStorageTypeSource = System.getenv(STORAGE_TYPE_ENV_KEY);
+  }
+
+  @AfterAll
+  static void reset() {
+    RssUtilsTest.setEnv(STORAGE_TYPE_ENV_KEY, defaultStorageTypeSource);
+  }
+
+  @BeforeEach
+  void setupRssServerConfig() {
+    rssConf = new ShuffleServerConf();
+    rssConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY, 
STORAGE_TYPE_ENV_KEY);
+    provider = new StorageMediaFromEnvProvider();
+  }
+
+  @Test
+  public void testJsonSourceParse() {
+    String emptyJsonSource = "";
+    RssUtilsTest.setEnv(STORAGE_TYPE_ENV_KEY, emptyJsonSource);
+    // invalid json source should not throw exceptions
+    provider.init(rssConf);
+    assertEquals(StorageMedia.UNKNOWN, provider.getStorageMediaFor("/data01"));
+    emptyJsonSource = "{}";
+    RssUtilsTest.setEnv(STORAGE_TYPE_ENV_KEY, emptyJsonSource);
+    provider.init(rssConf);
+    assertEquals(StorageMedia.UNKNOWN, provider.getStorageMediaFor("/data01"));
+
+    String storageTypeJson = "{\"/data01\": \"SSD\"}";
+    RssUtilsTest.setEnv(STORAGE_TYPE_ENV_KEY, storageTypeJson);
+    provider.init(rssConf);
+    assertEquals(StorageMedia.SSD, provider.getStorageMediaFor("/data01"));
+  }
+
+  @Test
+  public void testMultipleMountPoints() {
+    Map<String, String> storageTypes = Maps.newHashMap();
+    storageTypes.put("/data01", "ssd");
+    storageTypes.put("/data02", "hdd");
+    storageTypes.put("/data03", "SSD");
+    ObjectMapper om = new ObjectMapper();
+    String jsonSource;
+    try {
+      jsonSource = om.writeValueAsString(storageTypes);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    RssUtilsTest.setEnv(STORAGE_TYPE_ENV_KEY, jsonSource);
+
+    provider.init(rssConf);
+    assertEquals(StorageMedia.HDD, provider.getStorageMediaFor("/data02"));
+    assertEquals(StorageMedia.SSD, provider.getStorageMediaFor("/data01"));
+    assertEquals(StorageMedia.SSD, provider.getStorageMediaFor("/data03"));
+    assertEquals(StorageMedia.UNKNOWN, provider.getStorageMediaFor("/data0"));
+    assertEquals(StorageMedia.UNKNOWN, 
provider.getStorageMediaFor("/Path/not/existed"));
+    assertEquals(StorageMedia.HDD, 
provider.getStorageMediaFor("/data02/abc/1234"));
+    assertEquals(StorageMedia.SSD, 
provider.getStorageMediaFor("/data01/spark_shuffle_data/111"));
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index be42a215..b7a79760 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -17,16 +17,23 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.common.storage.StorageMedia;
+import org.apache.uniffle.common.storage.StorageStatus;
+import org.apache.uniffle.common.util.RssUtilsTest;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -82,7 +89,7 @@ public class LocalStorageManagerTest {
     ShuffleServerConf conf = new ShuffleServerConf();
     conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(storagePaths));
     conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
-    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
org.apache.uniffle.storage.util.StorageType.LOCALFILE.name());
 
     LocalStorageManager localStorageManager = new LocalStorageManager(conf);
 
@@ -104,7 +111,7 @@ public class LocalStorageManagerTest {
     ShuffleServerConf conf = new ShuffleServerConf();
     conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(storagePaths));
     conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
-    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
org.apache.uniffle.storage.util.StorageType.LOCALFILE.name());
     LocalStorageManager localStorageManager = new LocalStorageManager(conf);
 
     List<LocalStorage> storages = localStorageManager.getStorages();
@@ -158,7 +165,7 @@ public class LocalStorageManagerTest {
     ShuffleServerConf conf = new ShuffleServerConf();
     conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(storagePaths));
     conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
-    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
org.apache.uniffle.storage.util.StorageType.LOCALFILE.name());
     LocalStorageManager localStorageManager = new LocalStorageManager(conf);
 
     List<LocalStorage> storages = localStorageManager.getStorages();
@@ -216,4 +223,49 @@ public class LocalStorageManagerTest {
       // ignore
     }
   }
+
+  @Test
+  public void testGetLocalStorageInfo() {
+    String[] storagePaths = {"/tmp/rss-data1", "/tmp/rss-data2", 
"/tmp/rss-data3"};
+
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(storagePaths));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
org.apache.uniffle.storage.util.StorageType.LOCALFILE.name());
+    LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+    Map<String, StorageInfo> storageInfo = 
localStorageManager.getStorageInfo();
+    assertEquals(1, storageInfo.size());
+    try {
+      String mountPoint = Files.getFileStore(new File("/tmp").toPath()).name();
+      assertNotNull(storageInfo.get(mountPoint));
+      // by default, it should report HDD as local storage type
+      assertEquals(StorageMedia.HDD, storageInfo.get(mountPoint).getType());
+      assertEquals(StorageStatus.NORMAL, 
storageInfo.get(mountPoint).getStatus());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testEnvStorageTypeProvider() {
+    String[] storagePaths = {"/tmp/rss-data1"};
+
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(storagePaths));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
org.apache.uniffle.storage.util.StorageType.LOCALFILE.name());
+    conf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY, "env_key");
+    RssUtilsTest.setEnv("env_key", "{\"/tmp\": \"ssd\"}");
+    LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+    Map<String, StorageInfo> storageInfo = 
localStorageManager.getStorageInfo();
+    assertEquals(1, storageInfo.size());
+    try {
+      String mountPoint = Files.getFileStore(new File("/tmp").toPath()).name();
+      assertNotNull(storageInfo.get(mountPoint));
+      // the env provider maps /tmp to SSD, so SSD should be reported here
+      assertEquals(StorageMedia.SSD, storageInfo.get(mountPoint).getType());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/DefaultStorageMediaProvider.java
 
b/storage/src/main/java/org/apache/uniffle/storage/common/DefaultStorageMediaProvider.java
new file mode 100644
index 00000000..92b16742
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/common/DefaultStorageMediaProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.directory.api.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.storage.StorageMedia;
+
+public class DefaultStorageMediaProvider implements StorageMediaProvider {
+  private static final Logger logger = 
LoggerFactory.getLogger(DefaultStorageMediaProvider.class);
+  private static final String NUMBERIC_STRING = "0123456789";
+  private static final String BLOCK_PATH_FORMAT = 
"/sys/block/%s/queue/rotational";
+  private static final String HDFS = "hdfs";
+  private static final List<String> OBJECT_STORE_SCHEMAS = Arrays.asList("s3", 
"oss", "cos", "gcs", "obs");
+
+  @Override
+  public StorageMedia getStorageMediaFor(String baseDir) {
+    try {
+      URI uri = new URI(baseDir);
+      String scheme = uri.getScheme();
+      if (Strings.equals(scheme, HDFS)) {
+        return StorageMedia.HDFS;
+      } else if (scheme != null && 
OBJECT_STORE_SCHEMAS.contains(scheme.toLowerCase())) {
+        return StorageMedia.OBJECT_STORE;
+      }
+    } catch (URISyntaxException e) {
+      logger.warn("invalid uri input from " +  baseDir + ", with exception:", 
e);
+    }
+    // if baseDir starts with HDFS, the hdfs storage type should be reported
+    if (SystemUtils.IS_OS_LINUX) {
+      // according to https://unix.stackexchange.com/a/65602, we can detect 
disk types by looking at the
+      // `/sys/block/sdx/queue/rotational`.
+      try {
+        File baseFile = new File(baseDir);
+        FileStore store = Files.getFileStore(baseFile.toPath());
+        String mountPoint = store.name(); // mountPoint would be /dev/sda1 or 
/dev/vda1, etc.
+        String deviceName = 
mountPoint.substring(mountPoint.lastIndexOf(File.separator));
+        deviceName = StringUtils.stripEnd(deviceName, NUMBERIC_STRING);
+        File blockFile = new File(String.format(BLOCK_PATH_FORMAT, 
deviceName));
+        if (blockFile.exists()) {
+          List<String> contents = Files.readAllLines(blockFile.toPath());
+          // this should always hold true
+          if (contents.size() >= 1) {
+            String rotational = contents.get(0);
+            if (rotational.equals("0")) {
+              return StorageMedia.SSD;
+            } else if (rotational.equals("1")) {
+              return StorageMedia.HDD;
+            }
+          }
+        }
+      } catch (IOException ioe) {
+        logger.warn("Get storage type failed with exception", ioe);
+      }
+    }
+    logger.info("Default storage type provider returns HDD by default");
+    return StorageMedia.HDD;
+  }
+}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index f9f63a3d..e1f9b1cd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -19,6 +19,8 @@ package org.apache.uniffle.storage.common;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
@@ -32,6 +34,7 @@ import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -47,6 +50,7 @@ public class LocalStorage extends AbstractStorage {
 
   private long capacity;
   private final String basePath;
+  private final String mountPoint;
   private final double cleanupThreshold;
   private final long cleanIntervalMs;
   private final double highWaterMarkOfWrite;
@@ -55,6 +59,7 @@ public class LocalStorage extends AbstractStorage {
   private final Queue<String> expiredShuffleKeys = 
Queues.newLinkedBlockingQueue();
 
   private LocalStorageMeta metaData = new LocalStorageMeta();
+  private final StorageMedia media;
   private boolean isSpaceEnough = true;
   private volatile boolean isCorrupted = false;
 
@@ -67,6 +72,7 @@ public class LocalStorage extends AbstractStorage {
     this.lowWaterMarkOfWrite = builder.lowWaterMarkOfWrite;
     this.capacity = builder.capacity;
     this.shuffleExpiredTimeoutMs = builder.shuffleExpiredTimeoutMs;
+    this.media = builder.media;
 
     File baseFolder = new File(basePath);
     try {
@@ -74,6 +80,8 @@ public class LocalStorage extends AbstractStorage {
       if (!baseFolder.mkdirs()) {
         throw new IOException("Failed to create base folder: " + basePath);
       }
+      FileStore store = Files.getFileStore(baseFolder.toPath());
+      this.mountPoint =  store.name();
     } catch (IOException ioe) {
       LOG.warn("Init base directory " + basePath + " fail, the disk should be 
corrupted", ioe);
       throw new RuntimeException(ioe);
@@ -221,6 +229,14 @@ public class LocalStorage extends AbstractStorage {
     return capacity;
   }
 
  /**
   * Returns the name of the {@link java.nio.file.FileStore} backing this
   * storage's base path (typically the device, e.g. "/dev/sda1" -- see the
   * constructor where it is captured via Files.getFileStore).
   */
  public String getMountPoint() {
    return mountPoint;
  }
+
  /**
   * Returns the storage media supplied via the builder; may be null when no
   * media was configured.
   */
  public StorageMedia getStorageMedia() {
    return media;
  }
+
   public double getHighWaterMarkOfWrite() {
     return highWaterMarkOfWrite;
   }
@@ -326,6 +342,7 @@ public class LocalStorage extends AbstractStorage {
     private String basePath;
     private long cleanIntervalMs;
     private long shuffleExpiredTimeoutMs;
+    private StorageMedia media;
 
     private Builder() {
     }
@@ -365,6 +382,11 @@ public class LocalStorage extends AbstractStorage {
       return this;
     }
 
    /**
     * Sets the storage media of this local storage (optional; null when unset).
     *
     * @param media the detected media type for the storage's base path
     * @return this builder for chaining
     */
    public Builder localStorageMedia(StorageMedia media) {
      this.media = media;
      return this;
    }
+
     public LocalStorage build() {
       return new LocalStorage(this);
     }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/StorageMediaProvider.java
 
b/storage/src/main/java/org/apache/uniffle/storage/common/StorageMediaProvider.java
new file mode 100644
index 00000000..dc85bc5f
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/common/StorageMediaProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.common;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.storage.StorageMedia;
+
public interface StorageMediaProvider {

  /**
   * Detects the storage media backing the given directory or URI.
   *
   * @param baseDir the storage dir (local path or remote URI) to check
   * @return the detected {@link StorageMedia}; implementations are expected to
   *         return a sensible fallback value when the media cannot be
   *         determined rather than throwing
   */
  StorageMedia getStorageMediaFor(String baseDir);

  /**
   * Initializes the provider from configuration. The default implementation is
   * a no-op for providers that need no configuration.
   *
   * @param config RSS configurations.
   */
  default void init(RssConf config) {
  }
}
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/common/DefaultStorageMediaProviderTest.java
 
b/storage/src/test/java/org/apache/uniffle/storage/common/DefaultStorageMediaProviderTest.java
new file mode 100644
index 00000000..d105d598
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/common/DefaultStorageMediaProviderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.common;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.storage.StorageMedia;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DefaultStorageMediaProviderTest {
+  @Test
+  public void testStorageProvider() {
+    StorageMediaProvider provider = new DefaultStorageMediaProvider();
+    // hdfs file should report hdfs type
+    assertEquals(StorageMedia.HDFS, 
provider.getStorageMediaFor("hdfs://nn1/path/to/base"));
+    // object store files
+    assertEquals(StorageMedia.OBJECT_STORE, 
provider.getStorageMediaFor("s3://bucket-name/a/path"));
+    assertEquals(StorageMedia.OBJECT_STORE, 
provider.getStorageMediaFor("cos://bucket-name/b/path"));
+
+    // by default, the local file should report as HDD
+    assertEquals(StorageMedia.HDD, 
provider.getStorageMediaFor("/path/to/base/dir"));
+    assertEquals(StorageMedia.HDD, 
provider.getStorageMediaFor("file:///path/to/a/dir"));
+
+    // invalid uri should also be reported as HDD
+    assertEquals(StorageMedia.HDD, 
provider.getStorageMediaFor("file@xx:///path/to/a"));
+  }
+}
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java 
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 835d5e9d..49082c6c 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.common;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.List;
 
 import com.google.common.collect.Lists;
@@ -28,23 +29,31 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.RoaringBitmap;
 
+import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 public class LocalStorageTest {
 
   private static File testBaseDir;
+  private static String mountPoint;
 
   @BeforeAll
   public static void setUp(@TempDir File tempDir) throws IOException  {
     testBaseDir = new File(tempDir, "test");
     testBaseDir.mkdir();
+    try {
+      mountPoint = Files.getFileStore(testBaseDir.toPath()).name();
+    } catch (IOException ioe) {
+      // pass
+    }
   }
 
   @AfterAll
@@ -164,6 +173,31 @@ public class LocalStorageTest {
     assertEquals(2, item.getSortedShuffleKeys(false, 3).size());
   }
 
+  @Test
+  public void diskStorageInfoTest() {
+    LocalStorage item = LocalStorage.newBuilder()
+        .basePath(testBaseDir.getAbsolutePath())
+        .cleanupThreshold(50)
+        .highWaterMarkOfWrite(95)
+        .lowWaterMarkOfWrite(80)
+        .capacity(100)
+        .cleanIntervalMs(5000)
+        .build();
+    assertEquals(mountPoint, item.getMountPoint());
+    assertNull(item.getStorageMedia());
+
+    LocalStorage itemWithStorageType = LocalStorage.newBuilder()
+        .basePath(testBaseDir.getAbsolutePath())
+        .cleanupThreshold(50)
+        .highWaterMarkOfWrite(95)
+        .lowWaterMarkOfWrite(80)
+        .capacity(100)
+        .cleanIntervalMs(5000)
+        .localStorageMedia(StorageMedia.SSD)
+        .build();
+    assertEquals(StorageMedia.SSD, itemWithStorageType.getStorageMedia());
+  }
+
   @Test
   public void writeHandlerTest() {
     LocalStorage item = 
LocalStorage.newBuilder().basePath(testBaseDir.getAbsolutePath()).build();

Reply via email to