This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 74423fbd6 [CELEBORN-1549] Fix networkLocation persistence into Ratis
74423fbd6 is described below

commit 74423fbd6d34a7e8fe2bcb2553c68f715135e603
Author: Aravind Patnam <[email protected]>
AuthorDate: Wed Aug 14 13:55:19 2024 +0800

    [CELEBORN-1549] Fix networkLocation persistence into Ratis
    
    ### What changes were proposed in this pull request?
    Fixes a bug where the `networkLocation` is not persisted in Ratis, so the 
master falls back to `DEFAULT_RACK` when it loads the snapshot. This was missed 
in https://github.com/apache/celeborn/pull/2367 and came up during our internal 
stress testing.
    
    ### Why are the changes needed?
    Needed for custom network-aware replication, so that the `networkLocation` 
state is kept in the snapshot file.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated the unit test to verify that WorkerInfo serialization/deserialization (serde) is correct.
    
    Closes #2669 from akpatnam25/CELEBORN-1549.
    
    Authored-by: Aravind Patnam <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      |  1 +
 .../org/apache/celeborn/common/CelebornConf.scala  |  9 ++++++++
 .../apache/celeborn/common/util/PbSerDeUtils.scala | 16 ++++++++++++++-
 .../celeborn/common/util/PbSerDeUtilsTest.scala    | 24 +++++++++++++---------
 docs/configuration/master.md                       |  1 +
 .../celeborn/service/deploy/master/Master.scala    |  2 ++
 6 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index ae57d4cda..d08feee4b 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -173,6 +173,7 @@ message PbWorkerInfo {
   repeated PbDiskInfo disks = 6;
   map<string, PbResourceConsumption> userResourceConsumption = 7;
   int32 internalPort = 8;
+  string networkLocation = 9;
 }
 
 message PbFileGroup {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c73331a67..1bbb93f28 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -770,6 +770,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def haMasterRatisSnapshotAutoTriggerThreshold: Long =
     get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
   def haMasterRatisSnapshotRetentionFileNum: Int = 
get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)
+
+  def masterPersistWorkerNetworkLocation: Boolean = 
get(MASTER_PERSIST_WORKER_NETWORK_LOCATION)
   def haRatisCustomConfigs: JMap[String, String] = {
     settings.asScala.filter(_._1.startsWith("celeborn.ratis")).toMap.asJava
   }
@@ -2621,6 +2623,13 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(3)
 
+  val MASTER_PERSIST_WORKER_NETWORK_LOCATION: ConfigEntry[Boolean] =
+    buildConf("celeborn.master.persist.workerNetworkLocation")
+      .categories("master")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val MASTER_SLOT_ASSIGN_POLICY: ConfigEntry[String] =
     buildConf("celeborn.master.slot.assign.policy")
       .withAlternative("celeborn.slots.assign.policy")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index a2cd8dbde..96f1ed2d5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -35,6 +35,12 @@ import org.apache.celeborn.common.util.{CollectionUtils => 
localCollectionUtils}
 
 object PbSerDeUtils {
 
+  private var masterPersistWorkerNetworkLocation: Boolean = false
+
+  def 
setMasterPersistWorkerNetworkLocation(masterPersistWorkerNetworkLocation: 
Boolean) = {
+    this.masterPersistWorkerNetworkLocation = 
masterPersistWorkerNetworkLocation
+  }
+
   @throws[InvalidProtocolBufferException]
   def fromPbSortedShuffleFileSet(data: Array[Byte]): util.Set[String] = {
     val pbSortedShuffleFileSet = PbSortedShuffleFileSet.parseFrom(data)
@@ -227,7 +233,7 @@ object PbSerDeUtils {
     }
     val userResourceConsumption =
       
PbSerDeUtils.fromPbUserResourceConsumption(pbWorkerInfo.getUserResourceConsumptionMap)
-    new WorkerInfo(
+    val workerInfo = new WorkerInfo(
       pbWorkerInfo.getHost,
       pbWorkerInfo.getRpcPort,
       pbWorkerInfo.getPushPort,
@@ -236,6 +242,10 @@ object PbSerDeUtils {
       pbWorkerInfo.getInternalPort,
       disks,
       userResourceConsumption)
+    if (masterPersistWorkerNetworkLocation) {
+      workerInfo.networkLocation_$eq(pbWorkerInfo.getNetworkLocation)
+    }
+    workerInfo
   }
 
   def toPbWorkerInfo(
@@ -249,6 +259,10 @@ object PbSerDeUtils {
       .setPushPort(workerInfo.pushPort)
       .setReplicatePort(workerInfo.replicatePort)
       .setInternalPort(workerInfo.internalPort)
+    if (masterPersistWorkerNetworkLocation) {
+      builder.setNetworkLocation(workerInfo.networkLocation)
+    }
+
     if (!eliminateUserResourceConsumption) {
       builder.putAllUserResourceConsumption(
         
PbSerDeUtils.toPbUserResourceConsumption(workerInfo.userResourceConsumption))
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index d2eea050e..184d49d14 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -101,6 +101,7 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
       1005,
       diskInfos,
       userResourceConsumption)
+  workerInfo1.networkLocation_$eq("/1")
   val workerInfo2 =
     new WorkerInfo(
       "localhost",
@@ -250,16 +251,19 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
   }
 
   test("fromAndToPbWorkerInfo") {
-    val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
-    val pbWorkerInfoWithEmptyResource = 
PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
-    val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
-    val restoredWorkerInfoWithEmptyResource =
-      PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)
-
-    assert(restoredWorkerInfo.equals(workerInfo1))
-    
assert(restoredWorkerInfoWithEmptyResource.userResourceConsumption.equals(new 
util.HashMap[
-      UserIdentifier,
-      ResourceConsumption]()))
+    Seq(false, true).foreach { b =>
+      PbSerDeUtils.setMasterPersistWorkerNetworkLocation(b)
+      val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
+      val pbWorkerInfoWithEmptyResource = 
PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
+      val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
+      val restoredWorkerInfoWithEmptyResource =
+        PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)
+
+      assert(restoredWorkerInfo.equals(workerInfo1))
+      
assert(restoredWorkerInfoWithEmptyResource.userResourceConsumption.equals(new 
util.HashMap[
+        UserIdentifier,
+        ResourceConsumption]()))
+    }
   }
 
   test("fromAndToPbPartitionLocation") {
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 537904f31..7b7b3c646 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -59,6 +59,7 @@ license: |
 | celeborn.master.http.spnego.principal | &lt;undefined&gt; | false | SPNego 
service principal, typical value would look like HTTP/[email protected]. SPNego 
service principal would be used when celeborn http authentication is enabled. 
This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 |  
| 
 | celeborn.master.http.stopTimeout | 5s | false | Master http server stop 
timeout. | 0.5.0 |  | 
 | celeborn.master.internal.port | 8097 | false | Internal port on the master 
where both workers and other master nodes connect. | 0.5.0 |  | 
+| celeborn.master.persist.workerNetworkLocation | false | false |  | 0.6.0 |  
| 
 | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 |  | 
 | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for 
refreshing the node rack information periodically. | 0.5.0 |  | 
 | celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads 
used by the Master to send ApplicationMeta to Workers. | 0.5.0 |  | 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index d43ece215..182ce03a7 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -83,6 +83,8 @@ private[celeborn] class Master(
   private val sendApplicationMetaThreads = 
conf.masterSendApplicationMetaThreads
   // Send ApplicationMeta to workers
   private var sendApplicationMetaExecutor: ExecutorService = _
+  private val masterPersistWorkerNetworkLocation = 
conf.masterPersistWorkerNetworkLocation
+  
PbSerDeUtils.setMasterPersistWorkerNetworkLocation(masterPersistWorkerNetworkLocation)
 
   if (conf.logCelebornConfEnabled) {
     logInfo(getConf)

Reply via email to