This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 993d3f258 [CELEBORN-1398] Support return leader ip to client
993d3f258 is described below

commit 993d3f2587b1c9cd8acd1a593a186d8044487c14
Author: Shuang <[email protected]>
AuthorDate: Wed May 8 15:01:55 2024 +0800

    [CELEBORN-1398] Support return leader ip to client
    
    ### What changes were proposed in this pull request?
    As the title says: support returning the leader's IP address to the client.
    
    ### Why are the changes needed?
    Currently, accessing the services of a Celeborn cluster from a different
    Kubernetes cluster may run into DNS resolution issues. However, connectivity
    can still be achieved through IP addresses when combined with the Kubernetes
    setting hostNetwork=true for clients from different clusters. At present, the
    `celeborn.network.bind.preferIpAddress` configuration only takes effect on
    worker nodes. This PR makes the master node return the leader's IP address
    to clients as well when that configuration is enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GA
    
    Closes #2489 from RexXiong/CELEBORN-1398.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../common/client/MasterNotLeaderException.java    | 21 +++++--
 .../org/apache/celeborn/common/util/Utils.scala    | 21 +++++++
 .../deploy/master/clustermeta/ha/HAHelper.java     | 16 ++---
 .../deploy/master/clustermeta/ha/HARaftServer.java | 71 ++++++++++++++--------
 .../celeborn/service/deploy/master/Master.scala    |  5 +-
 .../deploy/master/clustermeta/ha/MasterNode.scala  |  4 ++
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 18 ++++--
 7 files changed, 109 insertions(+), 47 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
 
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index b613e1041..e1f931418 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import javax.annotation.Nullable;
 
+import scala.Tuple2;
+
 import org.apache.commons.lang3.StringUtils;
 
 public class MasterNotLeaderException extends IOException {
@@ -34,19 +36,25 @@ public class MasterNotLeaderException extends IOException {
 
   public MasterNotLeaderException(
       String currentPeer, String suggestedLeaderPeer, @Nullable Throwable 
cause) {
-    this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause);
+    this(
+        currentPeer,
+        Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
+        Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
+        false,
+        cause);
   }
 
   public MasterNotLeaderException(
       String currentPeer,
-      String suggestedLeaderPeer,
-      String suggestedInternalLeaderPeer,
+      Tuple2<String, String> suggestedLeaderPeer,
+      Tuple2<String, String> suggestedInternalLeaderPeer,
+      boolean bindPreferIp,
       @Nullable Throwable cause) {
     super(
         String.format(
             "Master:%s is not the leader.%s%s",
             currentPeer,
-            currentPeer.equals(suggestedLeaderPeer)
+            currentPeer.equals(suggestedLeaderPeer._1)
                 ? StringUtils.EMPTY
                 : String.format(
                     " Suggested leader is Master:%s (%s).",
@@ -55,8 +63,9 @@ public class MasterNotLeaderException extends IOException {
                 ? StringUtils.EMPTY
                 : String.format(" Exception:%s.", cause.getMessage())),
         cause);
-    this.leaderPeer = suggestedLeaderPeer;
-    this.internalLeaderPeer = suggestedInternalLeaderPeer;
+    this.leaderPeer = bindPreferIp ? suggestedLeaderPeer._1 : 
suggestedLeaderPeer._2;
+    this.internalLeaderPeer =
+        bindPreferIp ? suggestedInternalLeaderPeer._1 : 
suggestedInternalLeaderPeer._2;
   }
 
   public String getSuggestedLeaderAddress() {
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 7eef1f4f0..c5af4abf8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -432,6 +432,27 @@ object Utils extends Logging {
     }
   }
 
+  private def getIpHostAddressPair(host: String): (String, String) = {
+    try {
+      val inetAddress = InetAddress.getByName(host)
+      val hostAddress = inetAddress.getHostAddress
+      if (host.equals(hostAddress)) {
+        (hostAddress, inetAddress.getCanonicalHostName)
+      } else {
+        (hostAddress, host)
+      }
+    } catch {
+      case _: Throwable => (host, host) // return original input
+    }
+  }
+
+  // Convert address (ip:port or host:port) to (ip:port, host:port) pair
+  def addressToIpHostAddressPair(address: String): (String, String) = {
+    val (host, port) = Utils.parseHostPort(address)
+    val (_ip, _host) = Utils.getIpHostAddressPair(host)
+    (_ip + ":" + port, _host + ":" + port)
+  }
+
   def checkHostPort(hostPort: String): Unit = {
     if (hostPort != null && hostPort.split(":").length > 2) {
       assert(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 8d2669f20..71bcaf064 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -21,8 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
 
-import scala.Tuple2;
-
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -37,29 +35,31 @@ import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 public class HAHelper {
 
   public static boolean checkShouldProcess(
-      RpcCallContext context, AbstractMetaManager masterStatusSystem) {
+      RpcCallContext context, AbstractMetaManager masterStatusSystem, boolean 
bindPreferIp) {
     HARaftServer ratisServer = getRatisServer(masterStatusSystem);
     if (ratisServer != null) {
       if (ratisServer.isLeader()) {
         return true;
       }
-      sendFailure(context, ratisServer, null);
+      sendFailure(context, ratisServer, null, bindPreferIp);
       return false;
     }
     return true;
   }
 
   public static void sendFailure(
-      RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
+      RpcCallContext context, HARaftServer ratisServer, Throwable cause, 
boolean bindPreferIp) {
     if (context != null) {
       if (ratisServer != null) {
-        Optional<Tuple2<String, String>> leaderPeer = 
ratisServer.getCachedLeaderPeerRpcEndpoint();
+        Optional<HARaftServer.LeaderPeerEndpoints> leaderPeer =
+            ratisServer.getCachedLeaderPeerRpcEndpoint();
         if (leaderPeer.isPresent()) {
           context.sendFailure(
               new MasterNotLeaderException(
                   ratisServer.getRpcEndpoint(),
-                  leaderPeer.get()._1(),
-                  leaderPeer.get()._2(),
+                  leaderPeer.get().rpcEndpoints,
+                  leaderPeer.get().rpcInternalEndpoints,
+                  bindPreferIp,
                   cause));
         } else {
           context.sendFailure(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index b9d690d33..5a5694d86 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -55,6 +55,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.exception.CelebornRuntimeException;
 import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
 
@@ -72,6 +73,7 @@ public class HARaftServer {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
   }
 
+  private final MasterNode localNode;
   private final InetSocketAddress ratisAddr;
   private final String rpcEndpoint;
   private final String internalRpcEndpoint;
@@ -90,7 +92,8 @@ public class HARaftServer {
   private long roleCheckIntervalMs;
   private final ReentrantReadWriteLock roleCheckLock = new 
ReentrantReadWriteLock();
   private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
-  private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints = 
Optional.empty();
+  private Optional<LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoints = 
Optional.empty();
+
   private final CelebornConf conf;
   private long workerTimeoutDeadline;
   private long appTimeoutDeadline;
@@ -100,7 +103,7 @@ public class HARaftServer {
    *
    * @param conf configuration
    * @param localRaftPeerId raft peer id of this Ratis server
-   * @param ratisAddr address of the ratis server
+   * @param localNode local node of this Ratis server
    * @param raftPeers peer nodes in the raft ring
    * @throws IOException
    */
@@ -108,15 +111,14 @@ public class HARaftServer {
       MetaHandler metaHandler,
       CelebornConf conf,
       RaftPeerId localRaftPeerId,
-      InetSocketAddress ratisAddr,
-      String rpcEndpoint,
-      String internalRpcEndpoint,
+      MasterNode localNode,
       List<RaftPeer> raftPeers)
       throws IOException {
     this.metaHandler = metaHandler;
-    this.ratisAddr = ratisAddr;
-    this.rpcEndpoint = rpcEndpoint;
-    this.internalRpcEndpoint = internalRpcEndpoint;
+    this.localNode = localNode;
+    this.ratisAddr = localNode.ratisAddr();
+    this.rpcEndpoint = localNode.rpcEndpoint();
+    this.internalRpcEndpoint = localNode.internalRpcEndpoint();
     this.raftPeerId = localRaftPeerId;
     this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
     this.masterStateMachine = getStateMachine();
@@ -201,14 +203,8 @@ public class HARaftServer {
           // Add other nodes belonging to the same service to the Ratis ring
           raftPeers.add(raftPeer);
         });
-    return new HARaftServer(
-        metaHandler,
-        conf,
-        localRaftPeerId,
-        ratisAddr,
-        localNode.rpcEndpoint(),
-        localNode.internalRpcEndpoint(),
-        raftPeers);
+
+    return new HARaftServer(metaHandler, conf, localRaftPeerId, localNode, 
raftPeers);
   }
 
   public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
@@ -436,9 +432,9 @@ public class HARaftServer {
   /**
    * Get the suggested leader peer id.
    *
-   * @return RaftPeerId of the suggested leader node - Tuple2(rpc endpoint, 
internal rpc endpoint)
+   * @return RaftPeerId of the suggested leader node - 
Optional<LeaderPeerEndpoints>
    */
-  public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() {
+  public Optional<LeaderPeerEndpoints> getCachedLeaderPeerRpcEndpoint() {
     this.roleCheckLock.readLock().lock();
     try {
       return cachedLeaderPeerRpcEndpoints;
@@ -455,23 +451,29 @@ public class HARaftServer {
       GroupInfoReply groupInfo = getGroupInfo();
       RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
       RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();
-
+      Tuple2<String, String> leaderPeerRpcEndpoint = null;
+      Tuple2<String, String> leaderPeerInternalRpcEndpoint = null;
       if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
-        setServerRole(thisNodeRole, getRpcEndpoint(), 
getInternalRpcEndpoint());
+        // Current Node always uses original rpcEndpoint/internalRpcEndpoint, 
as if something wrong
+        // they would never return to client.
+        setServerRole(
+            thisNodeRole,
+            Tuple2.apply(this.rpcEndpoint, this.rpcEndpoint),
+            Tuple2.apply(this.internalRpcEndpoint, this.internalRpcEndpoint));
       } else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
         ByteString leaderNodeId = 
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
         // There may be a chance, here we get leaderNodeId as null. For
         // example, in 3 node Ratis, if 2 nodes are down, there will
         // be no leader.
-        String leaderPeerRpcEndpoint = null;
-        String leaderPeerInternalRpcEndpoint = null;
         if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
-          leaderPeerRpcEndpoint =
+          String clientAddress =
               
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
+          leaderPeerRpcEndpoint = 
Utils.addressToIpHostAddressPair(clientAddress);
           // We use admin address to host the internal rpc address
           if (conf.internalPortEnabled()) {
-            leaderPeerInternalRpcEndpoint =
+            String adminAddress =
                 
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress();
+            leaderPeerInternalRpcEndpoint = 
Utils.addressToIpHostAddressPair(adminAddress);
           } else {
             leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint;
           }
@@ -495,8 +497,8 @@ public class HARaftServer {
   /** Set the current server role and the leader peer rpc endpoint. */
   private void setServerRole(
       RaftProtos.RaftPeerRole currentRole,
-      String leaderPeerRpcEndpoint,
-      String leaderPeerInternalRpcEndpoint) {
+      Tuple2<String, String> leaderPeerRpcEndpoint,
+      Tuple2<String, String> leaderPeerInternalRpcEndpoint) {
     this.roleCheckLock.writeLock().lock();
     try {
       boolean leaderChanged = false;
@@ -518,7 +520,8 @@ public class HARaftServer {
       this.cachedPeerRole = Optional.ofNullable(currentRole);
       if (null != leaderPeerRpcEndpoint) {
         this.cachedLeaderPeerRpcEndpoints =
-            Optional.of(Tuple2.apply(leaderPeerRpcEndpoint, 
leaderPeerInternalRpcEndpoint));
+            Optional.of(
+                new LeaderPeerEndpoints(leaderPeerRpcEndpoint, 
leaderPeerInternalRpcEndpoint));
       } else {
         this.cachedLeaderPeerRpcEndpoints = Optional.empty();
       }
@@ -578,4 +581,18 @@ public class HARaftServer {
   public long getAppTimeoutDeadline() {
     return appTimeoutDeadline;
   }
+
+  public static class LeaderPeerEndpoints {
+    // the rpcEndpoints Tuple2 (ip:port, host:port)
+    public final Tuple2<String, String> rpcEndpoints;
+
+    // the rpcInternalEndpoints Tuple2 (ip:port, host:port)
+    public final Tuple2<String, String> rpcInternalEndpoints;
+
+    public LeaderPeerEndpoints(
+        Tuple2<String, String> rpcEndpoints, Tuple2<String, String> 
rpcInternalEndpoints) {
+      this.rpcEndpoints = rpcEndpoints;
+      this.rpcInternalEndpoints = rpcInternalEndpoints;
+    }
+  }
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 0d8bd3bae..3cdc8c512 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -76,6 +76,7 @@ private[celeborn] class Master(
   metricsSystem.registerSource(new JVMCPUSource(conf, 
MetricsSystem.ROLE_MASTER))
   metricsSystem.registerSource(new SystemMiscSource(conf, 
MetricsSystem.ROLE_MASTER))
 
+  private val bindPreferIP: Boolean = conf.bindPreferIP
   private val authEnabled = conf.authEnabled
   private val secretRegistry = new MasterSecretRegistryImpl()
   private val sendApplicationMetaThreads = 
conf.masterSendApplicationMetaThreads
@@ -358,12 +359,12 @@ private[celeborn] class Master(
   }
 
   def executeWithLeaderChecker[T](context: RpcCallContext, f: => T): Unit =
-    if (HAHelper.checkShouldProcess(context, statusSystem)) {
+    if (HAHelper.checkShouldProcess(context, statusSystem, bindPreferIP)) {
       try {
         f
       } catch {
         case e: Exception =>
-          HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem), 
e)
+          HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem), 
e, bindPreferIP)
       }
     }
 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index b60b76eda..0f2b09ca9 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -40,8 +40,12 @@ case class MasterNode(
 
   def rpcEndpoint: String = rpcHost + ":" + rpcPort
 
+  def rpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" + rpcPort
+
   def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort
 
+  def internalRpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" 
+ rpcPort
+
   lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)
 
   lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 75ba1a298..8f7307115 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -183,7 +183,7 @@ public class RatisMasterStatusSystemSuiteJ {
   }
 
   @Test
-  public void testLeaderAvaiable() {
+  public void testLeaderAvailable() {
     boolean hasLeader =
         RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || 
RATISSERVER3.isLeader();
     Assert.assertTrue(hasLeader);
@@ -201,12 +201,22 @@ public class RatisMasterStatusSystemSuiteJ {
     boolean isFollowerCurrentLeader = follower.isLeader();
     Assert.assertFalse(isFollowerCurrentLeader);
 
-    Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint =
+    Optional<HARaftServer.LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoint =
         follower.getCachedLeaderPeerRpcEndpoint();
 
     Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
-    Assert.assertEquals(leader.getRpcEndpoint(), 
cachedLeaderPeerRpcEndpoint.get()._1());
-    Assert.assertEquals(leader.getInternalRpcEndpoint(), 
cachedLeaderPeerRpcEndpoint.get()._2());
+
+    Tuple2<String, String> rpcEndpointsPair = 
cachedLeaderPeerRpcEndpoint.get().rpcEndpoints;
+    Tuple2<String, String> rpcInternalEndpointsPair =
+        cachedLeaderPeerRpcEndpoint.get().rpcInternalEndpoints;
+
+    // rpc endpoint may use custom host name then this ut need check ever 
ip/host
+    Assert.assertTrue(
+        leader.getRpcEndpoint().equals(rpcEndpointsPair._1)
+            || leader.getRpcEndpoint().equals(rpcEndpointsPair._2));
+    Assert.assertTrue(
+        leader.getInternalRpcEndpoint().equals(rpcInternalEndpointsPair._1)
+            || leader.getRpcEndpoint().equals(rpcInternalEndpointsPair._2));
   }
 
   private static final String HOSTNAME1 = "host1";

Reply via email to