This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ca15eea8b [CELEBORN-1511] Add support for custom master endpoint 
resolver
ca15eea8b is described below

commit ca15eea8be21f7e0003c884d81a03c1b3c70e631
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Aug 7 08:12:41 2024 +0800

    [CELEBORN-1511] Add support for custom master endpoint resolver
    
    ### What changes were proposed in this pull request?
    
    Add a master endpoint resolver that makes master endpoint discovery
    extensible. Users can implement and pass in the resolver that best fits
    their needs.
    
    ### Why are the changes needed?
    
    Currently, Celeborn accepts master endpoints through the configs
    `celeborn.master.endpoints` and `celeborn.master.internal.endpoints`,
    whose allowed pattern is `<host1>:<port1>[,<host2>:<port2>]*`.
    Both workers and clients use these configs to connect to the master.
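
The endpoint pattern above, together with the documented fallback to port 9097 when the port is omitted, can be illustrated with a minimal self-contained sketch. The class and method names (`EndpointListSketch`, `normalize`) are hypothetical and are not part of Celeborn; the actual parsing in this commit goes through `Utils.parseHostPort`, which this sketch does not reproduce (for example, it does not handle the `<localhost>` placeholder or IPv6 literals).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not Celeborn code.
final class EndpointListSketch {
  // Default master port as stated in the celeborn.master.endpoints doc.
  static final int DEFAULT_MASTER_PORT = 9097;

  // Splits "<host1>:<port1>[,<host2>:<port2>]*" and fills in a missing port.
  static List<String> normalize(String value, int defaultPort) {
    List<String> result = new ArrayList<>();
    for (String raw : value.split(",")) {
      String endpoint = raw.trim();
      if (endpoint.isEmpty()) {
        continue; // skip empty entries such as a trailing comma
      }
      int colon = endpoint.lastIndexOf(':');
      if (colon < 0) {
        // No port given: fall back to the default port.
        result.add(endpoint + ":" + defaultPort);
      } else {
        // Throws NumberFormatException if the port is not a number.
        int port = Integer.parseInt(endpoint.substring(colon + 1));
        result.add(endpoint.substring(0, colon) + ":" + port);
      }
    }
    return result;
  }
}
```

Calling `EndpointListSketch.normalize("clb1:9097,clb2,clb3:9099", EndpointListSketch.DEFAULT_MASTER_PORT)` yields three `host:port` strings, with `clb2` receiving the default port.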
    
    The problem with this approach is that it takes a static host, IP, or
    domain address, which can change over time for a long-running worker or
    client, e.g. when a master node goes down or a domain UUID changes. In our
    infra, this discovery is done by passing a service group that actively
    watches the nodes for the master service, but there is no way to make it
    work with Celeborn because Celeborn currently only works with static
    addresses.
    
    ### Does this PR introduce _any_ user-facing change?
    
    The default behaviour remains the same, but users can now pass their own
    master endpoint resolver.
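
As a rough illustration of the contract a custom resolver has to honour, the sketch below mimics the `updated` flag semantics introduced in this commit: `update` publishes a new endpoint list and raises the flag, and `getUpdatedAndReset` returns `true` exactly once per update so the caller can reset its retry index. `ResolverSketch` is a hypothetical, self-contained stand-in; a real implementation would extend `MasterEndpointResolver` and be selected through `celeborn.master.endpoints.resolver`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a dynamic resolver; not Celeborn code.
final class ResolverSketch {
  // Current endpoint list; volatile so readers see the latest published list.
  private volatile List<String> activeEndpoints;
  // Raised by update(), cleared by the first getUpdatedAndReset() afterwards.
  private final AtomicBoolean updated = new AtomicBoolean(false);

  ResolverSketch(String... initial) {
    this.activeEndpoints = Arrays.asList(initial.clone());
  }

  // Called by the discovery mechanism when the set of masters changes.
  void update(String... endpoints) {
    this.activeEndpoints = Arrays.asList(endpoints.clone());
    updated.set(true);
  }

  // Returns true only if an update happened since the last call.
  boolean getUpdatedAndReset() {
    return updated.compareAndSet(true, false);
  }

  List<String> getActiveMasterEndpoints() {
    return activeEndpoints;
  }
}
```

Using `compareAndSet(true, false)` makes the check and the reset a single atomic step, so two threads polling at the same time cannot both observe the same update.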
    
    ### How was this patch tested?
    
    Added new unit tests.
    
    Closes #2629 from s0nskar/masterresolver.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/common/client/MasterClient.java       | 41 +++++-----
 .../org/apache/celeborn/common/CelebornConf.scala  | 45 ++++++-----
 .../common/client/MasterEndpointResolver.scala     | 56 +++++++++++++
 .../client/StaticMasterEndpointResolver.scala      | 53 +++++++++++++
 .../org/apache/celeborn/common/util/Utils.scala    | 15 ++++
 .../apache/celeborn/common/CelebornConfSuite.scala | 31 +++++---
 .../client/MasterEndpointResolverSuite.scala       | 92 ++++++++++++++++++++++
 .../client/StaticMasterEndpointResolverSuite.scala | 74 +++++++++++++++++
 .../apache/celeborn/common/util/UtilsSuite.scala   | 26 ++++++
 docs/configuration/client.md                       |  3 +-
 docs/configuration/worker.md                       |  3 +-
 11 files changed, 383 insertions(+), 56 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java 
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index df12fa520..7c39b2a5f 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -37,18 +37,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.RpcNameConstants;
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.OneWayMessageResponse$;
 import org.apache.celeborn.common.protocol.message.MasterRequestMessage;
 import org.apache.celeborn.common.protocol.message.Message;
 import org.apache.celeborn.common.rpc.*;
 import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.common.util.Utils;
 
 public class MasterClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(MasterClient.class);
 
   private final RpcEnv rpcEnv;
-  private final List<String> masterEndpoints;
+  private final MasterEndpointResolver masterEndpointResolver;
   private final int maxRetries;
 
   private final RpcTimeout rpcTimeout;
@@ -57,16 +57,15 @@ public class MasterClient {
   private final ExecutorService oneWayMessageSender;
   private final CelebornConf conf;
   private final boolean isWorker;
-  private String masterEndpointName;
 
   public MasterClient(RpcEnv rpcEnv, CelebornConf conf, boolean isWorker) {
     this.rpcEnv = rpcEnv;
     this.conf = conf;
     this.isWorker = isWorker;
-    this.masterEndpoints = resolveMasterEndpoints();
-    Collections.shuffle(this.masterEndpoints);
-    LOG.info("masterEndpoints = {}", masterEndpoints);
-    this.maxRetries = Math.max(masterEndpoints.size(), 
conf.masterClientMaxRetries());
+    this.masterEndpointResolver =
+        
Utils.instantiateMasterEndpointResolver(this.conf.masterEndpointResolver(), 
conf, isWorker);
+
+    this.maxRetries = conf.masterClientMaxRetries();
     this.rpcTimeout = conf.masterClientRpcAskTimeout();
     this.rpcEndpointRef = new AtomicReference<>();
     this.oneWayMessageSender =
@@ -228,12 +227,20 @@ public class MasterClient {
    */
   private RpcEndpointRef getOrSetupRpcEndpointRef(AtomicInteger currentIndex) {
     RpcEndpointRef endpointRef = rpcEndpointRef.get();
+
+    List<String> activeMasterEndpoints = 
masterEndpointResolver.getActiveMasterEndpoints();
+    // If endpoints are updated by MasterEndpointResolver, we should reset the 
currentIndex to 0.
+    // This also unsets the updated flag, so we don't reset currentIndex to 0 on every call.
+    if (masterEndpointResolver.getUpdatedAndReset()) {
+      currentIndex.set(0);
+    }
+
     if (endpointRef == null) {
       int index = currentIndex.get();
       do {
-        RpcEndpointRef tempEndpointRef = 
setupEndpointRef(masterEndpoints.get(index));
+        RpcEndpointRef tempEndpointRef = 
setupEndpointRef(activeMasterEndpoints.get(index));
         if (rpcEndpointRef.compareAndSet(null, tempEndpointRef)) {
-          index = (index + 1) % masterEndpoints.size();
+          index = (index + 1) % activeMasterEndpoints.size();
         }
         endpointRef = rpcEndpointRef.get();
       } while (endpointRef == null && index != currentIndex.get());
@@ -243,7 +250,7 @@ public class MasterClient {
       if (endpointRef == null) {
         throw new IllegalStateException(
             "After trying all the available Master Addresses("
-                + String.join(",", masterEndpoints)
+                + String.join(",", activeMasterEndpoints)
                 + "), an usable link still couldn't be created.");
       } else {
         LOG.info("connect to master {}.", endpointRef.address());
@@ -256,7 +263,8 @@ public class MasterClient {
     RpcEndpointRef endpointRef = null;
     try {
       endpointRef =
-          rpcEnv.setupEndpointRef(RpcAddress.fromHostAndPort(endpoint), 
masterEndpointName);
+          rpcEnv.setupEndpointRef(
+              RpcAddress.fromHostAndPort(endpoint), 
masterEndpointResolver.masterEndpointName());
     } catch (Exception e) {
       // Catch all exceptions. Because we don't care whether this exception is 
IOException or
       // TimeoutException or other exceptions, so we just try to connect to 
host:port, if fail,
@@ -265,15 +273,4 @@ public class MasterClient {
     }
     return endpointRef;
   }
-
-  private List<String> resolveMasterEndpoints() {
-    if (isWorker && conf.internalPortEnabled()) {
-      // For worker, we should use the internal endpoints if internal port is 
enabled.
-      masterEndpointName = RpcNameConstants.MASTER_INTERNAL_EP;
-      return Arrays.asList(conf.masterInternalEndpoints());
-    } else {
-      masterEndpointName = RpcNameConstants.MASTER_EP;
-      return Arrays.asList(conf.masterEndpoints());
-    }
-  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3d6326955..0a058d07d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -664,13 +664,9 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   // //////////////////////////////////////////////////////
   //               Address && HA && RATIS                //
   // //////////////////////////////////////////////////////
-  def masterEndpoints: Array[String] =
-    get(MASTER_ENDPOINTS).toArray.map { endpoint =>
-      Utils.parseHostPort(endpoint.replace("<localhost>", 
Utils.localHostName(this))) match {
-        case (host, 0) => s"$host:${HA_MASTER_NODE_PORT.defaultValue.get}"
-        case (host, port) => s"$host:$port"
-      }
-    }
+  def masterEndpoints: Array[String] = get(MASTER_ENDPOINTS).toArray
+
+  def masterEndpointResolver: String = get(MASTER_ENDPOINTS_RESOLVER)
 
   def masterClientRpcAskTimeout: RpcTimeout =
     new RpcTimeout(get(MASTER_CLIENT_RPC_ASK_TIMEOUT).milli, 
MASTER_CLIENT_RPC_ASK_TIMEOUT.key)
@@ -1432,13 +1428,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   // //////////////////////////////////////////////////////
   def internalPortEnabled: Boolean = get(INTERNAL_PORT_ENABLED)
 
-  def masterInternalEndpoints: Array[String] =
-    get(MASTER_INTERNAL_ENDPOINTS).toArray.map { endpoint =>
-      Utils.parseHostPort(endpoint.replace("<localhost>", 
Utils.localHostName(this))) match {
-        case (host, 0) => 
s"$host:${HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get}"
-        case (host, port) => s"$host:$port"
-      }
-    }
+  def masterInternalEndpoints: Array[String] = 
get(MASTER_INTERNAL_ENDPOINTS).toArray
 
   def haMasterNodeInternalPort(nodeId: String): Int = {
     val key = HA_MASTER_NODE_INTERNAL_PORT.key.replace("<id>", nodeId)
@@ -2108,18 +2098,33 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
+  val MASTER_ENDPOINTS_RESOLVER: ConfigEntry[String] =
+    buildConf("celeborn.master.endpoints.resolver")
+      .categories("client", "worker")
+      .doc("Resolver class that can be used for discovering and updating the 
master endpoints. This allows " +
+        "users to provide a custom master endpoint resolver implementation. 
This is useful in environments " +
+        "where the master nodes might change due to scaling operations or 
infrastructure updates. Clients " +
+        "need to ensure that provided resolver class should be present in the 
classpath.")
+      .version("0.6.0")
+      .stringConf
+      .checkValue(
+        resolver => Utils.classIsLoadable(resolver),
+        "Resolver class was not found in the classpath. Please check the class 
name " +
+          "and ensure that it is present in classpath")
+      
.createWithDefault("org.apache.celeborn.common.client.StaticMasterEndpointResolver")
+
   val MASTER_ENDPOINTS: ConfigEntry[Seq[String]] =
     buildConf("celeborn.master.endpoints")
       .categories("client", "worker")
-      .doc("Endpoints of master nodes for celeborn client to connect, allowed 
pattern " +
-        "is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. 
`clb1:9097,clb2:9098,clb3:9099`. " +
-        "If the port is omitted, 9097 will be used.")
+      .doc("Endpoints of master nodes for celeborn clients to connect. Client 
uses resolver provided by " +
+        s"${MASTER_ENDPOINTS_RESOLVER.key} to resolve the master endpoints. By 
default Celeborn uses " +
+        "`org.apache.celeborn.common.client.StaticMasterEndpointResolver` 
which takes static master endpoints " +
+        "as input. Allowed pattern: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. 
`clb1:9097,clb2:9098,clb3:9099`. " +
+        "If the port is omitted, 9097 will be used. If the master endpoints 
are not static then users can pass " +
+        s"custom resolver implementation to discover master endpoints actively 
using ${MASTER_ENDPOINTS_RESOLVER.key}.")
       .version("0.2.0")
       .stringConf
       .toSequence
-      .checkValue(
-        endpoints => endpoints.map(_ => 
Try(Utils.parseHostPort(_))).forall(_.isSuccess),
-        "Allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`")
       .createWithDefaultString(s"<localhost>:9097")
 
   val MASTER_CLIENT_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/client/MasterEndpointResolver.scala
 
b/common/src/main/scala/org/apache/celeborn/common/client/MasterEndpointResolver.scala
new file mode 100644
index 000000000..7b72cd048
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/client/MasterEndpointResolver.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.RpcNameConstants
+
+abstract class MasterEndpointResolver(
+    private val conf: CelebornConf,
+    private val isWorker: Boolean) extends Logging {
+
+  protected var activeMasterEndpoints: Option[List[String]] = None
+  protected val updated = new AtomicBoolean(false)
+
+  val masterEndpointName: String =
+    if (isWorker && conf.internalPortEnabled) {
+      // For worker, we should use the internal endpoints if internal port is 
enabled.
+      RpcNameConstants.MASTER_INTERNAL_EP
+    } else {
+      RpcNameConstants.MASTER_EP
+    }
+
+  if (masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP) {
+    resolve(conf.masterInternalEndpoints)
+  } else {
+    resolve(conf.masterEndpoints)
+  }
+
+  def getUpdatedAndReset(): Boolean = updated.compareAndSet(true, false)
+
+  def getActiveMasterEndpoints: java.util.List[String] = 
activeMasterEndpoints.get.asJava
+
+  protected def resolve(endpoints: Array[String]): Unit
+
+  protected def update(endpoints: Array[String]): Unit
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolver.scala
 
b/common/src/main/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolver.scala
new file mode 100644
index 000000000..88f8352e5
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolver.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.{HA_MASTER_NODE_INTERNAL_PORT, 
HA_MASTER_NODE_PORT}
+import org.apache.celeborn.common.protocol.RpcNameConstants
+import org.apache.celeborn.common.util.Utils
+
+class StaticMasterEndpointResolver(conf: CelebornConf, isWorker: Boolean)
+  extends MasterEndpointResolver(conf, isWorker) {
+
+  override def resolve(endpoints: Array[String]): Unit = {
+    val haMasterPort =
+      if (masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP) {
+        HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get
+      } else {
+        HA_MASTER_NODE_PORT.defaultValue.get
+      }
+
+    this.activeMasterEndpoints = Some(endpoints.map { endpoint =>
+      Utils.parseHostPort(endpoint.replace("<localhost>", 
Utils.localHostName(conf))) match {
+        case (host, 0) => s"$host:$haMasterPort"
+        case (host, port) => s"$host:$port"
+      }
+    }.toList)
+
+    this.activeMasterEndpoints = Some(Random.shuffle(this.activeMasterEndpoints.get))
+    logInfo(s"masterEndpoints = ${activeMasterEndpoints.get}")
+  }
+
+  override def update(endpoints: Array[String]): Unit = {}
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index d390561d7..44a7d5c2e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -52,6 +52,7 @@ import org.apache.celeborn.common.network.util.TransportConf
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, TransportModuleConstants}
 import org.apache.celeborn.common.protocol.message.{ControlMessages, Message, 
StatusCode}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
+import org.apache.celeborn.reflect.DynConstructors
 
 object Utils extends Logging {
 
@@ -571,6 +572,20 @@ object Utils extends Logging {
     // scalastyle:on classforname
   }
 
+  def instantiateMasterEndpointResolver[T](
+      className: String,
+      conf: CelebornConf,
+      isWorker: Boolean): T = {
+    try {
+      DynConstructors.builder().impl(className, classOf[CelebornConf], 
java.lang.Boolean.TYPE)
+        .build[T]()
+        .newInstance(conf, java.lang.Boolean.valueOf(isWorker))
+    } catch {
+      case e: Throwable =>
+        throw new CelebornException(s"Failed to instantiate 
masterEndpointResolver $className.", e)
+    }
+  }
+
   def getCodeSourceLocation(clazz: Class[_]): String = {
     new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI).getPath
   }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 6da252d9d..54497e6ee 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -158,18 +158,6 @@ class CelebornConfSuite extends CelebornFunSuite {
       "Compression level for Zstd compression codec should be an integer 
between -5 and 22."))
   }
 
-  test("replace <localhost> placeholder") {
-    val conf = new CelebornConf()
-    val replacedHost = conf.masterHost
-    assert(!replacedHost.contains("<localhost>"))
-    assert(replacedHost === Utils.localHostName(conf))
-    val replacedHosts = conf.masterEndpoints
-    replacedHosts.foreach { replacedHost =>
-      assert(!replacedHost.contains("<localhost>"))
-      assert(replacedHost contains Utils.localHostName(conf))
-    }
-  }
-
   test("extract masterNodeIds") {
     val conf = new CelebornConf()
       .set("celeborn.master.ha.node.id", "1")
@@ -453,4 +441,23 @@ class CelebornConfSuite extends CelebornFunSuite {
     }
   }
 
+  test("CELEBORN-1511: Test master endpoint resolver") {
+    val conf = new CelebornConf()
+
+    val validResolverClass = 
"org.apache.celeborn.common.client.StaticMasterEndpointResolver"
+    val invalidResolverClass = "org.apache.celeborn.UnknownClass"
+
+    conf.set(MASTER_ENDPOINTS_RESOLVER.key, validResolverClass)
+    assert(conf.masterEndpointResolver == validResolverClass)
+
+    try {
+      conf.set(MASTER_ENDPOINTS_RESOLVER.key, invalidResolverClass)
+      val _ = conf.masterEndpointResolver
+    } catch {
+      case e: Exception =>
+        assert(e.isInstanceOf[IllegalArgumentException])
+        assert(e.getMessage.contains("Resolver class was not found in the 
classpath."))
+    }
+  }
+
 }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/client/MasterEndpointResolverSuite.scala
 
b/common/src/test/scala/org/apache/celeborn/common/client/MasterEndpointResolverSuite.scala
new file mode 100644
index 000000000..c89c9ac07
--- /dev/null
+++ 
b/common/src/test/scala/org/apache/celeborn/common/client/MasterEndpointResolverSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.RpcNameConstants
+
+class DummyMasterEndpointResolver(conf: CelebornConf, isWorker: Boolean)
+  extends MasterEndpointResolver(conf, isWorker) {
+
+  override def resolve(endpoints: Array[String]): Unit = {
+    this.activeMasterEndpoints = Some(endpoints.toList)
+  }
+
+  override def update(endpoints: Array[String]): Unit = {
+    this.activeMasterEndpoints = Some(endpoints.toList)
+    updated.set(true)
+  }
+
+  def updateTest(endpoints: Array[String]): Unit = {
+    update(endpoints)
+  }
+}
+
+class MasterEndpointResolverSuite extends CelebornFunSuite {
+
+  test("resolve") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.MASTER_ENDPOINTS.key, "clb-1:1234")
+    val resolver = new DummyMasterEndpointResolver(conf, false)
+
+    assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+    assert(!resolver.getUpdatedAndReset())
+    // isUpdated should not be reset if isUpdated returns false
+    assert(!resolver.getUpdatedAndReset())
+    assert(resolver.getActiveMasterEndpoints.size == 1)
+    assert(resolver.getActiveMasterEndpoints.get(0) == "clb-1:1234")
+
+    resolver.updateTest(Array("clb-2:1234"))
+    assert(resolver.getUpdatedAndReset())
+    // isUpdated should be reset after calling isUpdated once
+    assert(!resolver.getUpdatedAndReset())
+    assert(resolver.getActiveMasterEndpoints.size == 1)
+    assert(resolver.getActiveMasterEndpoints.get(0) == "clb-2:1234")
+  }
+
+  test("resolve with internal port enabled and isWorker = false") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.MASTER_ENDPOINTS.key, "clb-1:1234")
+    conf.set(CelebornConf.MASTER_INTERNAL_ENDPOINTS.key, "clb-internal-1:1234")
+    conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+
+    val resolver = new DummyMasterEndpointResolver(conf, false)
+
+    assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+    assert(resolver.getActiveMasterEndpoints.size == 1)
+    assert(resolver.getActiveMasterEndpoints.get(0) == "clb-1:1234")
+  }
+
+  test("resolve with internal port enabled and isWorker = true") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+    conf.set(CelebornConf.MASTER_ENDPOINTS.key, "clb-1:1234")
+    conf.set(CelebornConf.MASTER_INTERNAL_ENDPOINTS.key, "clb-internal-1:1234")
+
+    val resolver = new DummyMasterEndpointResolver(conf, true)
+
+    assert(resolver.masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP)
+    assert(resolver.getActiveMasterEndpoints.size == 1)
+    assert(resolver.getActiveMasterEndpoints.get(0) == "clb-internal-1:1234")
+  }
+}
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolverSuite.scala
 
b/common/src/test/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolverSuite.scala
new file mode 100644
index 000000000..aacf0bf76
--- /dev/null
+++ 
b/common/src/test/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolverSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.RpcNameConstants
+import org.apache.celeborn.common.util.Utils
+
+class StaticMasterEndpointResolverSuite extends CelebornFunSuite {
+
+  test("resolve with default configs") {
+    val conf = new CelebornConf()
+    val resolver = new StaticMasterEndpointResolver(conf, false)
+
+    assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+    assert(!resolver.getUpdatedAndReset())
+    assert(resolver.getActiveMasterEndpoints.size == 1)
+    assert(resolver.getActiveMasterEndpoints.get(0) == 
s"${Utils.localHostName(conf)}:9097")
+  }
+
+  test("resolve with internal port enabled and isWorker = false") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+    conf.set(CelebornConf.MASTER_ENDPOINTS.key, 
"clb-1:1234,clb-2,<localhost>:9097")
+    conf.set(
+      CelebornConf.MASTER_INTERNAL_ENDPOINTS.key,
+      "clb-internal-1:1234,clb-internal-2,<localhost>:9097")
+
+    val resolver = new StaticMasterEndpointResolver(conf, false)
+
+    assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+    assert(!resolver.getUpdatedAndReset())
+    assert(resolver.getActiveMasterEndpoints.size == 3)
+    assert(resolver.getActiveMasterEndpoints.contains("clb-1:1234"))
+    assert(resolver.getActiveMasterEndpoints.contains(
+      s"clb-2:${CelebornConf.HA_MASTER_NODE_PORT.defaultValue.get}"))
+    
assert(resolver.getActiveMasterEndpoints.contains(s"${Utils.localHostName(conf)}:9097"))
+  }
+
+  test("resolve with internal port enabled and isWorker = true") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+    conf.set(CelebornConf.MASTER_ENDPOINTS.key, 
"clb-1:1234,clb-2,<localhost>:9097")
+    conf.set(
+      CelebornConf.MASTER_INTERNAL_ENDPOINTS.key,
+      "clb-internal-1:1234,clb-internal-2,<localhost>:8097")
+
+    val resolver = new StaticMasterEndpointResolver(conf, true)
+
+    assert(resolver.masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP)
+    assert(!resolver.getUpdatedAndReset())
+    assert(resolver.getActiveMasterEndpoints.size == 3)
+    assert(resolver.getActiveMasterEndpoints.contains("clb-internal-1:1234"))
+    assert(resolver.getActiveMasterEndpoints.contains(
+      
s"clb-internal-2:${CelebornConf.HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get}"))
+    
assert(resolver.getActiveMasterEndpoints.contains(s"${Utils.localHostName(conf)}:8097"))
+  }
+}
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index 6abf91357..a4ee14573 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -21,6 +21,8 @@ import java.util
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.client.{MasterEndpointResolver, 
StaticMasterEndpointResolver}
+import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
TransportModuleConstants}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
 MapperEnd}
 import org.apache.celeborn.common.protocol.message.StatusCode
@@ -102,6 +104,30 @@ class UtilsSuite extends CelebornFunSuite {
     assert(false == Utils.classIsLoadable("a.b.c.d.e.f"))
   }
 
+  test("instantiateMasterEndpointResolver") {
+    val celebornConf = new CelebornConf()
+    val masterEndpointResolver = 
Utils.instantiateMasterEndpointResolver[MasterEndpointResolver](
+      celebornConf.masterEndpointResolver,
+      celebornConf,
+      isWorker = true)
+
+    assert(masterEndpointResolver.isInstanceOf[MasterEndpointResolver])
+    assert(masterEndpointResolver.isInstanceOf[StaticMasterEndpointResolver])
+  }
+
+  test("instantiateMasterEndpointResolver invalid resolver classname") {
+    val celebornConf = new CelebornConf()
+    val invalidClassName = "invalidClassName"
+
+    val e = intercept[CelebornException] {
+      Utils.instantiateMasterEndpointResolver[MasterEndpointResolver](
+        invalidClassName,
+        celebornConf,
+        isWorker = true)
+    }
+    assert(s"Failed to instantiate masterEndpointResolver $invalidClassName." 
=== e.getMessage)
+  }
+
   test("splitPartitionLocationUniqueId") {
     assert((1, 1).equals(Utils.splitPartitionLocationUniqueId("1-1")))
   }
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index eda8edce2..e882449ab 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -115,7 +115,8 @@ license: |
 | celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn 
supports the following kind of fallback policies. 1. ALWAYS: always use spark 
built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle 
implementation, and fallback to use spark built-in shuffle implementation based 
on certain factors, e.g. availability of enough workers and quota, shuffle 
partition number; 3. NEVER: always use celeborn shuffle implementation, and 
fail fast when it it is concluded th [...]
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always 
use spark built-in shuffle implementation. This configuration is deprecated, 
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 
0.3.0 | celeborn.shuffle.forceFallback.enabled | 
 | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. 
This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | 
celeborn.shuffle.writer | 
-| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 |  | 
+| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn clients to connect. Client uses resolver provided 
by celeborn.master.endpoints.resolver to resolve the master endpoints. By 
default Celeborn uses 
`org.apache.celeborn.common.client.StaticMasterEndpointResolver` which takes 
static master endpoints as input. Allowed pattern: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. [...]
+| celeborn.master.endpoints.resolver | 
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | 
Resolver class that can be used for discovering and updating the master 
endpoints. This allows users to provide a custom master endpoint resolver 
implementation. This is useful in environments where the master nodes might 
change due to scaling operations or infrastructure updates. Clients need to 
ensure that the provided resolver class is present in the classpath. | 0.6.0 
|  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the 
master will enable to check the quota via QuotaManager. When Client side sets 
to true, LifecycleManager will request Master side to check whether the current 
user has enough quota before registration of shuffle. Fallback to the default 
shuffle service of Spark when Master side checks that there is no enough quota 
for current user. | 0.2.0 |  | 
 | celeborn.quota.identity.provider | 
org.apache.celeborn.common.identity.DefaultIdentityProvider | false | 
IdentityProvider class name. Default class is 
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: 
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will 
be obtained by UserGroupInformation.getUserName; 
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and 
tenant id are default values or user-specific values. | 0 [...]
 | celeborn.quota.identity.user-specific.tenant | default | false | Tenant id 
if celeborn.quota.identity.provider is 
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 |  | 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ed3d15454..1605f1f33 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -34,7 +34,8 @@ license: |
 | celeborn.dynamicConfig.store.fs.path | &lt;undefined&gt; | false | The path 
of dynamic config file for fs store backend. The file format should be yaml. 
The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 |  | 
 | celeborn.internal.port.enabled | false | false | Whether to create a 
internal port on Masters/Workers for inter-Masters/Workers communication. This 
is beneficial when SASL authentication is enforced for all interactions between 
clients and Celeborn Services, but the services can exchange messages without 
being subject to SASL authentication. | 0.5.0 |  | 
 | celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf 
for debugging purposes. | 0.5.0 |  | 
-| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 |  | 
+| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn clients to connect. Client uses resolver provided 
by celeborn.master.endpoints.resolver to resolve the master endpoints. By 
default Celeborn uses 
`org.apache.celeborn.common.client.StaticMasterEndpointResolver` which takes 
static master endpoints as input. Allowed pattern: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. [...]
+| celeborn.master.endpoints.resolver | 
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | 
Resolver class that can be used for discovering and updating the master 
endpoints. This allows users to provide a custom master endpoint resolver 
implementation. This is useful in environments where the master nodes might 
change due to scaling operations or infrastructure updates. Clients need to 
ensure that the provided resolver class is present in the classpath. | 0.6.0 
|  | 
 | celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore 
partition size smaller than this configuration of partition size for 
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | 
 | celeborn.master.internal.endpoints | &lt;localhost&gt;:8097 | false | 
Endpoints of master nodes just for celeborn workers to connect, allowed pattern 
is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:8097,clb2:8097,clb3:8097`. 
If the port is omitted, 8097 will be used. | 0.5.0 |  | 
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | 
Regex to decide which Celeborn configuration properties and environment 
variables in master and worker environments contain sensitive information. When 
this regex matches a property key or value, the value is redacted from the 
logging. | 0.5.0 |  | 
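
The allowed pattern documented for `celeborn.master.endpoints`, `<host1>:<port1>[,<host2>:<port2>]*` with 9097 used when the port is omitted, can be illustrated with a small standalone sketch. This is not the code of `StaticMasterEndpointResolver`; the class `EndpointPatternSketch` and the method `normalize` are hypothetical names chosen for illustration, and the sketch assumes plain host names or IPv4 addresses (no IPv6 literals).

```java
import java.util.ArrayList;
import java.util.List;

public class EndpointPatternSketch {
  // Hypothetical helper, not part of Celeborn: splits a comma-separated
  // endpoint list and appends the default port where none is given.
  static List<String> normalize(String endpoints, int defaultPort) {
    List<String> result = new ArrayList<>();
    for (String raw : endpoints.split(",")) {
      String endpoint = raw.trim();
      if (endpoint.isEmpty()) {
        throw new IllegalArgumentException("Empty endpoint in: " + endpoints);
      }
      int colon = endpoint.lastIndexOf(':');
      if (colon < 0) {
        // No port given, so fall back to the default port.
        result.add(endpoint + ":" + defaultPort);
      } else {
        // Integer.parseInt throws NumberFormatException on a non-numeric port.
        int port = Integer.parseInt(endpoint.substring(colon + 1));
        result.add(endpoint.substring(0, colon) + ":" + port);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The documented example, with the port omitted on the last entry.
    System.out.println(normalize("clb1:9097,clb2:9098,clb3", 9097));
  }
}
```

A custom resolver configured through `celeborn.master.endpoints.resolver` would presumably replace this kind of static parsing with its own discovery logic; the exact interface it must implement is defined in the commit itself and is not reproduced here.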

