This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 783a7cc4f [CELEBORN-2236] Avoiding regular expressions for 
DiskFileInfo storage type determination
783a7cc4f is described below

commit 783a7cc4fcedd57d62f6c876057700319163c7ee
Author: xxx <[email protected]>
AuthorDate: Fri Jan 2 20:03:23 2026 +0800

    [CELEBORN-2236] Avoiding regular expressions for DiskFileInfo storage type 
determination
    
    ### What changes were proposed in this pull request?
    
    Avoiding regular expressions for DiskFileInfo storage type determination.
    
    ### Why are the changes needed?
    
    When sending a heartbeat, the Worker iterates over all FileInfo objects and 
runs a regex match on each file path to check whether the file is an HDFS file. 
With a large number of files, this regex matching slows down heartbeat processing.
    
    <img width="2766" height="1456" alt="image" 
src="https://github.com/user-attachments/assets/2dc075f6-562d-4467-a75c-4b6682ed866d"
 />
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3562 from xy2953396112/CELEBORN-2236.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/common/meta/DiskFileInfo.java  |  17 ++-
 common/src/main/proto/TransportMessages.proto      |   1 +
 .../apache/celeborn/common/util/PbSerDeUtils.scala |  14 +++
 .../celeborn/common/util/PbSerDeUtilsTest.scala    | 124 ++++++++++++++++++++-
 4 files changed, 149 insertions(+), 7 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index ab798eca3..d4571fa4b 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -57,10 +57,15 @@ public class DiskFileInfo extends FileInfo {
       boolean partitionSplitEnabled,
       FileMeta fileMeta,
       String filePath,
+      StorageInfo.Type storageType,
       long bytesFlushed) {
     super(userIdentifier, partitionSplitEnabled, fileMeta);
     this.filePath = filePath;
-    this.storageType = StorageInfo.Type.HDD;
+    if (storageType != null) {
+      this.storageType = storageType;
+    } else {
+      this.storageType = StorageInfo.Type.HDD;
+    }
     this.bytesFlushed = bytesFlushed;
   }
 
@@ -150,19 +155,21 @@ public class DiskFileInfo extends FileInfo {
   }
 
   public boolean isHdfs() {
-    return Utils.isHdfsPath(filePath);
+    return storageType == StorageInfo.Type.HDFS;
   }
 
   public boolean isS3() {
-    return Utils.isS3Path(filePath);
+    return storageType == StorageInfo.Type.S3;
   }
 
   public boolean isOSS() {
-    return Utils.isOssPath(filePath);
+    return storageType == StorageInfo.Type.OSS;
   }
 
   public boolean isDFS() {
-    return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || 
Utils.isHdfsPath(filePath);
+    return storageType == StorageInfo.Type.HDFS
+        || storageType == StorageInfo.Type.S3
+        || storageType == StorageInfo.Type.OSS;
   }
 
   public StorageInfo.Type getStorageType() {
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 160c7e962..a813a9e50 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -661,6 +661,7 @@ message PbFileInfo {
   bool isSegmentGranularityVisible = 9;
   map<int32, int32> partitionWritingSegment = 10;
   repeated PbSegmentIndex segmentIndex = 11;
+  int32 storageType = 12;
 }
 
 message PbSegmentIndex {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 1e96ece3e..e9c407ce8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -114,11 +114,24 @@ object PbSerDeUtils {
         fileMeta.setIsWriterClosed(true)
         fileMeta
     }
+    var storageType: StorageInfo.Type = null
+    if (pbFileInfo.getStorageType == 0) {
+      if (Utils.isHdfsPath(pbFileInfo.getFilePath)) {
+        storageType = StorageInfo.Type.HDFS
+      } else if (Utils.isOssPath(pbFileInfo.getFilePath)) {
+        storageType = StorageInfo.Type.OSS
+      } else if (Utils.isS3Path(pbFileInfo.getFilePath)) {
+        storageType = StorageInfo.Type.S3
+      }
+    } else {
+      storageType = StorageInfo.typesMap.get(pbFileInfo.getStorageType)
+    }
     new DiskFileInfo(
       userIdentifier,
       pbFileInfo.getPartitionSplitEnabled,
       meta,
       pbFileInfo.getFilePath,
+      storageType,
       pbFileInfo.getBytesFlushed)
   }
 
@@ -141,6 +154,7 @@ object PbSerDeUtils {
       .setUserIdentifier(toPbUserIdentifier(fileInfo.getUserIdentifier))
       .setBytesFlushed(fileInfo.getFileLength)
       .setPartitionSplitEnabled(fileInfo.isPartitionSplitEnabled)
+      .setStorageType(fileInfo.getStorageType.getValue)
     if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
       val mapFileMeta = fileInfo.getFileMeta.asInstanceOf[MapFileMeta]
       builder.setPartitionType(PartitionType.MAP.getValue)
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index fa2ebbfbb..5b8fe9979 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -31,12 +31,12 @@ import 
org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.meta._
-import org.apache.celeborn.common.protocol.{PartitionLocation, 
PbPackedWorkerResource, PbWorkerResource, StorageInfo}
+import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType, 
PbFileInfo, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
 import org.apache.celeborn.common.protocol.PartitionLocation.Mode
 import org.apache.celeborn.common.protocol.message.{ControlMessages, 
StatusCode}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
 WorkerResource}
 import org.apache.celeborn.common.quota.ResourceConsumption
-import 
org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair,
 toPbPackedPartitionLocationsPair}
+import 
org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair,
 toPbPackedPartitionLocationsPair, toPbUserIdentifier}
 import org.apache.celeborn.common.write.LocationPushFailedBatches
 
 class PbSerDeUtilsTest extends CelebornFunSuite {
@@ -51,6 +51,10 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
 
   val file1 = new File("/mnt/disk/1")
   val file2 = new File("/mnt/disk/2")
+  val file3 = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
+  val file4 = "oss://xxxx/xx-xx/x-x-x"
+  val file5 = "s3a://xxxx/xx-xx/x-x-x"
+  val file6 = "s3://xxxx/xx-xx/x-x-x"
   val files = List(file1, file2)
 
   val device = new DeviceInfo("device-a")
@@ -65,33 +69,68 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
 
   val userIdentifier1 = UserIdentifier("tenant-a", "user-a")
   val userIdentifier2 = UserIdentifier("tenant-b", "user-b")
+  val userIdentifier3 = UserIdentifier("tenant-c", "user-c")
 
   val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
   val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)
+  val chunkOffsets3 = util.Arrays.asList[java.lang.Long](3000L, 8000L, 12000L)
 
   val fileInfo1 = new DiskFileInfo(
     userIdentifier1,
     true,
     new ReduceFileMeta(chunkOffsets1, 123),
     file1.getAbsolutePath,
+    StorageInfo.Type.HDD,
     3000L)
   val fileInfo2 = new DiskFileInfo(
     userIdentifier2,
     true,
     new ReduceFileMeta(chunkOffsets2, 123),
     file2.getAbsolutePath,
+    StorageInfo.Type.SSD,
     6000L)
+  val fileInfo3 = new DiskFileInfo(
+    userIdentifier3,
+    true,
+    new ReduceFileMeta(chunkOffsets3, 123),
+    file3,
+    StorageInfo.Type.HDFS,
+    6000L)
+  val fileInfo4 = new DiskFileInfo(
+    userIdentifier3,
+    true,
+    new ReduceFileMeta(chunkOffsets3, 123),
+    file4,
+    StorageInfo.Type.OSS,
+    6000L)
+  val fileInfo5 = new DiskFileInfo(
+    userIdentifier3,
+    true,
+    new ReduceFileMeta(chunkOffsets3, 123),
+    file5,
+    StorageInfo.Type.S3,
+    6000L)
+  val fileInfo6 = new DiskFileInfo(
+    userIdentifier3,
+    true,
+    new ReduceFileMeta(chunkOffsets3, 123),
+    file6,
+    StorageInfo.Type.S3,
+    6000L)
+
   val mapFileInfo1 = new DiskFileInfo(
     userIdentifier1,
     true,
     new MapFileMeta(1024, 10),
     file1.getAbsolutePath,
+    StorageInfo.Type.HDD,
     6000L)
   val mapFileInfo2 = new DiskFileInfo(
     userIdentifier2,
     true,
     new MapFileMeta(1024, 10),
     file2.getAbsolutePath,
+    StorageInfo.Type.SSD,
     6000L)
   val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]()
   mapFileInfo1.setMountPoint("/mnt")
@@ -99,8 +138,14 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
 
   fileInfoMap.put("file1", fileInfo1)
   fileInfoMap.put("file2", fileInfo2)
+  fileInfoMap.put("file3", fileInfo3)
+  fileInfoMap.put("file4", fileInfo4)
+  fileInfoMap.put("file5", fileInfo5)
+  fileInfoMap.put("file6", fileInfo6)
+
   fileInfoMap.put("mapFile1", mapFileInfo1)
   fileInfoMap.put("mapFile2", mapFileInfo2)
+
   val mountPoints = new util.HashSet[String]
   mountPoints.add("/mnt")
   val cache = JavaUtils.newConcurrentHashMap[String, UserIdentifier]()
@@ -289,6 +334,81 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     assert(restoredFileInfo2.getMountPoint.equals(mapFileInfo2.getMountPoint))
   }
 
+  test("fromAndToPBFileInfoStorageType") {
+    val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
+    val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, 
cache, mountPoints)
+    val restoredFileInfo1 = restoredFileInfoMap.get("file1")
+    val restoredFileInfo2 = restoredFileInfoMap.get("file2")
+    val restoredFileInfo3 = restoredFileInfoMap.get("file3")
+    val restoredFileInfo4 = restoredFileInfoMap.get("mapFile1")
+    val restoredFileInfo5 = restoredFileInfoMap.get("mapFile2")
+    val restoredFileInfo6 = restoredFileInfoMap.get("file4")
+    val restoredFileInfo7 = restoredFileInfoMap.get("file5")
+    val restoredFileInfo8 = restoredFileInfoMap.get("file6")
+
+    assert(restoredFileInfo1.getStorageType.equals(fileInfo1.getStorageType))
+    assert(restoredFileInfo1.getStorageType.equals(StorageInfo.Type.HDD))
+    assert(restoredFileInfo2.getStorageType.equals(fileInfo2.getStorageType))
+    assert(restoredFileInfo2.getStorageType.equals(StorageInfo.Type.SSD))
+    assert(restoredFileInfo3.getStorageType.equals(fileInfo3.getStorageType))
+    assert(restoredFileInfo3.getStorageType.equals(StorageInfo.Type.HDFS))
+
+    
assert(restoredFileInfo4.getStorageType.equals(mapFileInfo1.getStorageType))
+    assert(restoredFileInfo4.getStorageType.equals(StorageInfo.Type.HDD))
+    
assert(restoredFileInfo5.getStorageType.equals(mapFileInfo2.getStorageType))
+    assert(restoredFileInfo5.getStorageType.equals(StorageInfo.Type.SSD))
+
+    assert(restoredFileInfo6.getStorageType.equals(fileInfo4.getStorageType))
+    assert(restoredFileInfo6.getStorageType.equals(StorageInfo.Type.OSS))
+    assert(restoredFileInfo7.getStorageType.equals(fileInfo5.getStorageType))
+    assert(restoredFileInfo7.getStorageType.equals(StorageInfo.Type.S3))
+    assert(restoredFileInfo7.getFilePath.equals(fileInfo5.getFilePath))
+    assert(restoredFileInfo8.getStorageType.equals(fileInfo6.getStorageType))
+    assert(restoredFileInfo8.getStorageType.equals(StorageInfo.Type.S3))
+    assert(restoredFileInfo8.getFilePath.equals(fileInfo6.getFilePath))
+  }
+
+  /**
+   * Builds a PbFileInfo without setting storageType, to test backward compatibility.
+   */
+  def testCreateDiskFileInfoFromPbFileInfo(diskFileInfo: DiskFileInfo): 
DiskFileInfo = {
+    val builder = PbFileInfo.newBuilder
+      .setFilePath(diskFileInfo.getFilePath)
+      .setUserIdentifier(toPbUserIdentifier(diskFileInfo.getUserIdentifier))
+      .setBytesFlushed(diskFileInfo.getFileLength)
+      .setPartitionSplitEnabled(diskFileInfo.isPartitionSplitEnabled)
+    val reduceFileMeta = diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]
+    builder.setPartitionType(PartitionType.REDUCE.getValue)
+    builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets)
+    val pbFileInfo = builder.build()
+    PbSerDeUtils.fromPbFileInfo(pbFileInfo)
+  }
+
+  test("Test PbFileInfo storageType compatible") {
+    val restoredFileInfo1 = testCreateDiskFileInfoFromPbFileInfo(fileInfo1)
+    assert(restoredFileInfo1.getStorageType.equals(fileInfo1.getStorageType))
+    assert(!restoredFileInfo1.isDFS)
+
+    val restoredFileInfo2 = testCreateDiskFileInfoFromPbFileInfo(fileInfo2)
+    assert(!restoredFileInfo2.isDFS)
+
+    val restoredFileInfo3 = testCreateDiskFileInfoFromPbFileInfo(fileInfo3)
+    assert(restoredFileInfo3.getStorageType.equals(fileInfo3.getStorageType))
+    assert(restoredFileInfo3.isDFS && restoredFileInfo3.isHdfs)
+
+    val restoredFileInfo4 = testCreateDiskFileInfoFromPbFileInfo(fileInfo4)
+    assert(restoredFileInfo4.getStorageType.equals(fileInfo4.getStorageType))
+    assert(restoredFileInfo4.isDFS && restoredFileInfo4.isOSS)
+
+    val restoredFileInfo5 = testCreateDiskFileInfoFromPbFileInfo(fileInfo5)
+    assert(restoredFileInfo5.getStorageType.equals(fileInfo5.getStorageType))
+    assert(restoredFileInfo5.isDFS && restoredFileInfo5.isS3)
+
+    val restoredFileInfo6 = testCreateDiskFileInfoFromPbFileInfo(fileInfo6)
+    assert(restoredFileInfo6.getStorageType.equals(fileInfo6.getStorageType))
+    assert(restoredFileInfo6.isDFS && restoredFileInfo6.isS3)
+  }
+
   test("fromAndToPbUserIdentifier") {
     val pbUserIdentifier = PbSerDeUtils.toPbUserIdentifier(userIdentifier1)
     val restoredUserIdentifier = 
PbSerDeUtils.fromPbUserIdentifier(pbUserIdentifier)

Reply via email to