This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d35f6f64f [CELEBORN-713] Local network binding support IP or FQDN
d35f6f64f is described below
commit d35f6f64fd44ccb112db71820b7984dddfdf6695
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jun 27 09:42:11 2023 +0800
[CELEBORN-713] Local network binding support IP or FQDN
### What changes were proposed in this pull request?
This PR aims to make local network address binding support both the IP and
FQDN strategies.
Additionally, it refactors `ShuffleClientImpl#genAddressPair` from a
`${hostAndPort}-${hostAndPort}` string to a `Pair<String, String>`: the string
form works properly with IP addresses but may not with FQDNs, because an FQDN
may contain `-`.
### Why are the changes needed?
Currently, when the bind hostname is not set explicitly, Celeborn finds the
first non-loopback address and always binds to the IP. This is not suitable
for K8s cases: a StatefulSet has a stable FQDN, but the Pod IP changes
whenever the Pod restarts.
`ShuffleClientImpl#genAddressPair` must be changed as well, otherwise it may
cause
```
java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11657 in stage 0.0 failed 4 times, most recent failure: Lost task 11657.3 in stage 0.0 (TID 12747) (10.153.253.198 executor 157): java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.celeborn.client.ShuffleClientImpl.doPushMergedData(ShuffleClientImpl.java:874)
    at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:735)
    at org.apache.celeborn.client.ShuffleClientImpl.mergeData(ShuffleClientImpl.java:827)
    at org.apache.spark.shuffle.celeborn.SortBasedPusher.pushData(SortBasedPusher.java:140)
    at org.apache.spark.shuffle.celeborn.SortBasedPusher.insertRecord(SortBasedPusher.java:192)
    at org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.fastWrite0(SortBasedShuffleWriter.java:192)
    at org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.write(SortBasedShuffleWriter.java:145)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```
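The failure mode can be reproduced in isolation. A minimal sketch (the host names are hypothetical, not from the commit) of why splitting the old string-encoded address pair on `-` breaks once a host is an FQDN:

```java
public class AddressPairSplitDemo {
    public static void main(String[] args) {
        // With IPs, the "${hostAndPort}-${hostAndPort}" encoding splits cleanly.
        String ipPair = "10.0.0.1:9092-10.0.0.2:9092";
        System.out.println(ipPair.split("-")[0]); // 10.0.0.1:9092

        // A StatefulSet-style FQDN contains '-', so split("-") truncates the
        // host; the later ':' split then yields a single element, and
        // indexing splits[1] throws ArrayIndexOutOfBoundsException.
        String fqdnPair = "celeborn-worker-0.svc:9092-celeborn-worker-1.svc:9092";
        String hostPort = fqdnPair.split("-")[0];
        System.out.println(hostPort); // celeborn
        String[] splits = hostPort.split(":");
        System.out.println(splits.length); // 1, so splits[1] is out of bounds
    }
}
```

Carrying the two addresses as a `Pair<String, String>` avoids re-parsing the encoded string entirely.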
### Does this PR introduce _any_ user-facing change?
Yes, a new configuration `celeborn.network.bind.preferIpAddress` is
introduced, and the default value is `true` to preserve the existing behavior.
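For operators, opting into FQDN binding is a one-line change; a sketch (the config file location and layout depend on your deployment):

```properties
# celeborn-defaults.conf (illustrative)
# Bind and register with the FQDN instead of the first non-loopback IP
celeborn.network.bind.preferIpAddress false
```

The default `true` keeps the existing IP-based behavior; `false` is mainly useful when nodes must be reachable by stable DNS names, as with K8s StatefulSets.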
### How was this patch tested?
Manually tested with `celeborn.network.bind.preferIpAddress=false`:
```
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-0.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.143.252
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-1.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.173.94
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-2.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.149.42
starting org.apache.celeborn.service.deploy.worker.Worker, logging to /opt/celeborn/logs/celeborn--org.apache.celeborn.service.deploy.worker.Worker-1-celeborn-worker-4.out
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.rpc.netty.Dispatcher#51 - Dispatcher numThreads: 4
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.network.client.TransportClientFactory#91 - mode NIO threads 64
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory#51 - Starting RPC Server [WorkerSys] on celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0 with advisor endpoint celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.util.Utils#51 - Successfully started service 'WorkerSys' on port 38303.
```
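Under the hood, the new `Utils.localHostName(conf)` chooses between the JDK's two views of an address. A self-contained sketch of that distinction (using the loopback address for determinism; the reverse-resolved name depends on the local resolver):

```java
import java.net.InetAddress;

public class HostNameDemo {
    public static void main(String[] args) throws Exception {
        InetAddress addr = InetAddress.getByName("127.0.0.1");
        // preferIpAddress=true path: the raw IP string
        System.out.println(addr.getHostAddress()); // 127.0.0.1
        // preferIpAddress=false path: the reverse-resolved canonical host
        // name (often "localhost" here, but resolver-dependent)
        System.out.println(addr.getCanonicalHostName());
    }
}
```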
Closes #1622 from pan3793/CELEBORN-713.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 1753556565e73fa74dd9f3d7722ec09ac0a5b476)
Signed-off-by: Cheng Pan <[email protected]>
---
client/pom.xml | 1 -
.../apache/celeborn/client/ShuffleClientImpl.java | 31 +++++++++++-----------
.../apache/celeborn/client/LifecycleManager.scala | 2 +-
.../apache/celeborn/common/write/PushState.java | 23 +++++++---------
.../org/apache/celeborn/common/CelebornConf.scala | 31 +++++++++++++++-------
.../org/apache/celeborn/common/util/Utils.scala | 14 +++-------
.../apache/celeborn/common/CelebornConfSuite.scala | 4 +--
docs/configuration/metrics.md | 4 +--
docs/configuration/network.md | 1 +
docs/deploy_on_k8s.md | 26 ++++++++++++++++--
.../ha/RatisMasterStatusSystemSuiteJ.java | 8 +++---
.../deploy/master/MasterArgumentsSuite.scala | 4 ++-
.../celeborn/tests/flink/HeartbeatTest.scala | 6 ++---
.../celeborn/tests/spark/HeartbeatTest.scala | 6 ++---
.../service/deploy/worker/WorkerArguments.scala | 2 +-
.../celeborn/service/deploy/HeartbeatFeature.scala | 25 +++++++----------
.../service/deploy/WorkerArgumentsSuite.scala | 2 +-
17 files changed, 102 insertions(+), 88 deletions(-)
diff --git a/client/pom.xml b/client/pom.xml
index 864bfbdb7..813d208f3 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -61,7 +61,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 6edf24a04..1dc7605cd 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -30,6 +30,7 @@ import scala.reflect.ClassTag$;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,7 +169,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
// init rpc env and master endpointRef
- rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, conf);
+ rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(conf), 0, conf);
String module = TransportModuleConstants.DATA_MODULE;
TransportConf dataTransportConf =
@@ -281,7 +282,7 @@ public class ShuffleClientImpl extends ShuffleClient {
StatusCode cause,
Integer oldGroupedBatchId,
int remainReviveTimes) {
- HashMap<String, DataBatches> newDataBatchesMap = new HashMap<>();
+ HashMap<Pair<String, String>, DataBatches> newDataBatchesMap = new HashMap<>();
ArrayList<DataBatches.DataBatch> reviveFailedBatchesMap = new ArrayList<>();
for (DataBatches.DataBatch batch : batches) {
int partitionId = batch.loc.getId();
@@ -326,8 +327,8 @@ public class ShuffleClientImpl extends ShuffleClient {
}
}
- for (Map.Entry<String, DataBatches> entry : newDataBatchesMap.entrySet()) {
- String addressPair = entry.getKey();
+ for (Map.Entry<Pair<String, String>, DataBatches> entry : newDataBatchesMap.entrySet()) {
+ Pair<String, String> addressPair = entry.getKey();
DataBatches newDataBatches = entry.getValue();
doPushMergedData(
addressPair,
@@ -355,14 +356,12 @@ public class ShuffleClientImpl extends ShuffleClient {
}
}
- private String genAddressPair(PartitionLocation loc) {
- String addressPair;
+ private Pair<String, String> genAddressPair(PartitionLocation loc) {
if (loc.hasPeer()) {
- addressPair = loc.hostAndPushPort() + "-" + loc.getPeer().hostAndPushPort();
+ return Pair.of(loc.hostAndPushPort(), loc.getPeer().hostAndPushPort());
} else {
- addressPair = loc.hostAndPushPort();
+ return Pair.of(loc.hostAndPushPort(), null);
}
- return addressPair;
}
private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
@@ -908,7 +907,7 @@ public class ShuffleClientImpl extends ShuffleClient {
} else {
// add batch data
logger.debug("Merge batch {}.", nextBatchId);
- String addressPair = genAddressPair(loc);
+ Pair<String, String> addressPair = genAddressPair(loc);
boolean shouldPush = pushState.addBatchData(addressPair, loc, nextBatchId, body);
if (shouldPush) {
limitMaxInFlight(mapKey, pushState, loc.hostAndPushPort());
@@ -1019,12 +1018,12 @@ public class ShuffleClientImpl extends ShuffleClient {
if (pushState == null) {
return;
}
- ArrayList<Map.Entry<String, DataBatches>> batchesArr =
+ ArrayList<Map.Entry<Pair<String, String>, DataBatches>> batchesArr =
new ArrayList<>(pushState.batchesMap.entrySet());
while (!batchesArr.isEmpty()) {
- Map.Entry<String, DataBatches> entry = batchesArr.get(RND.nextInt(batchesArr.size()));
- String[] tokens = entry.getKey().split("-");
- limitMaxInFlight(mapKey, pushState, tokens[0]);
+ Map.Entry<Pair<String, String>, DataBatches> entry =
+ batchesArr.get(RND.nextInt(batchesArr.size()));
+ limitMaxInFlight(mapKey, pushState, entry.getKey().getLeft());
ArrayList<DataBatches.DataBatch> batches =
entry.getValue().requireBatches(pushBufferMaxSize);
if (entry.getValue().getTotalSize() == 0) {
batchesArr.remove(entry);
@@ -1035,14 +1034,14 @@ public class ShuffleClientImpl extends ShuffleClient {
}
private void doPushMergedData(
- String addressPair,
+ Pair<String, String> addressPair,
int shuffleId,
int mapId,
int attemptId,
ArrayList<DataBatches.DataBatch> batches,
PushState pushState,
int remainReviveTimes) {
- String hostPort = addressPair.split("-")[0];
+ String hostPort = addressPair.getLeft();
final String[] splits = hostPort.split(":");
final String host = splits[0];
final int port = Integer.parseInt(splits[1]);
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index f6e4586cc..840a85f27 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -55,7 +55,7 @@ object LifecycleManager {
class LifecycleManager(val appUniqueId: String, val conf: CelebornConf)
extends RpcEndpoint
with Logging {
- private val lifecycleHost = Utils.localHostName
+ private val lifecycleHost = Utils.localHostName(conf)
private val shuffleExpiredCheckIntervalMs =
conf.shuffleExpiredCheckIntervalMs
private val pushReplicateEnabled = conf.clientPushReplicateEnabled
diff --git a/common/src/main/java/org/apache/celeborn/common/write/PushState.java b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
index b082cc489..b61384f2d 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/PushState.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
+
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.util.JavaUtils;
@@ -40,25 +42,18 @@ public class PushState {
inFlightRequestTracker.cleanup();
}
- // key: ${master addr}-${slave addr} value: list of data batch
- public final ConcurrentHashMap<String, DataBatches> batchesMap = JavaUtils.newConcurrentHashMap();
-
- /**
- * Not thread-safe
- *
- * @param addressPair
- * @param loc
- * @param batchId
- * @param body
- * @return
- */
- public boolean addBatchData(String addressPair, PartitionLocation loc, int batchId, byte[] body) {
+ // key: ${master addr}, ${slave addr} value: list of data batch
+ public final ConcurrentHashMap<Pair<String, String>, DataBatches> batchesMap =
+ JavaUtils.newConcurrentHashMap();
+
+ public boolean addBatchData(
+ Pair<String, String> addressPair, PartitionLocation loc, int batchId, byte[] body) {
DataBatches batches = batchesMap.computeIfAbsent(addressPair, (s) -> new DataBatches());
batches.addDataBatch(loc, batchId, body);
return batches.getTotalSize() > pushBufferMaxSize;
}
- public DataBatches takeDataBatches(String addressPair) {
+ public DataBatches takeDataBatches(Pair<String, String> addressPair) {
return batchesMap.remove(addressPair);
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 92a201018..af31918e8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -369,6 +369,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Network //
// //////////////////////////////////////////////////////
+ def bindPreferIP: Boolean = get(NETWORK_BIND_PREFER_IP)
def portMaxRetries: Int = get(PORT_MAX_RETRY)
def networkTimeout: RpcTimeout =
new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key)
@@ -517,7 +518,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
def masterEndpoints: Array[String] =
get(MASTER_ENDPOINTS).toArray.map { endpoint =>
- Utils.parseHostPort(endpoint) match {
+ Utils.parseHostPort(endpoint.replace("<localhost>", Utils.localHostName(this))) match {
case (host, 0) => s"$host:${HA_MASTER_NODE_PORT.defaultValue.get}"
case (host, port) => s"$host:$port"
}
@@ -528,7 +529,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def masterClientMaxRetries: Int = get(MASTER_CLIENT_MAX_RETRIES)
- def masterHost: String = get(MASTER_HOST)
+ def masterHost: String = get(MASTER_HOST).replace("<localhost>", Utils.localHostName(this))
def masterPort: Int = get(MASTER_PORT)
@@ -563,7 +564,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def haMasterNodeHost(nodeId: String): String = {
val key = HA_MASTER_NODE_HOST.key.replace("<id>", nodeId)
val legacyKey = HA_MASTER_NODE_HOST.alternatives.head._1.replace("<id>", nodeId)
- get(key, get(legacyKey, Utils.localHostName))
+ get(key, get(legacyKey, Utils.localHostName(this)))
}
def haMasterNodePort(nodeId: String): Int = {
@@ -652,12 +653,14 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE)
def metricsCollectCriticalEnabled: Boolean =
get(METRICS_COLLECT_CRITICAL_ENABLED)
def metricsCapacity: Int = get(METRICS_CAPACITY)
- def masterPrometheusMetricHost: String = get(MASTER_PROMETHEUS_HOST)
+ def masterPrometheusMetricHost: String =
+ get(MASTER_PROMETHEUS_HOST).replace("<localhost>", Utils.localHostName(this))
def masterPrometheusMetricPort: Int = get(MASTER_PROMETHEUS_PORT)
- def workerPrometheusMetricHost: String = get(WORKER_PROMETHEUS_HOST)
+ def workerPrometheusMetricHost: String =
+ get(WORKER_PROMETHEUS_HOST).replace("<localhost>", Utils.localHostName(this))
def workerPrometheusMetricPort: Int = get(WORKER_PROMETHEUS_PORT)
def metricsExtraLabels: Map[String, String] =
- get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels(_)).toMap
+ get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap
def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
@@ -1171,6 +1174,16 @@ object CelebornConf extends Logging {
private def buildConf(key: String): ConfigBuilder =
ConfigBuilder(key).onCreate(register)
+ val NETWORK_BIND_PREFER_IP: ConfigEntry[Boolean] =
+ buildConf("celeborn.network.bind.preferIpAddress")
+ .categories("network")
+ .version("0.3.0")
+ .doc("When `true`, prefer to use IP address, otherwise FQDN. This configuration only " +
+ "takes effect when the bind hostname is not set explicitly, in such case, Celeborn " +
+ "will find the first non-loopback address to bind.")
+ .booleanConf
+ .createWithDefault(true)
+
val NETWORK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.network.timeout")
.categories("network")
@@ -1448,7 +1461,6 @@ object CelebornConf extends Logging {
"If the port is omitted, 9097 will be used.")
.version("0.2.0")
.stringConf
- .transform(_.replace("<localhost>", Utils.localHostName))
.toSequence
.checkValue(
endpoints => endpoints.map(_ => Try(Utils.parseHostPort(_))).forall(_.isSuccess),
@@ -1498,7 +1510,6 @@ object CelebornConf extends Logging {
.version("0.2.0")
.doc("Hostname for master to bind.")
.stringConf
- .transform(_.replace("<localhost>", Utils.localHostName))
.createWithDefaultString("<localhost>")
val MASTER_PORT: ConfigEntry[Int] =
@@ -3293,7 +3304,7 @@ object CelebornConf extends Logging {
.doc("Master's Prometheus host.")
.version("0.3.0")
.stringConf
- .createWithDefault("0.0.0.0")
+ .createWithDefault("<localhost>")
val MASTER_PROMETHEUS_PORT: ConfigEntry[Int] =
buildConf("celeborn.metrics.master.prometheus.port")
@@ -3312,7 +3323,7 @@ object CelebornConf extends Logging {
.doc("Worker's Prometheus host.")
.version("0.3.0")
.stringConf
- .createWithDefault("0.0.0.0")
+ .createWithDefault("<localhost>")
val WORKER_PROMETHEUS_PORT: ConfigEntry[Int] =
buildConf("celeborn.metrics.worker.prometheus.port")
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index d7fb75073..247d77657 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -382,22 +382,14 @@ object Utils extends Logging {
private var customHostname: Option[String] =
sys.env.get("CELEBORN_LOCAL_HOSTNAME")
+ // for testing
def setCustomHostname(hostname: String) {
- // DEBUG code
Utils.checkHost(hostname)
customHostname = Some(hostname)
}
- def localCanonicalHostName: String = {
- customHostname.getOrElse(localIpAddress.getCanonicalHostName)
- }
-
- def localHostName: String = {
- customHostname.getOrElse(localIpAddress.getHostAddress)
- }
-
- def localHostNameForURI: String = {
- customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
+ def localHostName(conf: CelebornConf): String = customHostname.getOrElse {
+ if (conf.bindPreferIP) localIpAddress.getHostAddress else localIpAddress.getCanonicalHostName
}
/**
diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 2b12b9f74..e7766040d 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -144,11 +144,11 @@ class CelebornConfSuite extends CelebornFunSuite {
val conf = new CelebornConf()
val replacedHost = conf.masterHost
assert(!replacedHost.contains("<localhost>"))
- assert(replacedHost === Utils.localHostName)
+ assert(replacedHost === Utils.localHostName(conf))
val replacedHosts = conf.masterEndpoints
replacedHosts.foreach { replacedHost =>
assert(!replacedHost.contains("<localhost>"))
- assert(replacedHost contains Utils.localHostName)
+ assert(replacedHost contains Utils.localHostName(conf))
}
}
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index 8a2999af5..817e03572 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -27,10 +27,10 @@ license: |
| celeborn.metrics.conf | &lt;undefined&gt; | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 |
| celeborn.metrics.enabled | true | When true, enable metrics system. | 0.2.0 |
| celeborn.metrics.extraLabels | | If default metric labels are not enough, extra metric labels can be customized. Labels' pattern is: `<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`; e.g. `env=prod,version=1` | 0.3.0 |
-| celeborn.metrics.master.prometheus.host | 0.0.0.0 | Master's Prometheus host. | 0.3.0 |
+| celeborn.metrics.master.prometheus.host | &lt;localhost&gt; | Master's Prometheus host. | 0.3.0 |
| celeborn.metrics.master.prometheus.port | 9098 | Master's Prometheus port. | 0.3.0 |
| celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
| celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size of timer metric. | 0.2.0 |
-| celeborn.metrics.worker.prometheus.host | 0.0.0.0 | Worker's Prometheus host. | 0.3.0 |
+| celeborn.metrics.worker.prometheus.host | &lt;localhost&gt; | Worker's Prometheus host. | 0.3.0 |
| celeborn.metrics.worker.prometheus.port | 9096 | Worker's Prometheus port. | 0.3.0 |
<!--end-include-->
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 28dc7fd01..1784ca3ec 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -38,6 +38,7 @@ license: |
| celeborn.&lt;module&gt;.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. | |
| celeborn.&lt;module&gt;.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting &lt;module&gt; to `data`, it works for shuffle client push data and should be configured on client side. If setting &lt;module&gt; to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
| celeborn.&lt;module&gt;.push.timeoutCheck.threads | 16 | Threads num for checking push data timeout. If setting &lt;module&gt; to `data`, it works for shuffle client push data and should be configured on client side. If setting &lt;module&gt; to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
+| celeborn.network.bind.preferIpAddress | true | When `true`, prefer to use IP address, otherwise FQDN. This configuration only takes effect when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 |
| celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 0.2.0 |
| celeborn.network.memory.allocator.numArenas | &lt;undefined&gt; | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 |
| celeborn.network.memory.allocator.share | false | Whether to share memory allocator. | 0.3.0 |
diff --git a/docs/deploy_on_k8s.md b/docs/deploy_on_k8s.md
index c7d79ee08..2e1ad36a7 100644
--- a/docs/deploy_on_k8s.md
+++ b/docs/deploy_on_k8s.md
@@ -82,7 +82,7 @@ celeborn-worker-0 1/1 Running 0 1m
...
```
-Given that Celeborn Master/Worker takes time to start, you can see the following phenomenon:
+Given that Celeborn Master/Worker Pod takes time to start, you can see the following phenomenon:
```
** server can't find celeborn-master-0.celeborn-master-svc.default.svc.cluster.local: NXDOMAIN
@@ -113,7 +113,29 @@ starting org.apache.celeborn.service.deploy.master.Master, logging to /opt/celeb
23/03/23 14:10:56,216 INFO [main] Master: Master started.
```
-### 5. Build Celeborn Client
+### 5. Access Celeborn Service
+
+The Celeborn Master/Worker nodes deployed via the official Helm charts run as a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/),
+and can be accessed through the Pod IP or the [Stable Network ID (DNS name)](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#stable-network-id).
+In the above case, the Master/Worker nodes can be accessed through:
+
+```
+celeborn-master-0.celeborn-master-svc.default.svc.cluster.local
+...
+celeborn-worker-0.celeborn-worker-svc.default.svc.cluster.local
+...
+```
+
+After a restart, the StatefulSet Pod IP changes but the DNS name remains; this is important for rolling upgrades.
+
+When the bind address is not set explicitly, the Celeborn worker finds the first non-loopback address to bind. By default,
+it uses the IP address both for address binding and registering, which causes the Master and Client to use the IP address
+to access the Worker. This is problematic after a Worker restart, as explained above, especially when Graceful Shutdown is enabled.
+
+You may want to set `celeborn.network.bind.preferIpAddress=false` to address such issues. Note that, depending on your Kubernetes
+network infrastructure, this may put pressure on the DNS service or cause other network issues compared with using the IP address directly.
+
+### 6. Build Celeborn Client
Here, without going into detail on how to configure spark/flink to find celeborn master/worker, mention the key configuration:
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index fdb0ca0e1..9b9bea917 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -112,25 +112,23 @@ public class RatisMasterStatusSystemSuiteJ {
int ratisPort2 = 9873;
int ratisPort3 = 9874;
- String localHost = Utils.localHostName();
-
MasterNode masterNode1 =
new MasterNode.Builder()
- .setHost(localHost)
+ .setHost(Utils.localHostName(conf1))
.setRatisPort(ratisPort1)
.setRpcPort(9872)
.setNodeId(id1)
.build();
MasterNode masterNode2 =
new MasterNode.Builder()
- .setHost(localHost)
+ .setHost(Utils.localHostName(conf2))
.setRatisPort(ratisPort2)
.setRpcPort(9873)
.setNodeId(id2)
.build();
MasterNode masterNode3 =
new MasterNode.Builder()
- .setHost(localHost)
+ .setHost(Utils.localHostName(conf3))
.setRatisPort(ratisPort3)
.setRpcPort(9874)
.setNodeId(id3)
diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
index 3e17e1ab7..ae7402585 100644
--- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
+++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
@@ -31,7 +31,9 @@ class MasterArgumentsSuite extends AnyFunSuite with Logging {
val conf1 = new CelebornConf()
val arguments1 = new MasterArguments(args1, conf1)
- assert(arguments1.host === sys.env.getOrElse("CELEBORN_LOCAL_HOSTNAME", Utils.localHostName))
+ assert(arguments1.host === sys.env.getOrElse(
+ "CELEBORN_LOCAL_HOSTNAME",
+ Utils.localHostName(conf1)))
assert(arguments1.port === 9097)
// should use celeborn conf
diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
index 0f7a065a2..fb2a4b280 100644
--- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
+++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
@@ -30,7 +30,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
with BeforeAndAfterAll with BeforeAndAfterEach {
test("celeborn flink hearbeat test - client <- worker") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf()
+ val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
"",
@@ -45,7 +45,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
}
test("celeborn flink hearbeat test - client <- worker no heartbeat") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
+ val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
"",
@@ -60,7 +60,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
}
test("celeborn flink hearbeat test - client <- worker timeout") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf()
+ val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
"",
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
index 5e167f18c..af6142022 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
@@ -37,21 +37,21 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
}
test("celeborn spark heartbeat test - client <- worker") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf()
+ val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
val shuffleClientImpl =
new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
testHeartbeatFromWorker2Client(shuffleClientImpl.getDataClientFactory)
}
test("celeborn spark heartbeat test - client <- worker on heartbeat") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
+ val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
val shuffleClientImpl =
new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
testHeartbeatFromWorker2ClientWithNoHeartbeat(shuffleClientImpl.getDataClientFactory)
}
test("celeborn spark heartbeat test - client <- worker timeout") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf()
+ val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
val shuffleClientImpl =
new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
testHeartbeatFromWorker2ClientWithCloseChannel(shuffleClientImpl.getDataClientFactory)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
index 32231f8f5..6f8fb682b 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
@@ -34,7 +34,7 @@ class WorkerArguments(args: Array[String], conf: CelebornConf) {
parse(args.toList)
// 2nd read from configuration file
_propertiesFile = Some(Utils.loadDefaultCelebornProperties(conf, _propertiesFile.orNull))
- _host = _host.orElse(Some(Utils.localHostName))
+ _host = _host.orElse(Some(Utils.localHostName(conf)))
_port = _port.orElse(Some(conf.workerRpcPort))
def host: String = _host.get
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
index 27112eef4..dad8dcd38 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
@@ -41,29 +41,26 @@ trait HeartbeatFeature extends MiniClusterFeature {
val (_master, _workers) = setUpMiniCluster(masterConf, workerConf, workerNum = 1)
master = _master
workers = _workers
- workers.map(w => {
+ workers.foreach { w =>
val (pushPort, fetchPort) = w.getPushFetchServerPort
logInfo(s"worker port1:$pushPort $fetchPort")
val clientPush =
- dataClientFactory.createClient(Utils.localHostName, pushPort, 0)
+ dataClientFactory.createClient(Utils.localHostName(w.conf), pushPort, 0)
val clientFetch =
- dataClientFactory.createClient(
- Utils.localHostName,
- fetchPort,
- 0)
+ dataClientFactory.createClient(Utils.localHostName(w.conf), fetchPort, 0)
logInfo(s"worker port2:$clientPush $clientFetch")
- // At Beggining, the client is active
+ // At beginning, the client is active
Assert.assertTrue(clientPush.isActive)
Assert.assertTrue(clientFetch.isActive)
assertFunc(clientPush, clientFetch)
- })
+ }
} finally {
if (master != null && workers != null)
shutdownMiniCluster()
}
}
- def getTestHeartbeatFromWorker2ClientConf(): (Map[String, String], CelebornConf) = {
+ def getTestHeartbeatFromWorker2ClientConf: (Map[String, String], CelebornConf) = {
val workerConf = Map(
CelebornConf.MASTER_ENDPOINTS.key -> "localhost:9097",
"celeborn.push.heartbeat.interval" -> "4s",
@@ -77,7 +74,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
}
def testHeartbeatFromWorker2Client(dataClientFactory: TransportClientFactory): Unit = {
- val (workerConf, _) = getTestHeartbeatFromWorker2ClientConf()
+ val (workerConf, _) = getTestHeartbeatFromWorker2ClientConf
// client <- worker:default client do not send heartbeat to worker, and worker sends hearbeat to client
// client active: after connection timeout, the channel still be active
testCore(
@@ -91,8 +88,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
})
}
- def getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
- : (Map[String, String], CelebornConf) = {
+ def getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf: (Map[String, String], CelebornConf) = {
val workerConf = Map(
"celeborn.master.endpoints" -> "localhost:9097",
"celeborn.push.heartbeat.interval" -> "4s",
@@ -106,7 +102,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
def testHeartbeatFromWorker2ClientWithNoHeartbeat(dataClientFactory: TransportClientFactory)
: Unit = {
- val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
+ val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
// client <- worker:default client do not send heartbeat to worker, and worker sends hearbeat to client
// client active: after connection timeout, the channel still be active
@@ -121,8 +117,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
})
}
- def getTestHeartbeatFromWorker2ClientWithCloseChannelConf()
- : (Map[String, String], CelebornConf) = {
+ def getTestHeartbeatFromWorker2ClientWithCloseChannelConf: (Map[String, String], CelebornConf) = {
val workerConf = Map(
CelebornConf.MASTER_ENDPOINTS.key -> "localhost:9097",
"celeborn.fetch.io.connectionTimeout" -> "9s",
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
index a092c1b11..a4e7c4f30 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
@@ -32,7 +32,7 @@ class WorkerArgumentsSuite extends AnyFunSuite with Logging {
val conf1 = new CelebornConf()
val arguments1 = new WorkerArguments(args1, conf1)
- assert(arguments1.host.equals(Utils.localHostName))
+ assert(arguments1.host.equals(Utils.localHostName(conf1)))
assert(arguments1.port == 0)
// should use celeborn conf