This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 74423fbd6 [CELEBORN-1549] Fix networkLocation persistence into Ratis
74423fbd6 is described below
commit 74423fbd6d34a7e8fe2bcb2553c68f715135e603
Author: Aravind Patnam <[email protected]>
AuthorDate: Wed Aug 14 13:55:19 2024 +0800
[CELEBORN-1549] Fix networkLocation persistence into Ratis
### What changes were proposed in this pull request?
Fixes a bug where a worker's `networkLocation` is not persisted in Ratis, so
the master falls back to `DEFAULT_RACK` when it loads the snapshot. This was
unfortunately missed in https://github.com/apache/celeborn/pull/2367 and
surfaced during our internal stress testing.
### Why are the changes needed?
Needed for custom network-aware replication, so that the `networkLocation`
state is kept in the snapshot file.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
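Updated the unit test to verify that `WorkerInfo` serialization and
deserialization (serde) round-trip correctly with the new option both
disabled and enabled.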
Closes #2669 from akpatnam25/CELEBORN-1549.
Authored-by: Aravind Patnam <[email protected]>
Signed-off-by: mingji <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 1 +
.../org/apache/celeborn/common/CelebornConf.scala | 9 ++++++++
.../apache/celeborn/common/util/PbSerDeUtils.scala | 16 ++++++++++++++-
.../celeborn/common/util/PbSerDeUtilsTest.scala | 24 +++++++++++++---------
docs/configuration/master.md | 1 +
.../celeborn/service/deploy/master/Master.scala | 2 ++
6 files changed, 42 insertions(+), 11 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index ae57d4cda..d08feee4b 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -173,6 +173,7 @@ message PbWorkerInfo {
repeated PbDiskInfo disks = 6;
map<string, PbResourceConsumption> userResourceConsumption = 7;
int32 internalPort = 8;
+ string networkLocation = 9;
}
message PbFileGroup {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c73331a67..1bbb93f28 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -770,6 +770,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def haMasterRatisSnapshotAutoTriggerThreshold: Long =
get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
def haMasterRatisSnapshotRetentionFileNum: Int =
get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)
+
+ def masterPersistWorkerNetworkLocation: Boolean =
get(MASTER_PERSIST_WORKER_NETWORK_LOCATION)
def haRatisCustomConfigs: JMap[String, String] = {
settings.asScala.filter(_._1.startsWith("celeborn.ratis")).toMap.asJava
}
@@ -2621,6 +2623,13 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(3)
+ val MASTER_PERSIST_WORKER_NETWORK_LOCATION: ConfigEntry[Boolean] =
+ buildConf("celeborn.master.persist.workerNetworkLocation")
+ .categories("master")
+ .version("0.6.0")
+ .booleanConf
+ .createWithDefault(false)
+
val MASTER_SLOT_ASSIGN_POLICY: ConfigEntry[String] =
buildConf("celeborn.master.slot.assign.policy")
.withAlternative("celeborn.slots.assign.policy")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index a2cd8dbde..96f1ed2d5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -35,6 +35,12 @@ import org.apache.celeborn.common.util.{CollectionUtils =>
localCollectionUtils}
object PbSerDeUtils {
+ private var masterPersistWorkerNetworkLocation: Boolean = false
+
+ def
setMasterPersistWorkerNetworkLocation(masterPersistWorkerNetworkLocation:
Boolean) = {
+ this.masterPersistWorkerNetworkLocation =
masterPersistWorkerNetworkLocation
+ }
+
@throws[InvalidProtocolBufferException]
def fromPbSortedShuffleFileSet(data: Array[Byte]): util.Set[String] = {
val pbSortedShuffleFileSet = PbSortedShuffleFileSet.parseFrom(data)
@@ -227,7 +233,7 @@ object PbSerDeUtils {
}
val userResourceConsumption =
PbSerDeUtils.fromPbUserResourceConsumption(pbWorkerInfo.getUserResourceConsumptionMap)
- new WorkerInfo(
+ val workerInfo = new WorkerInfo(
pbWorkerInfo.getHost,
pbWorkerInfo.getRpcPort,
pbWorkerInfo.getPushPort,
@@ -236,6 +242,10 @@ object PbSerDeUtils {
pbWorkerInfo.getInternalPort,
disks,
userResourceConsumption)
+ if (masterPersistWorkerNetworkLocation) {
+ workerInfo.networkLocation_$eq(pbWorkerInfo.getNetworkLocation)
+ }
+ workerInfo
}
def toPbWorkerInfo(
@@ -249,6 +259,10 @@ object PbSerDeUtils {
.setPushPort(workerInfo.pushPort)
.setReplicatePort(workerInfo.replicatePort)
.setInternalPort(workerInfo.internalPort)
+ if (masterPersistWorkerNetworkLocation) {
+ builder.setNetworkLocation(workerInfo.networkLocation)
+ }
+
if (!eliminateUserResourceConsumption) {
builder.putAllUserResourceConsumption(
PbSerDeUtils.toPbUserResourceConsumption(workerInfo.userResourceConsumption))
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index d2eea050e..184d49d14 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -101,6 +101,7 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
1005,
diskInfos,
userResourceConsumption)
+ workerInfo1.networkLocation_$eq("/1")
val workerInfo2 =
new WorkerInfo(
"localhost",
@@ -250,16 +251,19 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
}
test("fromAndToPbWorkerInfo") {
- val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
- val pbWorkerInfoWithEmptyResource =
PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
- val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
- val restoredWorkerInfoWithEmptyResource =
- PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)
-
- assert(restoredWorkerInfo.equals(workerInfo1))
-
assert(restoredWorkerInfoWithEmptyResource.userResourceConsumption.equals(new
util.HashMap[
- UserIdentifier,
- ResourceConsumption]()))
+ Seq(false, true).foreach { b =>
+ PbSerDeUtils.setMasterPersistWorkerNetworkLocation(b)
+ val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
+ val pbWorkerInfoWithEmptyResource =
PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
+ val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
+ val restoredWorkerInfoWithEmptyResource =
+ PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)
+
+ assert(restoredWorkerInfo.equals(workerInfo1))
+
assert(restoredWorkerInfoWithEmptyResource.userResourceConsumption.equals(new
util.HashMap[
+ UserIdentifier,
+ ResourceConsumption]()))
+ }
}
test("fromAndToPbPartitionLocation") {
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 537904f31..7b7b3c646 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -59,6 +59,7 @@ license: |
| celeborn.master.http.spnego.principal | <undefined> | false | SPNego
service principal, typical value would look like HTTP/[email protected]. SPNego
service principal would be used when celeborn http authentication is enabled.
This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 |
|
| celeborn.master.http.stopTimeout | 5s | false | Master http server stop
timeout. | 0.5.0 | |
| celeborn.master.internal.port | 8097 | false | Internal port on the master
where both workers and other master nodes connect. | 0.5.0 | |
+| celeborn.master.persist.workerNetworkLocation | false | false | | 0.6.0 |
|
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for
refreshing the node rack information periodically. | 0.5.0 | |
| celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads
used by the Master to send ApplicationMeta to Workers. | 0.5.0 | |
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index d43ece215..182ce03a7 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -83,6 +83,8 @@ private[celeborn] class Master(
private val sendApplicationMetaThreads =
conf.masterSendApplicationMetaThreads
// Send ApplicationMeta to workers
private var sendApplicationMetaExecutor: ExecutorService = _
+ private val masterPersistWorkerNetworkLocation =
conf.masterPersistWorkerNetworkLocation
+
PbSerDeUtils.setMasterPersistWorkerNetworkLocation(masterPersistWorkerNetworkLocation)
if (conf.logCelebornConfEnabled) {
logInfo(getConf)