This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 783a7cc4f [CELEBORN-2236] Avoiding regular expressions for DiskFileInfo storage type determination
783a7cc4f is described below
commit 783a7cc4fcedd57d62f6c876057700319163c7ee
Author: xxx <[email protected]>
AuthorDate: Fri Jan 2 20:03:23 2026 +0800
[CELEBORN-2236] Avoiding regular expressions for DiskFileInfo storage type determination
### What changes were proposed in this pull request?
Avoid regular expressions for DiskFileInfo storage type determination by storing the storage type explicitly in DiskFileInfo and serializing it in PbFileInfo.
### Why are the changes needed?
When sending a heartbeat, the Worker iterates over all FileInfo objects and performs a regex match on a large number of file paths to check whether each file is an HDFS file, which reduces processing efficiency.
<img width="2766" height="1456" alt="image"
src="https://github.com/user-attachments/assets/2dc075f6-562d-4467-a75c-4b6682ed866d"
/>
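For illustration, a minimal sketch of the before/after check (not the exact Celeborn code; the real logic lives in `DiskFileInfo`, `StorageInfo.Type`, and `Utils`, and the enum values and HDFS path pattern below are approximations):

```java
import java.util.regex.Pattern;

// Simplified sketch: the old check re-runs a regex over the file path on every
// call, while the new check compares a storage-type enum resolved once.
public class StorageTypeCheckSketch {
  enum Type { HDD, SSD, HDFS, OSS, S3 }

  // Approximation of the path pattern used by Utils.isHdfsPath.
  private static final Pattern HDFS_PATTERN = Pattern.compile("^hdfs://.*");

  // Before: every isHdfs() call re-parses the file path with a regex.
  static boolean isHdfsByPath(String filePath) {
    return HDFS_PATTERN.matcher(filePath).matches();
  }

  // After: the storage type is resolved once (at construction or
  // deserialization) and the check is a constant-time enum comparison.
  static boolean isHdfsByType(Type storageType) {
    return storageType == Type.HDFS;
  }

  public static void main(String[] args) {
    String path = "hdfs://namenode:9000/celeborn/shuffle/data";
    System.out.println(isHdfsByPath(path));      // true, but regex work per call
    System.out.println(isHdfsByType(Type.HDFS)); // true, no path parsing
  }
}
```

In the heartbeat path this turns a regex match per FileInfo into a plain field comparison.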
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3562 from xy2953396112/CELEBORN-2236.
Authored-by: xxx <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/common/meta/DiskFileInfo.java | 17 ++-
common/src/main/proto/TransportMessages.proto | 1 +
.../apache/celeborn/common/util/PbSerDeUtils.scala | 14 +++
.../celeborn/common/util/PbSerDeUtilsTest.scala | 124 ++++++++++++++++++++-
4 files changed, 149 insertions(+), 7 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index ab798eca3..d4571fa4b 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -57,10 +57,15 @@ public class DiskFileInfo extends FileInfo {
boolean partitionSplitEnabled,
FileMeta fileMeta,
String filePath,
+ StorageInfo.Type storageType,
long bytesFlushed) {
super(userIdentifier, partitionSplitEnabled, fileMeta);
this.filePath = filePath;
- this.storageType = StorageInfo.Type.HDD;
+ if (storageType != null) {
+ this.storageType = storageType;
+ } else {
+ this.storageType = StorageInfo.Type.HDD;
+ }
this.bytesFlushed = bytesFlushed;
}
@@ -150,19 +155,21 @@ public class DiskFileInfo extends FileInfo {
}
public boolean isHdfs() {
- return Utils.isHdfsPath(filePath);
+ return storageType == StorageInfo.Type.HDFS;
}
public boolean isS3() {
- return Utils.isS3Path(filePath);
+ return storageType == StorageInfo.Type.S3;
}
public boolean isOSS() {
- return Utils.isOssPath(filePath);
+ return storageType == StorageInfo.Type.OSS;
}
public boolean isDFS() {
- return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || Utils.isHdfsPath(filePath);
+ return storageType == StorageInfo.Type.HDFS
+ || storageType == StorageInfo.Type.S3
+ || storageType == StorageInfo.Type.OSS;
}
public StorageInfo.Type getStorageType() {
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 160c7e962..a813a9e50 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -661,6 +661,7 @@ message PbFileInfo {
bool isSegmentGranularityVisible = 9;
map<int32, int32> partitionWritingSegment = 10;
repeated PbSegmentIndex segmentIndex = 11;
+ int32 storageType = 12;
}
message PbSegmentIndex {
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 1e96ece3e..e9c407ce8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -114,11 +114,24 @@ object PbSerDeUtils {
fileMeta.setIsWriterClosed(true)
fileMeta
}
+ var storageType: StorageInfo.Type = null
+ if (pbFileInfo.getStorageType == 0) {
+ if (Utils.isHdfsPath(pbFileInfo.getFilePath)) {
+ storageType = StorageInfo.Type.HDFS
+ } else if (Utils.isOssPath(pbFileInfo.getFilePath)) {
+ storageType = StorageInfo.Type.OSS
+ } else if (Utils.isS3Path(pbFileInfo.getFilePath)) {
+ storageType = StorageInfo.Type.S3
+ }
+ } else {
+ storageType = StorageInfo.typesMap.get(pbFileInfo.getStorageType)
+ }
new DiskFileInfo(
userIdentifier,
pbFileInfo.getPartitionSplitEnabled,
meta,
pbFileInfo.getFilePath,
+ storageType,
pbFileInfo.getBytesFlushed)
}
@@ -141,6 +154,7 @@ object PbSerDeUtils {
.setUserIdentifier(toPbUserIdentifier(fileInfo.getUserIdentifier))
.setBytesFlushed(fileInfo.getFileLength)
.setPartitionSplitEnabled(fileInfo.isPartitionSplitEnabled)
+ .setStorageType(fileInfo.getStorageType.getValue)
if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
val mapFileMeta = fileInfo.getFileMeta.asInstanceOf[MapFileMeta]
builder.setPartitionType(PartitionType.MAP.getValue)
diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index fa2ebbfbb..5b8fe9979 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -31,12 +31,12 @@ import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta._
-import org.apache.celeborn.common.protocol.{PartitionLocation, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
+import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType, PbFileInfo, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, WorkerResource}
import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair, toPbPackedPartitionLocationsPair}
+import org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair, toPbPackedPartitionLocationsPair, toPbUserIdentifier}
import org.apache.celeborn.common.write.LocationPushFailedBatches
class PbSerDeUtilsTest extends CelebornFunSuite {
@@ -51,6 +51,10 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val file1 = new File("/mnt/disk/1")
val file2 = new File("/mnt/disk/2")
+ val file3 = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
+ val file4 = "oss://xxxx/xx-xx/x-x-x"
+ val file5 = "s3a://xxxx/xx-xx/x-x-x"
+ val file6 = "s3://xxxx/xx-xx/x-x-x"
val files = List(file1, file2)
val device = new DeviceInfo("device-a")
@@ -65,33 +69,68 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val userIdentifier1 = UserIdentifier("tenant-a", "user-a")
val userIdentifier2 = UserIdentifier("tenant-b", "user-b")
+ val userIdentifier3 = UserIdentifier("tenant-c", "user-c")
val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)
+ val chunkOffsets3 = util.Arrays.asList[java.lang.Long](3000L, 8000L, 12000L)
val fileInfo1 = new DiskFileInfo(
userIdentifier1,
true,
new ReduceFileMeta(chunkOffsets1, 123),
file1.getAbsolutePath,
+ StorageInfo.Type.HDD,
3000L)
val fileInfo2 = new DiskFileInfo(
userIdentifier2,
true,
new ReduceFileMeta(chunkOffsets2, 123),
file2.getAbsolutePath,
+ StorageInfo.Type.SSD,
6000L)
+ val fileInfo3 = new DiskFileInfo(
+ userIdentifier3,
+ true,
+ new ReduceFileMeta(chunkOffsets3, 123),
+ file3,
+ StorageInfo.Type.HDFS,
+ 6000L)
+ val fileInfo4 = new DiskFileInfo(
+ userIdentifier3,
+ true,
+ new ReduceFileMeta(chunkOffsets3, 123),
+ file4,
+ StorageInfo.Type.OSS,
+ 6000L)
+ val fileInfo5 = new DiskFileInfo(
+ userIdentifier3,
+ true,
+ new ReduceFileMeta(chunkOffsets3, 123),
+ file5,
+ StorageInfo.Type.S3,
+ 6000L)
+ val fileInfo6 = new DiskFileInfo(
+ userIdentifier3,
+ true,
+ new ReduceFileMeta(chunkOffsets3, 123),
+ file6,
+ StorageInfo.Type.S3,
+ 6000L)
+
val mapFileInfo1 = new DiskFileInfo(
userIdentifier1,
true,
new MapFileMeta(1024, 10),
file1.getAbsolutePath,
+ StorageInfo.Type.HDD,
6000L)
val mapFileInfo2 = new DiskFileInfo(
userIdentifier2,
true,
new MapFileMeta(1024, 10),
file2.getAbsolutePath,
+ StorageInfo.Type.SSD,
6000L)
val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]()
mapFileInfo1.setMountPoint("/mnt")
@@ -99,8 +138,14 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
fileInfoMap.put("file1", fileInfo1)
fileInfoMap.put("file2", fileInfo2)
+ fileInfoMap.put("file3", fileInfo3)
+ fileInfoMap.put("file4", fileInfo4)
+ fileInfoMap.put("file5", fileInfo5)
+ fileInfoMap.put("file6", fileInfo6)
+
fileInfoMap.put("mapFile1", mapFileInfo1)
fileInfoMap.put("mapFile2", mapFileInfo2)
+
val mountPoints = new util.HashSet[String]
mountPoints.add("/mnt")
val cache = JavaUtils.newConcurrentHashMap[String, UserIdentifier]()
@@ -289,6 +334,81 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredFileInfo2.getMountPoint.equals(mapFileInfo2.getMountPoint))
}
+ test("fromAndToPBFileInfoStorageType") {
+ val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
+ val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, cache, mountPoints)
+ val restoredFileInfo1 = restoredFileInfoMap.get("file1")
+ val restoredFileInfo2 = restoredFileInfoMap.get("file2")
+ val restoredFileInfo3 = restoredFileInfoMap.get("file3")
+ val restoredFileInfo4 = restoredFileInfoMap.get("mapFile1")
+ val restoredFileInfo5 = restoredFileInfoMap.get("mapFile2")
+ val restoredFileInfo6 = restoredFileInfoMap.get("file4")
+ val restoredFileInfo7 = restoredFileInfoMap.get("file5")
+ val restoredFileInfo8 = restoredFileInfoMap.get("file6")
+
+ assert(restoredFileInfo1.getStorageType.equals(fileInfo1.getStorageType))
+ assert(restoredFileInfo1.getStorageType.equals(StorageInfo.Type.HDD))
+ assert(restoredFileInfo2.getStorageType.equals(fileInfo2.getStorageType))
+ assert(restoredFileInfo2.getStorageType.equals(StorageInfo.Type.SSD))
+ assert(restoredFileInfo3.getStorageType.equals(fileInfo3.getStorageType))
+ assert(restoredFileInfo3.getStorageType.equals(StorageInfo.Type.HDFS))
+
+ assert(restoredFileInfo4.getStorageType.equals(mapFileInfo1.getStorageType))
+ assert(restoredFileInfo4.getStorageType.equals(StorageInfo.Type.HDD))
+ assert(restoredFileInfo5.getStorageType.equals(mapFileInfo2.getStorageType))
+ assert(restoredFileInfo5.getStorageType.equals(StorageInfo.Type.SSD))
+
+ assert(restoredFileInfo6.getStorageType.equals(fileInfo4.getStorageType))
+ assert(restoredFileInfo6.getStorageType.equals(StorageInfo.Type.OSS))
+ assert(restoredFileInfo7.getStorageType.equals(fileInfo5.getStorageType))
+ assert(restoredFileInfo7.getStorageType.equals(StorageInfo.Type.S3))
+ assert(restoredFileInfo7.getFilePath.equals(fileInfo5.getFilePath))
+ assert(restoredFileInfo8.getStorageType.equals(fileInfo6.getStorageType))
+ assert(restoredFileInfo8.getStorageType.equals(StorageInfo.Type.S3))
+ assert(restoredFileInfo8.getFilePath.equals(fileInfo6.getFilePath))
+ }
+
+ /**
+ * Test PbFileInfo storageType compatible
+ */
+ def testCreateDiskFileInfoFromPbFileInfo(diskFileInfo: DiskFileInfo): DiskFileInfo = {
+ val builder = PbFileInfo.newBuilder
+ .setFilePath(diskFileInfo.getFilePath)
+ .setUserIdentifier(toPbUserIdentifier(diskFileInfo.getUserIdentifier))
+ .setBytesFlushed(diskFileInfo.getFileLength)
+ .setPartitionSplitEnabled(diskFileInfo.isPartitionSplitEnabled)
+ val reduceFileMeta = diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]
+ builder.setPartitionType(PartitionType.REDUCE.getValue)
+ builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets)
+ val pbFileInfo = builder.build()
+ PbSerDeUtils.fromPbFileInfo(pbFileInfo)
+ }
+
+ test("Test PbFileInfo storageType compatible") {
+ val restoredFileInfo1 = testCreateDiskFileInfoFromPbFileInfo(fileInfo1)
+ assert(restoredFileInfo1.getStorageType.equals(fileInfo1.getStorageType))
+ assert(!restoredFileInfo1.isDFS)
+
+ val restoredFileInfo2 = testCreateDiskFileInfoFromPbFileInfo(fileInfo2)
+ assert(!restoredFileInfo2.isDFS)
+
+ val restoredFileInfo3 = testCreateDiskFileInfoFromPbFileInfo(fileInfo3)
+ assert(restoredFileInfo3.getStorageType.equals(fileInfo3.getStorageType))
+ assert(restoredFileInfo3.isDFS && restoredFileInfo3.isHdfs)
+
+ val restoredFileInfo4 = testCreateDiskFileInfoFromPbFileInfo(fileInfo4)
+ assert(restoredFileInfo4.getStorageType.equals(fileInfo4.getStorageType))
+ assert(restoredFileInfo4.isDFS && restoredFileInfo4.isOSS)
+
+ val restoredFileInfo5 = testCreateDiskFileInfoFromPbFileInfo(fileInfo5)
+ assert(restoredFileInfo5.getStorageType.equals(fileInfo5.getStorageType))
+ assert(restoredFileInfo5.isDFS && restoredFileInfo5.isS3)
+
+ val restoredFileInfo6 = testCreateDiskFileInfoFromPbFileInfo(fileInfo6)
+ assert(restoredFileInfo6.getStorageType.equals(fileInfo6.getStorageType))
+ assert(restoredFileInfo6.isDFS && restoredFileInfo6.isS3)
+ }
+
test("fromAndToPbUserIdentifier") {
val pbUserIdentifier = PbSerDeUtils.toPbUserIdentifier(userIdentifier1)
val restoredUserIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbUserIdentifier)