This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d35f6f64f [CELEBORN-713] Local network binding support IP or FQDN
d35f6f64f is described below

commit d35f6f64fd44ccb112db71820b7984dddfdf6695
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jun 27 09:42:11 2023 +0800

    [CELEBORN-713] Local network binding support IP or FQDN
    
    ### What changes were proposed in this pull request?
    
    This PR aims to make network local address binding support both IP and FQDN 
strategy.
    
    Additionally, it refactors `ShuffleClientImpl#genAddressPair` from 
`${hostAndPort}-${hostAndPort}` to `Pair<String, String>`; the old form works 
properly when using IP but may not with FQDN, because an FQDN may contain `-`.
    
    ### Why are the changes needed?
    
    Currently, when the bind hostname is not set explicitly, Celeborn will find 
the first non-loopback address and always use the IP to bind. This is not 
suitable for K8s cases, as the StatefulSet has a stable FQDN but the Pod IP 
changes once the Pod restarts.
    
    For `ShuffleClientImpl#genAddressPair`, it must be changed, otherwise it may 
cause
    
    ```
    java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 11657 in stage 0.0 failed 4 times, most recent 
failure: Lost task 11657.3 in stage 0.0 (TID 12747) (10.153.253.198 executor 
157): java.lang.ArrayIndexOutOfBoundsException: 1
            at 
org.apache.celeborn.client.ShuffleClientImpl.doPushMergedData(ShuffleClientImpl.java:874)
            at 
org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:735)
            at 
org.apache.celeborn.client.ShuffleClientImpl.mergeData(ShuffleClientImpl.java:827)
            at 
org.apache.spark.shuffle.celeborn.SortBasedPusher.pushData(SortBasedPusher.java:140)
            at 
org.apache.spark.shuffle.celeborn.SortBasedPusher.insertRecord(SortBasedPusher.java:192)
            at 
org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.fastWrite0(SortBasedShuffleWriter.java:192)
            at 
org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.write(SortBasedShuffleWriter.java:145)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:136)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a new configuration `celeborn.network.bind.preferIpAddress` is 
introduced, and the default value is `true` to preserve the existing behavior.
    
    ### How was this patch tested?
    
    Manually testing with `celeborn.network.bind.preferIpAddress=false`
    ```
    Server:         10.178.96.64
    Address:        10.178.96.64#53
    
    Name:   celeborn-master-0.celeborn-master-svc.spark.svc.cluster.local
    Address: 10.153.143.252
    
    Server:         10.178.96.64
    Address:        10.178.96.64#53
    
    Name:   celeborn-master-1.celeborn-master-svc.spark.svc.cluster.local
    Address: 10.153.173.94
    
    Server:         10.178.96.64
    Address:        10.178.96.64#53
    
    Name:   celeborn-master-2.celeborn-master-svc.spark.svc.cluster.local
    Address: 10.153.149.42
    
    starting org.apache.celeborn.service.deploy.worker.Worker, logging to 
/opt/celeborn/logs/celeborn--org.apache.celeborn.service.deploy.worker.Worker-1-celeborn-worker-4.out
    2023-06-25 23:49:52 [INFO] [main] 
org.apache.celeborn.common.rpc.netty.Dispatcher#51 - Dispatcher numThreads: 4
    2023-06-25 23:49:52 [INFO] [main] 
org.apache.celeborn.common.network.client.TransportClientFactory#91 - mode NIO 
threads 64
    2023-06-25 23:49:52 [INFO] [main] 
org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory#51 - Starting RPC 
Server [WorkerSys] on 
celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0 with advisor 
endpoint celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0
    2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.util.Utils#51 
- Successfully started service 'WorkerSys' on port 38303.
    ```
    
    Closes #1622 from pan3793/CELEBORN-713.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 1753556565e73fa74dd9f3d7722ec09ac0a5b476)
    Signed-off-by: Cheng Pan <[email protected]>
---
 client/pom.xml                                     |  1 -
 .../apache/celeborn/client/ShuffleClientImpl.java  | 31 +++++++++++-----------
 .../apache/celeborn/client/LifecycleManager.scala  |  2 +-
 .../apache/celeborn/common/write/PushState.java    | 23 +++++++---------
 .../org/apache/celeborn/common/CelebornConf.scala  | 31 +++++++++++++++-------
 .../org/apache/celeborn/common/util/Utils.scala    | 14 +++-------
 .../apache/celeborn/common/CelebornConfSuite.scala |  4 +--
 docs/configuration/metrics.md                      |  4 +--
 docs/configuration/network.md                      |  1 +
 docs/deploy_on_k8s.md                              | 26 ++++++++++++++++--
 .../ha/RatisMasterStatusSystemSuiteJ.java          |  8 +++---
 .../deploy/master/MasterArgumentsSuite.scala       |  4 ++-
 .../celeborn/tests/flink/HeartbeatTest.scala       |  6 ++---
 .../celeborn/tests/spark/HeartbeatTest.scala       |  6 ++---
 .../service/deploy/worker/WorkerArguments.scala    |  2 +-
 .../celeborn/service/deploy/HeartbeatFeature.scala | 25 +++++++----------
 .../service/deploy/WorkerArgumentsSuite.scala      |  2 +-
 17 files changed, 102 insertions(+), 88 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 864bfbdb7..813d208f3 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -61,7 +61,6 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 6edf24a04..1dc7605cd 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -30,6 +30,7 @@ import scala.reflect.ClassTag$;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,7 +169,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     }
 
     // init rpc env and master endpointRef
-    rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, conf);
+    rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(conf), 0, 
conf);
 
     String module = TransportModuleConstants.DATA_MODULE;
     TransportConf dataTransportConf =
@@ -281,7 +282,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       StatusCode cause,
       Integer oldGroupedBatchId,
       int remainReviveTimes) {
-    HashMap<String, DataBatches> newDataBatchesMap = new HashMap<>();
+    HashMap<Pair<String, String>, DataBatches> newDataBatchesMap = new 
HashMap<>();
     ArrayList<DataBatches.DataBatch> reviveFailedBatchesMap = new 
ArrayList<>();
     for (DataBatches.DataBatch batch : batches) {
       int partitionId = batch.loc.getId();
@@ -326,8 +327,8 @@ public class ShuffleClientImpl extends ShuffleClient {
       }
     }
 
-    for (Map.Entry<String, DataBatches> entry : newDataBatchesMap.entrySet()) {
-      String addressPair = entry.getKey();
+    for (Map.Entry<Pair<String, String>, DataBatches> entry : 
newDataBatchesMap.entrySet()) {
+      Pair<String, String> addressPair = entry.getKey();
       DataBatches newDataBatches = entry.getValue();
       doPushMergedData(
           addressPair,
@@ -355,14 +356,12 @@ public class ShuffleClientImpl extends ShuffleClient {
     }
   }
 
-  private String genAddressPair(PartitionLocation loc) {
-    String addressPair;
+  private Pair<String, String> genAddressPair(PartitionLocation loc) {
     if (loc.hasPeer()) {
-      addressPair = loc.hostAndPushPort() + "-" + 
loc.getPeer().hostAndPushPort();
+      return Pair.of(loc.hostAndPushPort(), loc.getPeer().hostAndPushPort());
     } else {
-      addressPair = loc.hostAndPushPort();
+      return Pair.of(loc.hostAndPushPort(), null);
     }
-    return addressPair;
   }
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
@@ -908,7 +907,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     } else {
       // add batch data
       logger.debug("Merge batch {}.", nextBatchId);
-      String addressPair = genAddressPair(loc);
+      Pair<String, String> addressPair = genAddressPair(loc);
       boolean shouldPush = pushState.addBatchData(addressPair, loc, 
nextBatchId, body);
       if (shouldPush) {
         limitMaxInFlight(mapKey, pushState, loc.hostAndPushPort());
@@ -1019,12 +1018,12 @@ public class ShuffleClientImpl extends ShuffleClient {
     if (pushState == null) {
       return;
     }
-    ArrayList<Map.Entry<String, DataBatches>> batchesArr =
+    ArrayList<Map.Entry<Pair<String, String>, DataBatches>> batchesArr =
         new ArrayList<>(pushState.batchesMap.entrySet());
     while (!batchesArr.isEmpty()) {
-      Map.Entry<String, DataBatches> entry = 
batchesArr.get(RND.nextInt(batchesArr.size()));
-      String[] tokens = entry.getKey().split("-");
-      limitMaxInFlight(mapKey, pushState, tokens[0]);
+      Map.Entry<Pair<String, String>, DataBatches> entry =
+          batchesArr.get(RND.nextInt(batchesArr.size()));
+      limitMaxInFlight(mapKey, pushState, entry.getKey().getLeft());
       ArrayList<DataBatches.DataBatch> batches = 
entry.getValue().requireBatches(pushBufferMaxSize);
       if (entry.getValue().getTotalSize() == 0) {
         batchesArr.remove(entry);
@@ -1035,14 +1034,14 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   private void doPushMergedData(
-      String addressPair,
+      Pair<String, String> addressPair,
       int shuffleId,
       int mapId,
       int attemptId,
       ArrayList<DataBatches.DataBatch> batches,
       PushState pushState,
       int remainReviveTimes) {
-    String hostPort = addressPair.split("-")[0];
+    String hostPort = addressPair.getLeft();
     final String[] splits = hostPort.split(":");
     final String host = splits[0];
     final int port = Integer.parseInt(splits[1]);
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index f6e4586cc..840a85f27 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -55,7 +55,7 @@ object LifecycleManager {
 class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) 
extends RpcEndpoint
   with Logging {
 
-  private val lifecycleHost = Utils.localHostName
+  private val lifecycleHost = Utils.localHostName(conf)
 
   private val shuffleExpiredCheckIntervalMs = 
conf.shuffleExpiredCheckIntervalMs
   private val pushReplicateEnabled = conf.clientPushReplicateEnabled
diff --git 
a/common/src/main/java/org/apache/celeborn/common/write/PushState.java 
b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
index b082cc489..b61384f2d 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/PushState.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.util.JavaUtils;
@@ -40,25 +42,18 @@ public class PushState {
     inFlightRequestTracker.cleanup();
   }
 
-  // key: ${master addr}-${slave addr} value: list of data batch
-  public final ConcurrentHashMap<String, DataBatches> batchesMap = 
JavaUtils.newConcurrentHashMap();
-
-  /**
-   * Not thread-safe
-   *
-   * @param addressPair
-   * @param loc
-   * @param batchId
-   * @param body
-   * @return
-   */
-  public boolean addBatchData(String addressPair, PartitionLocation loc, int 
batchId, byte[] body) {
+  // key: ${master addr}, ${slave addr} value: list of data batch
+  public final ConcurrentHashMap<Pair<String, String>, DataBatches> batchesMap 
=
+      JavaUtils.newConcurrentHashMap();
+
+  public boolean addBatchData(
+      Pair<String, String> addressPair, PartitionLocation loc, int batchId, 
byte[] body) {
     DataBatches batches = batchesMap.computeIfAbsent(addressPair, (s) -> new 
DataBatches());
     batches.addDataBatch(loc, batchId, body);
     return batches.getTotalSize() > pushBufferMaxSize;
   }
 
-  public DataBatches takeDataBatches(String addressPair) {
+  public DataBatches takeDataBatches(Pair<String, String> addressPair) {
     return batchesMap.remove(addressPair);
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 92a201018..af31918e8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -369,6 +369,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   // //////////////////////////////////////////////////////
   //                      Network                        //
   // //////////////////////////////////////////////////////
+  def bindPreferIP: Boolean = get(NETWORK_BIND_PREFER_IP)
   def portMaxRetries: Int = get(PORT_MAX_RETRY)
   def networkTimeout: RpcTimeout =
     new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key)
@@ -517,7 +518,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   // //////////////////////////////////////////////////////
   def masterEndpoints: Array[String] =
     get(MASTER_ENDPOINTS).toArray.map { endpoint =>
-      Utils.parseHostPort(endpoint) match {
+      Utils.parseHostPort(endpoint.replace("<localhost>", 
Utils.localHostName(this))) match {
         case (host, 0) => s"$host:${HA_MASTER_NODE_PORT.defaultValue.get}"
         case (host, port) => s"$host:$port"
       }
@@ -528,7 +529,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
 
   def masterClientMaxRetries: Int = get(MASTER_CLIENT_MAX_RETRIES)
 
-  def masterHost: String = get(MASTER_HOST)
+  def masterHost: String = get(MASTER_HOST).replace("<localhost>", 
Utils.localHostName(this))
 
   def masterPort: Int = get(MASTER_PORT)
 
@@ -563,7 +564,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def haMasterNodeHost(nodeId: String): String = {
     val key = HA_MASTER_NODE_HOST.key.replace("<id>", nodeId)
     val legacyKey = HA_MASTER_NODE_HOST.alternatives.head._1.replace("<id>", 
nodeId)
-    get(key, get(legacyKey, Utils.localHostName))
+    get(key, get(legacyKey, Utils.localHostName(this)))
   }
 
   def haMasterNodePort(nodeId: String): Int = {
@@ -652,12 +653,14 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE)
   def metricsCollectCriticalEnabled: Boolean = 
get(METRICS_COLLECT_CRITICAL_ENABLED)
   def metricsCapacity: Int = get(METRICS_CAPACITY)
-  def masterPrometheusMetricHost: String = get(MASTER_PROMETHEUS_HOST)
+  def masterPrometheusMetricHost: String =
+    get(MASTER_PROMETHEUS_HOST).replace("<localhost>", 
Utils.localHostName(this))
   def masterPrometheusMetricPort: Int = get(MASTER_PROMETHEUS_PORT)
-  def workerPrometheusMetricHost: String = get(WORKER_PROMETHEUS_HOST)
+  def workerPrometheusMetricHost: String =
+    get(WORKER_PROMETHEUS_HOST).replace("<localhost>", 
Utils.localHostName(this))
   def workerPrometheusMetricPort: Int = get(WORKER_PROMETHEUS_PORT)
   def metricsExtraLabels: Map[String, String] =
-    get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels(_)).toMap
+    get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap
   def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
   def metricsAppTopDiskUsageWindowSize: Int = 
get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
   def metricsAppTopDiskUsageInterval: Long = 
get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
@@ -1171,6 +1174,16 @@ object CelebornConf extends Logging {
 
   private def buildConf(key: String): ConfigBuilder = 
ConfigBuilder(key).onCreate(register)
 
+  val NETWORK_BIND_PREFER_IP: ConfigEntry[Boolean] =
+    buildConf("celeborn.network.bind.preferIpAddress")
+      .categories("network")
+      .version("0.3.0")
+      .doc("When `true`, prefer to use IP address, otherwise FQDN. This 
configuration only " +
+        "takes effect when the bind hostname is not set explicitly, in such 
case, Celeborn " +
+        "will find the first non-loopback address to bind.")
+      .booleanConf
+      .createWithDefault(true)
+
   val NETWORK_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.network.timeout")
       .categories("network")
@@ -1448,7 +1461,6 @@ object CelebornConf extends Logging {
         "If the port is omitted, 9097 will be used.")
       .version("0.2.0")
       .stringConf
-      .transform(_.replace("<localhost>", Utils.localHostName))
       .toSequence
       .checkValue(
         endpoints => endpoints.map(_ => 
Try(Utils.parseHostPort(_))).forall(_.isSuccess),
@@ -1498,7 +1510,6 @@ object CelebornConf extends Logging {
       .version("0.2.0")
       .doc("Hostname for master to bind.")
       .stringConf
-      .transform(_.replace("<localhost>", Utils.localHostName))
       .createWithDefaultString("<localhost>")
 
   val MASTER_PORT: ConfigEntry[Int] =
@@ -3293,7 +3304,7 @@ object CelebornConf extends Logging {
       .doc("Master's Prometheus host.")
       .version("0.3.0")
       .stringConf
-      .createWithDefault("0.0.0.0")
+      .createWithDefault("<localhost>")
 
   val MASTER_PROMETHEUS_PORT: ConfigEntry[Int] =
     buildConf("celeborn.metrics.master.prometheus.port")
@@ -3312,7 +3323,7 @@ object CelebornConf extends Logging {
       .doc("Worker's Prometheus host.")
       .version("0.3.0")
       .stringConf
-      .createWithDefault("0.0.0.0")
+      .createWithDefault("<localhost>")
 
   val WORKER_PROMETHEUS_PORT: ConfigEntry[Int] =
     buildConf("celeborn.metrics.worker.prometheus.port")
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index d7fb75073..247d77657 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -382,22 +382,14 @@ object Utils extends Logging {
 
   private var customHostname: Option[String] = 
sys.env.get("CELEBORN_LOCAL_HOSTNAME")
 
+  // for testing
   def setCustomHostname(hostname: String) {
-    // DEBUG code
     Utils.checkHost(hostname)
     customHostname = Some(hostname)
   }
 
-  def localCanonicalHostName: String = {
-    customHostname.getOrElse(localIpAddress.getCanonicalHostName)
-  }
-
-  def localHostName: String = {
-    customHostname.getOrElse(localIpAddress.getHostAddress)
-  }
-
-  def localHostNameForURI: String = {
-    customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
+  def localHostName(conf: CelebornConf): String = customHostname.getOrElse {
+    if (conf.bindPreferIP) localIpAddress.getHostAddress else 
localIpAddress.getCanonicalHostName
   }
 
   /**
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 2b12b9f74..e7766040d 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -144,11 +144,11 @@ class CelebornConfSuite extends CelebornFunSuite {
     val conf = new CelebornConf()
     val replacedHost = conf.masterHost
     assert(!replacedHost.contains("<localhost>"))
-    assert(replacedHost === Utils.localHostName)
+    assert(replacedHost === Utils.localHostName(conf))
     val replacedHosts = conf.masterEndpoints
     replacedHosts.foreach { replacedHost =>
       assert(!replacedHost.contains("<localhost>"))
-      assert(replacedHost contains Utils.localHostName)
+      assert(replacedHost contains Utils.localHostName(conf))
     }
   }
 
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index 8a2999af5..817e03572 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -27,10 +27,10 @@ license: |
 | celeborn.metrics.conf | &lt;undefined&gt; | Custom metrics configuration 
file path. Default use `metrics.properties` in classpath. | 0.3.0 | 
 | celeborn.metrics.enabled | true | When true, enable metrics system. | 0.2.0 
| 
 | celeborn.metrics.extraLabels |  | If default metric labels are not enough, 
extra metric labels can be customized. Labels' pattern is: 
`<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`; e.g. 
`env=prod,version=1` | 0.3.0 | 
-| celeborn.metrics.master.prometheus.host | 0.0.0.0 | Master's Prometheus 
host. | 0.3.0 | 
+| celeborn.metrics.master.prometheus.host | &lt;localhost&gt; | Master's 
Prometheus host. | 0.3.0 | 
 | celeborn.metrics.master.prometheus.port | 9098 | Master's Prometheus port. | 
0.3.0 | 
 | celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer 
metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | 
 | celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size 
of timer metric. | 0.2.0 | 
-| celeborn.metrics.worker.prometheus.host | 0.0.0.0 | Worker's Prometheus 
host. | 0.3.0 | 
+| celeborn.metrics.worker.prometheus.host | &lt;localhost&gt; | Worker's 
Prometheus host. | 0.3.0 | 
 | celeborn.metrics.worker.prometheus.port | 9096 | Worker's Prometheus port. | 
0.3.0 | 
 <!--end-include-->
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 28dc7fd01..1784ca3ec 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -38,6 +38,7 @@ license: |
 | celeborn.&lt;module&gt;.io.serverThreads | 0 | Number of threads used in the 
server thread pool. Default to 0, which is 2x#cores. |  | 
 | celeborn.&lt;module&gt;.push.timeoutCheck.interval | 5s | Interval for 
checking push data timeout. If setting <module> to `data`, it works for shuffle 
client push data and should be configured on client side. If setting <module> 
to `replicate`, it works for worker replicate data to peer worker and should be 
configured on worker side. | 0.3.0 | 
 | celeborn.&lt;module&gt;.push.timeoutCheck.threads | 16 | Threads num for 
checking push data timeout. If setting <module> to `data`, it works for shuffle 
client push data and should be configured on client side. If setting <module> 
to `replicate`, it works for worker replicate data to peer worker and should be 
configured on worker side. | 0.3.0 | 
+| celeborn.network.bind.preferIpAddress | true | When `true`, prefer to use IP 
address, otherwise FQDN. This configuration only takes effect when the bind 
hostname is not set explicitly, in such case, Celeborn will find the first 
non-loopback address to bind. | 0.3.0 | 
 | celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 
0.2.0 | 
 | celeborn.network.memory.allocator.numArenas | &lt;undefined&gt; | Number of 
arenas for pooled memory allocator. Default value is 
Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 | 
 | celeborn.network.memory.allocator.share | false | Whether to share memory 
allocator. | 0.3.0 | 
diff --git a/docs/deploy_on_k8s.md b/docs/deploy_on_k8s.md
index c7d79ee08..2e1ad36a7 100644
--- a/docs/deploy_on_k8s.md
+++ b/docs/deploy_on_k8s.md
@@ -82,7 +82,7 @@ celeborn-worker-0   1/1       Running            0          1m
 ...
 ```
 
-Given that Celeborn Master/Worker takes time to start, you can see the 
following phenomenon:
+Given that Celeborn Master/Worker Pod takes time to start, you can see the 
following phenomenon:
 
 ```
 ** server can't find 
celeborn-master-0.celeborn-master-svc.default.svc.cluster.local: NXDOMAIN
@@ -113,7 +113,29 @@ starting org.apache.celeborn.service.deploy.master.Master, 
logging to /opt/celeb
 23/03/23 14:10:56,216 INFO [main] Master: Master started.
 ```
 
-### 5. Build Celeborn Client
+### 5. Access Celeborn Service
+
+The Celeborn Master/Worker nodes deployed via official Helm charts run as 
[StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/),
+it can be accessed through Pod IP or [Stable Network ID (DNS 
name)](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#stable-network-id),
+in above case, the Master/Worker nodes can be accessed through:
+
+```
+celeborn-master-0.celeborn-master-svc.default.svc.cluster.local`
+...
+celeborn-worker-0.celeborn-worker-svc.default.svc.cluster.local`
+...
+```
+
+After a restart, the StatefulSet Pod IP changes but the DNS name remains, this 
is important for rolling upgrade.
+
+When the bind address is not set explicitly, the Celeborn worker finds the 
first non-loopback address to bind. By default,
+it uses the IP address both for address binding and registering, which causes 
the Master and Client to use the IP address to access the
+Worker; this is problematic after a Worker restart as explained above, especially 
when Graceful Shutdown is enabled.
+
+You may want to set `celeborn.network.bind.preferIpAddress=false` to address 
this issue. Note that, depending on your Kubernetes
+network infrastructure, this may put pressure on the DNS service or cause other 
network issues compared with using the IP address directly.
+
+### 6. Build Celeborn Client
 
 Here, without going into detail on how to configure spark/flink to find 
celeborn master/worker, mention the key
 configuration:
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index fdb0ca0e1..9b9bea917 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -112,25 +112,23 @@ public class RatisMasterStatusSystemSuiteJ {
     int ratisPort2 = 9873;
     int ratisPort3 = 9874;
 
-    String localHost = Utils.localHostName();
-
     MasterNode masterNode1 =
         new MasterNode.Builder()
-            .setHost(localHost)
+            .setHost(Utils.localHostName(conf1))
             .setRatisPort(ratisPort1)
             .setRpcPort(9872)
             .setNodeId(id1)
             .build();
     MasterNode masterNode2 =
         new MasterNode.Builder()
-            .setHost(localHost)
+            .setHost(Utils.localHostName(conf2))
             .setRatisPort(ratisPort2)
             .setRpcPort(9873)
             .setNodeId(id2)
             .build();
     MasterNode masterNode3 =
         new MasterNode.Builder()
-            .setHost(localHost)
+            .setHost(Utils.localHostName(conf3))
             .setRatisPort(ratisPort3)
             .setRpcPort(9874)
             .setNodeId(id3)
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
index 3e17e1ab7..ae7402585 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterArgumentsSuite.scala
@@ -31,7 +31,9 @@ class MasterArgumentsSuite extends AnyFunSuite with Logging {
     val conf1 = new CelebornConf()
 
     val arguments1 = new MasterArguments(args1, conf1)
-    assert(arguments1.host === sys.env.getOrElse("CELEBORN_LOCAL_HOSTNAME", 
Utils.localHostName))
+    assert(arguments1.host === sys.env.getOrElse(
+      "CELEBORN_LOCAL_HOSTNAME",
+      Utils.localHostName(conf1)))
     assert(arguments1.port === 9097)
 
     // should use celeborn conf
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
index 0f7a065a2..fb2a4b280 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
@@ -30,7 +30,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("celeborn flink hearbeat test - client <- worker") {
-    val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf()
+    val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
     val flinkShuffleClientImpl =
       new FlinkShuffleClientImpl(
         "",
@@ -45,7 +45,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   }
 
   test("celeborn flink hearbeat test - client <- worker no heartbeat") {
-    val (_, clientConf) = 
getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
+    val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
     val flinkShuffleClientImpl =
       new FlinkShuffleClientImpl(
         "",
@@ -60,7 +60,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   }
 
   test("celeborn flink hearbeat test - client <- worker timeout") {
-    val (_, clientConf) = 
getTestHeartbeatFromWorker2ClientWithCloseChannelConf()
+    val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
     val flinkShuffleClientImpl =
       new FlinkShuffleClientImpl(
         "",
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
index 5e167f18c..af6142022 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
@@ -37,21 +37,21 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   }
 
   test("celeborn spark heartbeat test - client <- worker") {
-    val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf()
+    val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
     val shuffleClientImpl =
       new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
     testHeartbeatFromWorker2Client(shuffleClientImpl.getDataClientFactory)
   }
 
   test("celeborn spark heartbeat test - client <- worker on heartbeat") {
-    val (_, clientConf) = 
getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
+    val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
     val shuffleClientImpl =
       new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
     
testHeartbeatFromWorker2ClientWithNoHeartbeat(shuffleClientImpl.getDataClientFactory)
   }
 
   test("celeborn spark heartbeat test - client <- worker timeout") {
-    val (_, clientConf) = 
getTestHeartbeatFromWorker2ClientWithCloseChannelConf()
+    val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
     val shuffleClientImpl =
       new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
     
testHeartbeatFromWorker2ClientWithCloseChannel(shuffleClientImpl.getDataClientFactory)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
index 32231f8f5..6f8fb682b 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
@@ -34,7 +34,7 @@ class WorkerArguments(args: Array[String], conf: 
CelebornConf) {
   parse(args.toList)
   // 2nd read from configuration file
   _propertiesFile = Some(Utils.loadDefaultCelebornProperties(conf, 
_propertiesFile.orNull))
-  _host = _host.orElse(Some(Utils.localHostName))
+  _host = _host.orElse(Some(Utils.localHostName(conf)))
   _port = _port.orElse(Some(conf.workerRpcPort))
 
   def host: String = _host.get
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
index 27112eef4..dad8dcd38 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
@@ -41,29 +41,26 @@ trait HeartbeatFeature extends MiniClusterFeature {
       val (_master, _workers) = setUpMiniCluster(masterConf, workerConf, 
workerNum = 1)
       master = _master
       workers = _workers
-      workers.map(w => {
+      workers.foreach { w =>
         val (pushPort, fetchPort) = w.getPushFetchServerPort
         logInfo(s"worker port1:$pushPort $fetchPort")
         val clientPush =
-          dataClientFactory.createClient(Utils.localHostName, pushPort, 0)
+          dataClientFactory.createClient(Utils.localHostName(w.conf), 
pushPort, 0)
         val clientFetch =
-          dataClientFactory.createClient(
-            Utils.localHostName,
-            fetchPort,
-            0)
+          dataClientFactory.createClient(Utils.localHostName(w.conf), 
fetchPort, 0)
         logInfo(s"worker port2:$clientPush $clientFetch")
-        // At Beggining, the client is active
+        // At beginning, the client is active
         Assert.assertTrue(clientPush.isActive)
         Assert.assertTrue(clientFetch.isActive)
         assertFunc(clientPush, clientFetch)
-      })
+      }
     } finally {
       if (master != null && workers != null)
         shutdownMiniCluster()
     }
   }
 
-  def getTestHeartbeatFromWorker2ClientConf(): (Map[String, String], 
CelebornConf) = {
+  def getTestHeartbeatFromWorker2ClientConf: (Map[String, String], 
CelebornConf) = {
     val workerConf = Map(
       CelebornConf.MASTER_ENDPOINTS.key -> "localhost:9097",
       "celeborn.push.heartbeat.interval" -> "4s",
@@ -77,7 +74,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
   }
 
   def testHeartbeatFromWorker2Client(dataClientFactory: 
TransportClientFactory): Unit = {
-    val (workerConf, _) = getTestHeartbeatFromWorker2ClientConf()
+    val (workerConf, _) = getTestHeartbeatFromWorker2ClientConf
     // client <- worker:default client do not send heartbeat to worker, and 
worker sends hearbeat to client
     // client active: after connection timeout, the channel still be active
     testCore(
@@ -91,8 +88,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
       })
   }
 
-  def getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
-      : (Map[String, String], CelebornConf) = {
+  def getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf: (Map[String, 
String], CelebornConf) = {
     val workerConf = Map(
       "celeborn.master.endpoints" -> "localhost:9097",
       "celeborn.push.heartbeat.interval" -> "4s",
@@ -106,7 +102,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
 
   def testHeartbeatFromWorker2ClientWithNoHeartbeat(dataClientFactory: 
TransportClientFactory)
       : Unit = {
-    val (workerConf, _) = 
getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf()
+    val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
 
     // client <- worker:default client do not send heartbeat to worker, and 
worker sends hearbeat to client
     // client active: after connection timeout, the channel still be active
@@ -121,8 +117,7 @@ trait HeartbeatFeature extends MiniClusterFeature {
       })
   }
 
-  def getTestHeartbeatFromWorker2ClientWithCloseChannelConf()
-      : (Map[String, String], CelebornConf) = {
+  def getTestHeartbeatFromWorker2ClientWithCloseChannelConf: (Map[String, 
String], CelebornConf) = {
     val workerConf = Map(
       CelebornConf.MASTER_ENDPOINTS.key -> "localhost:9097",
       "celeborn.fetch.io.connectionTimeout" -> "9s",
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
index a092c1b11..a4e7c4f30 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/WorkerArgumentsSuite.scala
@@ -32,7 +32,7 @@ class WorkerArgumentsSuite extends AnyFunSuite with Logging {
     val conf1 = new CelebornConf()
 
     val arguments1 = new WorkerArguments(args1, conf1)
-    assert(arguments1.host.equals(Utils.localHostName))
+    assert(arguments1.host.equals(Utils.localHostName(conf1)))
     assert(arguments1.port == 0)
 
     // should use celeborn conf


Reply via email to