This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ca15eea8b [CELEBORN-1511] Add support for custom master endpoint resolver
ca15eea8b is described below
commit ca15eea8be21f7e0003c884d81a03c1b3c70e631
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Aug 7 08:12:41 2024 +0800
[CELEBORN-1511] Add support for custom master endpoint resolver
### What changes were proposed in this pull request?
This PR adds a master endpoint resolver, which makes master endpoint
discovery extensible. Users can implement and pass whichever type of
resolver best fits their needs.
### Why are the changes needed?
Currently, Celeborn accepts master endpoints through the configs
`celeborn.master.endpoints` and `celeborn.master.internal.endpoints`.
The allowed pattern for both is `<host1>:<port1>[,<host2>:<port2>]*`.
Workers and clients both use these configs to connect to the master.
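As a minimal sketch of the endpoint pattern described here, the snippet below splits a comma-separated list and fills in a default port when one is omitted. It is plain Java and does not use Celeborn's own parsing helpers: the class `EndpointListSketch`, the method `normalize`, and its parameters are hypothetical and only illustrate the `<host>:<port>` convention, including the `<localhost>` placeholder. IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Celeborn: normalizes "host[:port]" lists.
final class EndpointListSketch {
  private EndpointListSketch() {}

  static List<String> normalize(String endpoints, String localHost, int defaultPort) {
    List<String> result = new ArrayList<>();
    for (String raw : endpoints.split(",")) {
      // Substitute the placeholder with the caller-supplied host name.
      String endpoint = raw.trim().replace("<localhost>", localHost);
      if (endpoint.isEmpty()) {
        continue; // ignore empty entries, e.g. from a stray comma
      }
      if (endpoint.lastIndexOf(':') < 0) {
        // No port given: fall back to the default port.
        result.add(endpoint + ":" + defaultPort);
      } else {
        result.add(endpoint);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The host name "my-host" and port 9097 are example inputs.
    System.out.println(normalize("clb1:9097,clb2,<localhost>:9097", "my-host", 9097));
  }
}
```

With the example inputs in `main`, the entry `clb2` becomes `clb2:9097` and the placeholder entry becomes `my-host:9097`.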
The problem with this approach is that it only takes static hosts, IPs,
or domain addresses, which can change over time for a long-running worker or
client, e.g. a master node going down or a domain UUID changing. In our infra,
this discovery is done by passing a service group that actively watches the
nodes of the master service, but there is no way to make this work with
Celeborn because it currently only works with static addresses.
### Does this PR introduce _any_ user-facing change?
The default behaviour remains the same, but users can now pass their own
master endpoint resolver.
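To illustrate how a client might consume a resolver whose endpoints change at runtime, here is a simplified, self-contained Java model. It is not Celeborn's `MasterEndpointResolver` (which this commit defines in Scala); the classes `ResolverSketch` and `RoundRobinPicker` and their methods are hypothetical. The sketch shows two ideas from this change under those assumptions: an "updated" flag that is read and cleared atomically, and a client that restarts from index 0 whenever the flag was set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a dynamic resolver; not a Celeborn class.
final class ResolverSketch {
  private volatile List<String> active;
  private final AtomicBoolean updated = new AtomicBoolean(false);

  ResolverSketch(List<String> initial) {
    this.active = Collections.unmodifiableList(new ArrayList<>(initial));
  }

  // Assumed to be called by some discovery mechanism when the master set changes.
  void update(List<String> endpoints) {
    this.active = Collections.unmodifiableList(new ArrayList<>(endpoints));
    updated.set(true);
  }

  // Returns true at most once per update, clearing the flag as it does so.
  boolean getUpdatedAndReset() {
    return updated.compareAndSet(true, false);
  }

  List<String> getActiveMasterEndpoints() {
    return active;
  }
}

// Hypothetical client-side picker that walks the active list round-robin.
final class RoundRobinPicker {
  private final ResolverSketch resolver;
  private final AtomicInteger index = new AtomicInteger(0);

  RoundRobinPicker(ResolverSketch resolver) {
    this.resolver = resolver;
  }

  String next() {
    // Restart from the first endpoint whenever the resolver reported a change.
    if (resolver.getUpdatedAndReset()) {
      index.set(0);
    }
    List<String> endpoints = resolver.getActiveMasterEndpoints();
    if (endpoints.isEmpty()) {
      throw new IllegalStateException("no active master endpoints");
    }
    int i = index.getAndUpdate(v -> (v + 1) % endpoints.size());
    // Guard against a list that shrank between the flag check and this read.
    return endpoints.get(i % endpoints.size());
  }

  public static void main(String[] args) {
    ResolverSketch resolver = new ResolverSketch(Arrays.asList("a:1", "b:1"));
    RoundRobinPicker picker = new RoundRobinPicker(resolver);
    System.out.println(picker.next());
    System.out.println(picker.next());
    resolver.update(Arrays.asList("c:1"));
    System.out.println(picker.next());
  }
}
```

In Celeborn itself, a custom resolver is selected by setting `celeborn.master.endpoints.resolver` to the resolver's fully qualified class name, and that class must be present on the classpath.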
### How was this patch tested?
Added new unit tests.
Closes #2629 from s0nskar/masterresolver.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/common/client/MasterClient.java | 41 +++++-----
.../org/apache/celeborn/common/CelebornConf.scala | 45 ++++++-----
.../common/client/MasterEndpointResolver.scala | 56 +++++++++++++
.../client/StaticMasterEndpointResolver.scala | 53 +++++++++++++
.../org/apache/celeborn/common/util/Utils.scala | 15 ++++
.../apache/celeborn/common/CelebornConfSuite.scala | 31 +++++---
.../client/MasterEndpointResolverSuite.scala | 92 ++++++++++++++++++++++
.../client/StaticMasterEndpointResolverSuite.scala | 74 +++++++++++++++++
.../apache/celeborn/common/util/UtilsSuite.scala | 26 ++++++
docs/configuration/client.md | 3 +-
docs/configuration/worker.md | 3 +-
11 files changed, 383 insertions(+), 56 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index df12fa520..7c39b2a5f 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -37,18 +37,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.RpcNameConstants;
import
org.apache.celeborn.common.protocol.message.ControlMessages.OneWayMessageResponse$;
import org.apache.celeborn.common.protocol.message.MasterRequestMessage;
import org.apache.celeborn.common.protocol.message.Message;
import org.apache.celeborn.common.rpc.*;
import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.common.util.Utils;
public class MasterClient {
private static final Logger LOG =
LoggerFactory.getLogger(MasterClient.class);
private final RpcEnv rpcEnv;
- private final List<String> masterEndpoints;
+ private final MasterEndpointResolver masterEndpointResolver;
private final int maxRetries;
private final RpcTimeout rpcTimeout;
@@ -57,16 +57,15 @@ public class MasterClient {
private final ExecutorService oneWayMessageSender;
private final CelebornConf conf;
private final boolean isWorker;
- private String masterEndpointName;
public MasterClient(RpcEnv rpcEnv, CelebornConf conf, boolean isWorker) {
this.rpcEnv = rpcEnv;
this.conf = conf;
this.isWorker = isWorker;
- this.masterEndpoints = resolveMasterEndpoints();
- Collections.shuffle(this.masterEndpoints);
- LOG.info("masterEndpoints = {}", masterEndpoints);
- this.maxRetries = Math.max(masterEndpoints.size(), conf.masterClientMaxRetries());
+ this.masterEndpointResolver =
+ Utils.instantiateMasterEndpointResolver(this.conf.masterEndpointResolver(), conf, isWorker);
+
+ this.maxRetries = conf.masterClientMaxRetries();
this.rpcTimeout = conf.masterClientRpcAskTimeout();
this.rpcEndpointRef = new AtomicReference<>();
this.oneWayMessageSender =
@@ -228,12 +227,20 @@ public class MasterClient {
*/
private RpcEndpointRef getOrSetupRpcEndpointRef(AtomicInteger currentIndex) {
RpcEndpointRef endpointRef = rpcEndpointRef.get();
+
+ List<String> activeMasterEndpoints = masterEndpointResolver.getActiveMasterEndpoints();
+ // If endpoints are updated by MasterEndpointResolver, we should reset the currentIndex to 0.
+ // This also unset the value of updated, so we don't always reset currentIndex to 0.
+ if (masterEndpointResolver.getUpdatedAndReset()) {
+ currentIndex.set(0);
+ }
+
if (endpointRef == null) {
int index = currentIndex.get();
do {
- RpcEndpointRef tempEndpointRef = setupEndpointRef(masterEndpoints.get(index));
+ RpcEndpointRef tempEndpointRef = setupEndpointRef(activeMasterEndpoints.get(index));
if (rpcEndpointRef.compareAndSet(null, tempEndpointRef)) {
- index = (index + 1) % masterEndpoints.size();
+ index = (index + 1) % activeMasterEndpoints.size();
}
endpointRef = rpcEndpointRef.get();
} while (endpointRef == null && index != currentIndex.get());
@@ -243,7 +250,7 @@ public class MasterClient {
if (endpointRef == null) {
throw new IllegalStateException(
"After trying all the available Master Addresses("
- + String.join(",", masterEndpoints)
+ + String.join(",", activeMasterEndpoints)
+ "), an usable link still couldn't be created.");
} else {
LOG.info("connect to master {}.", endpointRef.address());
@@ -256,7 +263,8 @@ public class MasterClient {
RpcEndpointRef endpointRef = null;
try {
endpointRef =
- rpcEnv.setupEndpointRef(RpcAddress.fromHostAndPort(endpoint), masterEndpointName);
+ rpcEnv.setupEndpointRef(
+ RpcAddress.fromHostAndPort(endpoint), masterEndpointResolver.masterEndpointName());
} catch (Exception e) {
// Catch all exceptions. Because we don't care whether this exception is IOException or
// TimeoutException or other exceptions, so we just try to connect to host:port, if fail,
@@ -265,15 +273,4 @@ public class MasterClient {
}
return endpointRef;
}
-
- private List<String> resolveMasterEndpoints() {
- if (isWorker && conf.internalPortEnabled()) {
- // For worker, we should use the internal endpoints if internal port is enabled.
- masterEndpointName = RpcNameConstants.MASTER_INTERNAL_EP;
- return Arrays.asList(conf.masterInternalEndpoints());
- } else {
- masterEndpointName = RpcNameConstants.MASTER_EP;
- return Arrays.asList(conf.masterEndpoints());
- }
- }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3d6326955..0a058d07d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -664,13 +664,9 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Address && HA && RATIS //
// //////////////////////////////////////////////////////
- def masterEndpoints: Array[String] =
- get(MASTER_ENDPOINTS).toArray.map { endpoint =>
- Utils.parseHostPort(endpoint.replace("<localhost>",
Utils.localHostName(this))) match {
- case (host, 0) => s"$host:${HA_MASTER_NODE_PORT.defaultValue.get}"
- case (host, port) => s"$host:$port"
- }
- }
+ def masterEndpoints: Array[String] = get(MASTER_ENDPOINTS).toArray
+
+ def masterEndpointResolver: String = get(MASTER_ENDPOINTS_RESOLVER)
def masterClientRpcAskTimeout: RpcTimeout =
new RpcTimeout(get(MASTER_CLIENT_RPC_ASK_TIMEOUT).milli,
MASTER_CLIENT_RPC_ASK_TIMEOUT.key)
@@ -1432,13 +1428,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// //////////////////////////////////////////////////////
def internalPortEnabled: Boolean = get(INTERNAL_PORT_ENABLED)
- def masterInternalEndpoints: Array[String] =
- get(MASTER_INTERNAL_ENDPOINTS).toArray.map { endpoint =>
- Utils.parseHostPort(endpoint.replace("<localhost>",
Utils.localHostName(this))) match {
- case (host, 0) =>
s"$host:${HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get}"
- case (host, port) => s"$host:$port"
- }
- }
+ def masterInternalEndpoints: Array[String] =
get(MASTER_INTERNAL_ENDPOINTS).toArray
def haMasterNodeInternalPort(nodeId: String): Int = {
val key = HA_MASTER_NODE_INTERNAL_PORT.key.replace("<id>", nodeId)
@@ -2108,18 +2098,33 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
+ val MASTER_ENDPOINTS_RESOLVER: ConfigEntry[String] =
+ buildConf("celeborn.master.endpoints.resolver")
+ .categories("client", "worker")
+ .doc("Resolver class that can be used for discovering and updating the master endpoints. This allows " +
+ "users to provide a custom master endpoint resolver implementation. This is useful in environments " +
+ "where the master nodes might change due to scaling operations or infrastructure updates. Clients " +
+ "need to ensure that provided resolver class should be present in the classpath.")
+ .version("0.6.0")
+ .stringConf
+ .checkValue(
+ resolver => Utils.classIsLoadable(resolver),
+ "Resolver class was not found in the classpath. Please check the class name " +
+ "and ensure that it is present in classpath")
+ .createWithDefault("org.apache.celeborn.common.client.StaticMasterEndpointResolver")
+
val MASTER_ENDPOINTS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.endpoints")
.categories("client", "worker")
- .doc("Endpoints of master nodes for celeborn client to connect, allowed pattern " +
- "is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. " +
- "If the port is omitted, 9097 will be used.")
+ .doc("Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided by" +
+ s"${MASTER_ENDPOINTS_RESOLVER.key} to resolve the master endpoints. By default Celeborn uses " +
+ "`org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take static master endpoints " +
+ "as input. Allowed pattern: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. " +
+ "If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass " +
+ s"custom resolver implementation to discover master endpoints actively using ${MASTER_ENDPOINTS_RESOLVER.key}.")
.version("0.2.0")
.stringConf
.toSequence
- .checkValue(
- endpoints => endpoints.map(_ =>
Try(Utils.parseHostPort(_))).forall(_.isSuccess),
- "Allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`")
.createWithDefaultString(s"<localhost>:9097")
val MASTER_CLIENT_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/client/MasterEndpointResolver.scala
b/common/src/main/scala/org/apache/celeborn/common/client/MasterEndpointResolver.scala
new file mode 100644
index 000000000..7b72cd048
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/client/MasterEndpointResolver.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.RpcNameConstants
+
+abstract class MasterEndpointResolver(
+ private val conf: CelebornConf,
+ private val isWorker: Boolean) extends Logging {
+
+ protected var activeMasterEndpoints: Option[List[String]] = None
+ protected val updated = new AtomicBoolean(false)
+
+ val masterEndpointName: String =
+ if (isWorker && conf.internalPortEnabled) {
+ // For worker, we should use the internal endpoints if internal port is
enabled.
+ RpcNameConstants.MASTER_INTERNAL_EP
+ } else {
+ RpcNameConstants.MASTER_EP
+ }
+
+ if (masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP) {
+ resolve(conf.masterInternalEndpoints)
+ } else {
+ resolve(conf.masterEndpoints)
+ }
+
+ def getUpdatedAndReset(): Boolean = updated.compareAndSet(true, false)
+
+ def getActiveMasterEndpoints: java.util.List[String] =
activeMasterEndpoints.get.asJava
+
+ protected def resolve(endpoints: Array[String]): Unit
+
+ protected def update(endpoints: Array[String]): Unit
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolver.scala
b/common/src/main/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolver.scala
new file mode 100644
index 000000000..88f8352e5
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolver.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.{HA_MASTER_NODE_INTERNAL_PORT,
HA_MASTER_NODE_PORT}
+import org.apache.celeborn.common.protocol.RpcNameConstants
+import org.apache.celeborn.common.util.Utils
+
+class StaticMasterEndpointResolver(conf: CelebornConf, isWorker: Boolean)
+ extends MasterEndpointResolver(conf, isWorker) {
+
+ override def resolve(endpoints: Array[String]): Unit = {
+ val haMasterPort =
+ if (masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP) {
+ HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get
+ } else {
+ HA_MASTER_NODE_PORT.defaultValue.get
+ }
+
+ this.activeMasterEndpoints = Some(endpoints.map { endpoint =>
+ Utils.parseHostPort(endpoint.replace("<localhost>",
Utils.localHostName(conf))) match {
+ case (host, 0) => s"$host:$haMasterPort"
+ case (host, port) => s"$host:$port"
+ }
+ }.toList)
+
+ Random.shuffle(this.activeMasterEndpoints.get)
+ logInfo(s"masterEndpoints = ${activeMasterEndpoints.get}")
+ }
+
+ override def update(endpoints: Array[String]): Unit = {}
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index d390561d7..44a7d5c2e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -52,6 +52,7 @@ import org.apache.celeborn.common.network.util.TransportConf
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, TransportModuleConstants}
import org.apache.celeborn.common.protocol.message.{ControlMessages, Message,
StatusCode}
import
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
+import org.apache.celeborn.reflect.DynConstructors
object Utils extends Logging {
@@ -571,6 +572,20 @@ object Utils extends Logging {
// scalastyle:on classforname
}
+ def instantiateMasterEndpointResolver[T](
+ className: String,
+ conf: CelebornConf,
+ isWorker: Boolean): T = {
+ try {
+ DynConstructors.builder().impl(className, classOf[CelebornConf],
java.lang.Boolean.TYPE)
+ .build[T]()
+ .newInstance(conf, java.lang.Boolean.valueOf(isWorker))
+ } catch {
+ case e: Throwable =>
+ throw new CelebornException(s"Failed to instantiate
masterEndpointResolver $className.", e)
+ }
+ }
+
def getCodeSourceLocation(clazz: Class[_]): String = {
new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI).getPath
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 6da252d9d..54497e6ee 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -158,18 +158,6 @@ class CelebornConfSuite extends CelebornFunSuite {
"Compression level for Zstd compression codec should be an integer
between -5 and 22."))
}
- test("replace <localhost> placeholder") {
- val conf = new CelebornConf()
- val replacedHost = conf.masterHost
- assert(!replacedHost.contains("<localhost>"))
- assert(replacedHost === Utils.localHostName(conf))
- val replacedHosts = conf.masterEndpoints
- replacedHosts.foreach { replacedHost =>
- assert(!replacedHost.contains("<localhost>"))
- assert(replacedHost contains Utils.localHostName(conf))
- }
- }
-
test("extract masterNodeIds") {
val conf = new CelebornConf()
.set("celeborn.master.ha.node.id", "1")
@@ -453,4 +441,23 @@ class CelebornConfSuite extends CelebornFunSuite {
}
}
+ test("CELEBORN-1511: Test master endpoint resolver") {
+ val conf = new CelebornConf()
+
+ val validResolverClass =
"org.apache.celeborn.common.client.StaticMasterEndpointResolver"
+ val invalidResolverClass = "org.apache.celeborn.UnknownClass"
+
+ conf.set(MASTER_ENDPOINTS_RESOLVER.key, validResolverClass)
+ assert(conf.masterEndpointResolver == validResolverClass)
+
+ try {
+ conf.set(MASTER_ENDPOINTS_RESOLVER.key, invalidResolverClass)
+ val _ = conf.masterEndpointResolver
+ } catch {
+ case e: Exception =>
+ assert(e.isInstanceOf[IllegalArgumentException])
+ assert(e.getMessage.contains("Resolver class was not found in the
classpath."))
+ }
+ }
+
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/client/MasterEndpointResolverSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/client/MasterEndpointResolverSuite.scala
new file mode 100644
index 000000000..c89c9ac07
--- /dev/null
+++
b/common/src/test/scala/org/apache/celeborn/common/client/MasterEndpointResolverSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.RpcNameConstants
+
+class DummyMasterEndpointResolver(conf: CelebornConf, isWorker: Boolean)
+ extends MasterEndpointResolver(conf, isWorker) {
+
+ override def resolve(endpoints: Array[String]): Unit = {
+ this.activeMasterEndpoints = Some(endpoints.toList)
+ }
+
+ override def update(endpoints: Array[String]): Unit = {
+ this.activeMasterEndpoints = Some(endpoints.toList)
+ updated.set(true)
+ }
+
+ def updateTest(endpoints: Array[String]): Unit = {
+ update(endpoints)
+ }
+}
+
+class MasterEndpointResolverSuite extends CelebornFunSuite {
+
+ test("resolve") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.MASTER_ENDPOINTS.key, "clb-1:1234")
+ val resolver = new DummyMasterEndpointResolver(conf, false)
+
+ assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+ assert(!resolver.getUpdatedAndReset())
+ // isUpdated should be not be reset if isUpdated returns false
+ assert(!resolver.getUpdatedAndReset())
+ assert(resolver.getActiveMasterEndpoints.size == 1)
+ assert(resolver.getActiveMasterEndpoints.get(0) == "clb-1:1234")
+
+ resolver.updateTest(Array("clb-2:1234"))
+ assert(resolver.getUpdatedAndReset())
+ // isUpdated should be reset after calling isUpdated once
+ assert(!resolver.getUpdatedAndReset())
+ assert(resolver.getActiveMasterEndpoints.size == 1)
+ assert(resolver.getActiveMasterEndpoints.get(0) == "clb-2:1234")
+ }
+
+ test("resolve with internal port enabled and isWorker = false") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.MASTER_ENDPOINTS.key, "clb-1:1234")
+ conf.set(CelebornConf.MASTER_INTERNAL_ENDPOINTS.key, "clb-internal-1:1234")
+ conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+
+ val resolver = new DummyMasterEndpointResolver(conf, false)
+
+ assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+ assert(resolver.getActiveMasterEndpoints.size == 1)
+ assert(resolver.getActiveMasterEndpoints.get(0) == "clb-1:1234")
+ }
+
+ test("resolve with internal port enabled and isWorker = true") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+ conf.set(CelebornConf.MASTER_ENDPOINTS.key, "clb-1:1234")
+ conf.set(CelebornConf.MASTER_INTERNAL_ENDPOINTS.key, "clb-internal-1:1234")
+
+ val resolver = new DummyMasterEndpointResolver(conf, true)
+
+ assert(resolver.masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP)
+ assert(resolver.getActiveMasterEndpoints.size == 1)
+ assert(resolver.getActiveMasterEndpoints.get(0) == "clb-internal-1:1234")
+ }
+}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolverSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolverSuite.scala
new file mode 100644
index 000000000..aacf0bf76
--- /dev/null
+++
b/common/src/test/scala/org/apache/celeborn/common/client/StaticMasterEndpointResolverSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.RpcNameConstants
+import org.apache.celeborn.common.util.Utils
+
+class StaticMasterEndpointResolverSuite extends CelebornFunSuite {
+
+ test("resolve with default configs") {
+ val conf = new CelebornConf()
+ val resolver = new StaticMasterEndpointResolver(conf, false)
+
+ assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+ assert(!resolver.getUpdatedAndReset())
+ assert(resolver.getActiveMasterEndpoints.size == 1)
+ assert(resolver.getActiveMasterEndpoints.get(0) ==
s"${Utils.localHostName(conf)}:9097")
+ }
+
+ test("resolve with internal port enabled and isWorker = false") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+ conf.set(CelebornConf.MASTER_ENDPOINTS.key,
"clb-1:1234,clb-2,<localhost>:9097")
+ conf.set(
+ CelebornConf.MASTER_INTERNAL_ENDPOINTS.key,
+ "clb-internal-1:1234,clb-internal-2,<localhost>:9097")
+
+ val resolver = new StaticMasterEndpointResolver(conf, false)
+
+ assert(resolver.masterEndpointName == RpcNameConstants.MASTER_EP)
+ assert(!resolver.getUpdatedAndReset())
+ assert(resolver.getActiveMasterEndpoints.size == 3)
+ assert(resolver.getActiveMasterEndpoints.contains("clb-1:1234"))
+ assert(resolver.getActiveMasterEndpoints.contains(
+ s"clb-2:${CelebornConf.HA_MASTER_NODE_PORT.defaultValue.get}"))
+
assert(resolver.getActiveMasterEndpoints.contains(s"${Utils.localHostName(conf)}:9097"))
+ }
+
+ test("resolve with internal port enabled and isWorker = true") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+ conf.set(CelebornConf.MASTER_ENDPOINTS.key,
"clb-1:1234,clb-2,<localhost>:9097")
+ conf.set(
+ CelebornConf.MASTER_INTERNAL_ENDPOINTS.key,
+ "clb-internal-1:1234,clb-internal-2,<localhost>:8097")
+
+ val resolver = new StaticMasterEndpointResolver(conf, true)
+
+ assert(resolver.masterEndpointName == RpcNameConstants.MASTER_INTERNAL_EP)
+ assert(!resolver.getUpdatedAndReset())
+ assert(resolver.getActiveMasterEndpoints.size == 3)
+ assert(resolver.getActiveMasterEndpoints.contains("clb-internal-1:1234"))
+ assert(resolver.getActiveMasterEndpoints.contains(
+
s"clb-internal-2:${CelebornConf.HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get}"))
+
assert(resolver.getActiveMasterEndpoints.contains(s"${Utils.localHostName(conf)}:8097"))
+ }
+}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index 6abf91357..a4ee14573 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -21,6 +21,8 @@ import java.util
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.client.{MasterEndpointResolver,
StaticMasterEndpointResolver}
+import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.protocol.{PartitionLocation,
TransportModuleConstants}
import
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
MapperEnd}
import org.apache.celeborn.common.protocol.message.StatusCode
@@ -102,6 +104,30 @@ class UtilsSuite extends CelebornFunSuite {
assert(false == Utils.classIsLoadable("a.b.c.d.e.f"))
}
+ test("instantiateMasterEndpointResolver") {
+ val celebornConf = new CelebornConf()
+ val masterEndpointResolver =
Utils.instantiateMasterEndpointResolver[MasterEndpointResolver](
+ celebornConf.masterEndpointResolver,
+ celebornConf,
+ isWorker = true)
+
+ assert(masterEndpointResolver.isInstanceOf[MasterEndpointResolver])
+ assert(masterEndpointResolver.isInstanceOf[StaticMasterEndpointResolver])
+ }
+
+ test("instantiateMasterEndpointResolver invalid resolver classname") {
+ val celebornConf = new CelebornConf()
+ val invalidClassName = "invalidClassName"
+
+ val e = intercept[CelebornException] {
+ Utils.instantiateMasterEndpointResolver[MasterEndpointResolver](
+ invalidClassName,
+ celebornConf,
+ isWorker = true)
+ }
+ assert(s"Failed to instantiate masterEndpointResolver $invalidClassName."
=== e.getMessage)
+ }
+
test("splitPartitionLocationUniqueId") {
assert((1, 1).equals(Utils.splitPartitionLocationUniqueId("1-1")))
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index eda8edce2..e882449ab 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -115,7 +115,8 @@ license: |
| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn
supports the following kind of fallback policies. 1. ALWAYS: always use spark
built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle
implementation, and fallback to use spark built-in shuffle implementation based
on certain factors, e.g. availability of enough workers and quota, shuffle
partition number; 3. NEVER: always use celeborn shuffle implementation, and
fail fast when it it is concluded th [...]
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always
use spark built-in shuffle implementation. This configuration is deprecated,
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. |
0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge.
This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 |
celeborn.shuffle.writer |
-| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 | |
+| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn clients to connect. Client uses resolver provided
byceleborn.master.endpoints.resolver to resolve the master endpoints. By
default Celeborn uses
`org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take
static master endpoints as input. Allowed pattern:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. [...]
+| celeborn.master.endpoints.resolver |
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false |
Resolver class that can be used for discovering and updating the master
endpoints. This allows users to provide a custom master endpoint resolver
implementation. This is useful in environments where the master nodes might
change due to scaling operations or infrastructure updates. Clients need to
ensure that provided resolver class should be present in the classpath. | 0.6.0
| |
| celeborn.quota.enabled | true | false | When Master side sets to true, the
master will enable to check the quota via QuotaManager. When Client side sets
to true, LifecycleManager will request Master side to check whether the current
user has enough quota before registration of shuffle. Fallback to the default
shuffle service of Spark when Master side checks that there is no enough quota
for current user. | 0.2.0 | |
| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | false |
IdentityProvider class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0 [...]
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ed3d15454..1605f1f33 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -34,7 +34,8 @@ license: |
| celeborn.dynamicConfig.store.fs.path | <undefined> | false | The path
of dynamic config file for fs store backend. The file format should be yaml.
The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 | |
| celeborn.internal.port.enabled | false | false | Whether to create a
internal port on Masters/Workers for inter-Masters/Workers communication. This
is beneficial when SASL authentication is enforced for all interactions between
clients and Celeborn Services, but the services can exchange messages without
being subject to SASL authentication. | 0.5.0 | |
| celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf
for debugging purposes. | 0.5.0 | |
-| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 | |
+| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn clients to connect. Client uses resolver provided
byceleborn.master.endpoints.resolver to resolve the master endpoints. By
default Celeborn uses
`org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take
static master endpoints as input. Allowed pattern:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. [...]
+| celeborn.master.endpoints.resolver |
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false |
Resolver class that can be used for discovering and updating the master
endpoints. This allows users to provide a custom master endpoint resolver
implementation. This is useful in environments where the master nodes might
change due to scaling operations or infrastructure updates. Clients need to
ensure that provided resolver class should be present in the classpath. | 0.6.0
| |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore
partition size smaller than this configuration of partition size for
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.master.internal.endpoints | <localhost>:8097 | false |
Endpoints of master nodes just for celeborn workers to connect, allowed pattern
is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:8097,clb2:8097,clb3:8097`.
If the port is omitted, 8097 will be used. | 0.5.0 | |
| celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false |
Regex to decide which Celeborn configuration properties and environment
variables in master and worker environments contain sensitive information. When
this regex matches a property key or value, the value is redacted from the
logging. | 0.5.0 | |