This is an automated email from the ASF dual-hosted git repository.

csingh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 186899f53 [CELEBORN-1371] Update ratis with internal port endpoint address as well (#2446)
186899f53 is described below

commit 186899f53f8fa807b429684a7703c9b5ee797ec2
Author: Mridul Muralidharan <[email protected]>
AuthorDate: Fri Apr 5 13:42:03 2024 -0500

    [CELEBORN-1371] Update ratis with internal port endpoint address as well (#2446)
    
    * Update ratis with internal port endpoint address as well, and propagate it to workers, while keeping existing path for applications the same
    ---------
    
    Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../celeborn/common/client/MasterClient.java       |  5 +-
 .../common/client/MasterNotLeaderException.java    | 18 ++++++-
 .../deploy/master/clustermeta/ha/HAHelper.java     |  9 +++-
 .../deploy/master/clustermeta/ha/HARaftServer.java | 57 +++++++++++++++++-----
 .../master/clustermeta/ha/MasterClusterInfo.scala  |  4 +-
 .../deploy/master/clustermeta/ha/MasterNode.scala  | 14 +++++-
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 25 ++++++++++
 7 files changed, 114 insertions(+), 18 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java 
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index c34e1050b..94e4201a9 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -175,7 +175,10 @@ public class MasterClient {
     // 'CelebornException: Exception thrown in awaitResult'
     if (e.getCause() instanceof MasterNotLeaderException) {
       MasterNotLeaderException exception = (MasterNotLeaderException) 
e.getCause();
-      String leaderAddr = exception.getSuggestedLeaderAddress();
+      String leaderAddr =
+          isWorker
+              ? exception.getSuggestedInternalLeaderAddress()
+              : exception.getSuggestedLeaderAddress();
       if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) {
         setRpcEndpointRef(leaderAddr);
       } else {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
 
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index 38198cf78..b613e1041 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -28,26 +28,42 @@ public class MasterNotLeaderException extends IOException {
   private static final long serialVersionUID = -2552475565785098271L;
 
   private final String leaderPeer;
+  private final String internalLeaderPeer;
 
   public static final String LEADER_NOT_PRESENTED = "leader is not present";
 
   public MasterNotLeaderException(
       String currentPeer, String suggestedLeaderPeer, @Nullable Throwable 
cause) {
+    this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause);
+  }
+
+  public MasterNotLeaderException(
+      String currentPeer,
+      String suggestedLeaderPeer,
+      String suggestedInternalLeaderPeer,
+      @Nullable Throwable cause) {
     super(
         String.format(
             "Master:%s is not the leader.%s%s",
             currentPeer,
             currentPeer.equals(suggestedLeaderPeer)
                 ? StringUtils.EMPTY
-                : String.format(" Suggested leader is Master:%s.", 
suggestedLeaderPeer),
+                : String.format(
+                    " Suggested leader is Master:%s (%s).",
+                    suggestedLeaderPeer, suggestedInternalLeaderPeer),
             cause == null
                 ? StringUtils.EMPTY
                 : String.format(" Exception:%s.", cause.getMessage())),
         cause);
     this.leaderPeer = suggestedLeaderPeer;
+    this.internalLeaderPeer = suggestedInternalLeaderPeer;
   }
 
   public String getSuggestedLeaderAddress() {
     return leaderPeer;
   }
+
+  public String getSuggestedInternalLeaderAddress() {
+    return internalLeaderPeer;
+  }
 }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index fdd22443e..8d2669f20 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -19,6 +19,9 @@ package 
org.apache.celeborn.service.deploy.master.clustermeta.ha;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Optional;
+
+import scala.Tuple2;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.protocol.Message;
@@ -50,11 +53,13 @@ public class HAHelper {
       RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
     if (context != null) {
       if (ratisServer != null) {
-        if (ratisServer.getCachedLeaderPeerRpcEndpoint().isPresent()) {
+        Optional<Tuple2<String, String>> leaderPeer = 
ratisServer.getCachedLeaderPeerRpcEndpoint();
+        if (leaderPeer.isPresent()) {
           context.sendFailure(
               new MasterNotLeaderException(
                   ratisServer.getRpcEndpoint(),
-                  ratisServer.getCachedLeaderPeerRpcEndpoint().get(),
+                  leaderPeer.get()._1(),
+                  leaderPeer.get()._2(),
                   cause));
         } else {
           context.sendFailure(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index 9e502058f..b9d690d33 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -74,6 +74,7 @@ public class HARaftServer {
 
   private final InetSocketAddress ratisAddr;
   private final String rpcEndpoint;
+  private final String internalRpcEndpoint;
   private final RaftServer server;
   private final RaftGroup raftGroup;
   private final RaftPeerId raftPeerId;
@@ -89,7 +90,7 @@ public class HARaftServer {
   private long roleCheckIntervalMs;
   private final ReentrantReadWriteLock roleCheckLock = new 
ReentrantReadWriteLock();
   private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
-  private Optional<String> cachedLeaderPeerRpcEndpoint = Optional.empty();
+  private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints = 
Optional.empty();
   private final CelebornConf conf;
   private long workerTimeoutDeadline;
   private long appTimeoutDeadline;
@@ -109,11 +110,13 @@ public class HARaftServer {
       RaftPeerId localRaftPeerId,
       InetSocketAddress ratisAddr,
       String rpcEndpoint,
+      String internalRpcEndpoint,
       List<RaftPeer> raftPeers)
       throws IOException {
     this.metaHandler = metaHandler;
     this.ratisAddr = ratisAddr;
     this.rpcEndpoint = rpcEndpoint;
+    this.internalRpcEndpoint = internalRpcEndpoint;
     this.raftPeerId = localRaftPeerId;
     this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
     this.masterStateMachine = getStateMachine();
@@ -163,6 +166,8 @@ public class HARaftServer {
             .setId(localRaftPeerId)
             .setAddress(ratisAddr)
             .setClientAddress(localNode.rpcEndpoint())
+            // We use admin address to host the internal rpc address
+            .setAdminAddress(localNode.internalRpcEndpoint())
             .build();
     List<RaftPeer> raftPeers = new ArrayList<>();
     // Add this Ratis server to the Ratis ring
@@ -178,6 +183,8 @@ public class HARaftServer {
                     .setId(raftPeerId)
                     .setAddress(peer.ratisEndpoint())
                     .setClientAddress(peer.rpcEndpoint())
+                    // We use admin address to host the internal rpc address
+                    .setAdminAddress(peer.internalRpcEndpoint())
                     .build();
           } else {
             InetSocketAddress peerRatisAddr = peer.ratisAddr();
@@ -186,6 +193,8 @@ public class HARaftServer {
                     .setId(raftPeerId)
                     .setAddress(peerRatisAddr)
                     .setClientAddress(peer.rpcEndpoint())
+                    // We use admin address to host the internal rpc address
+                    .setAdminAddress(peer.internalRpcEndpoint())
                     .build();
           }
 
@@ -193,7 +202,13 @@ public class HARaftServer {
           raftPeers.add(raftPeer);
         });
     return new HARaftServer(
-        metaHandler, conf, localRaftPeerId, ratisAddr, 
localNode.rpcEndpoint(), raftPeers);
+        metaHandler,
+        conf,
+        localRaftPeerId,
+        ratisAddr,
+        localNode.rpcEndpoint(),
+        localNode.internalRpcEndpoint(),
+        raftPeers);
   }
 
   public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
@@ -421,12 +436,12 @@ public class HARaftServer {
   /**
    * Get the suggested leader peer id.
    *
-   * @return RaftPeerId of the suggested leader node.
+   * @return RaftPeerId of the suggested leader node - Tuple2(rpc endpoint, 
internal rpc endpoint)
    */
-  public Optional<String> getCachedLeaderPeerRpcEndpoint() {
+  public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() {
     this.roleCheckLock.readLock().lock();
     try {
-      return cachedLeaderPeerRpcEndpoint;
+      return cachedLeaderPeerRpcEndpoints;
     } finally {
       this.roleCheckLock.readLock().unlock();
     }
@@ -442,22 +457,30 @@ public class HARaftServer {
       RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();
 
       if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
-        setServerRole(thisNodeRole, getRpcEndpoint());
+        setServerRole(thisNodeRole, getRpcEndpoint(), 
getInternalRpcEndpoint());
       } else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
         ByteString leaderNodeId = 
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
         // There may be a chance, here we get leaderNodeId as null. For
         // example, in 3 node Ratis, if 2 nodes are down, there will
         // be no leader.
         String leaderPeerRpcEndpoint = null;
+        String leaderPeerInternalRpcEndpoint = null;
         if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
           leaderPeerRpcEndpoint =
               
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
+          // We use admin address to host the internal rpc address
+          if (conf.internalPortEnabled()) {
+            leaderPeerInternalRpcEndpoint =
+                
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress();
+          } else {
+            leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint;
+          }
         }
 
-        setServerRole(thisNodeRole, leaderPeerRpcEndpoint);
+        setServerRole(thisNodeRole, leaderPeerRpcEndpoint, 
leaderPeerInternalRpcEndpoint);
 
       } else {
-        setServerRole(thisNodeRole, null);
+        setServerRole(thisNodeRole, null, null);
       }
     } catch (IOException e) {
       LOG.error(
@@ -465,12 +488,15 @@ public class HARaftServer {
               + "{} and resetting leader info.",
           RaftProtos.RaftPeerRole.UNRECOGNIZED,
           e);
-      setServerRole(null, null);
+      setServerRole(null, null, null);
     }
   }
 
   /** Set the current server role and the leader peer rpc endpoint. */
-  private void setServerRole(RaftProtos.RaftPeerRole currentRole, String 
leaderPeerRpcEndpoint) {
+  private void setServerRole(
+      RaftProtos.RaftPeerRole currentRole,
+      String leaderPeerRpcEndpoint,
+      String leaderPeerInternalRpcEndpoint) {
     this.roleCheckLock.writeLock().lock();
     try {
       boolean leaderChanged = false;
@@ -490,7 +516,12 @@ public class HARaftServer {
       }
 
       this.cachedPeerRole = Optional.ofNullable(currentRole);
-      this.cachedLeaderPeerRpcEndpoint = 
Optional.ofNullable(leaderPeerRpcEndpoint);
+      if (null != leaderPeerRpcEndpoint) {
+        this.cachedLeaderPeerRpcEndpoints =
+            Optional.of(Tuple2.apply(leaderPeerRpcEndpoint, 
leaderPeerInternalRpcEndpoint));
+      } else {
+        this.cachedLeaderPeerRpcEndpoints = Optional.empty();
+      }
     } finally {
       this.roleCheckLock.writeLock().unlock();
     }
@@ -510,6 +541,10 @@ public class HARaftServer {
     return this.rpcEndpoint;
   }
 
+  public String getInternalRpcEndpoint() {
+    return this.internalRpcEndpoint;
+  }
+
   void stepDown() {
     try {
       TransferLeadershipRequest request =
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
index a661c6aca..1042f4627 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
@@ -43,7 +43,9 @@ object MasterClusterInfo extends Logging {
       val ratisPort = conf.haMasterRatisPort(nodeId)
       val rpcHost = conf.haMasterNodeHost(nodeId)
       val rpcPort = conf.haMasterNodePort(nodeId)
-      MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort)
+      val internalPort =
+        if (conf.internalPortEnabled) conf.haMasterNodeInternalPort(nodeId) 
else rpcPort
+      MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalPort)
     }
 
     val (localNodes, peerNodes) = localNodeIdOpt match {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index a1ac2b67e..b60b76eda 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -29,7 +29,8 @@ case class MasterNode(
     ratisHost: String,
     ratisPort: Int,
     rpcHost: String,
-    rpcPort: Int) {
+    rpcPort: Int,
+    internalRpcPort: Int) {
 
   def isRatisHostUnresolved: Boolean = ratisAddr.isUnresolved
 
@@ -39,6 +40,8 @@ case class MasterNode(
 
   def rpcEndpoint: String = rpcHost + ":" + rpcPort
 
+  def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort
+
   lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)
 
   lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
@@ -52,6 +55,7 @@ object MasterNode extends Logging {
     private var ratisPort = 0
     private var rpcHost: String = _
     private var rpcPort = 0
+    private var internalRpcPort = 0
 
     def setNodeId(nodeId: String): this.type = {
       this.nodeId = nodeId
@@ -84,7 +88,13 @@ object MasterNode extends Logging {
       this
     }
 
-    def build: MasterNode = MasterNode(nodeId, ratisHost, ratisPort, rpcHost, 
rpcPort)
+    def setInternalRpcPort(internalRpcPort: Int): this.type = {
+      this.internalRpcPort = internalRpcPort
+      this
+    }
+
+    def build: MasterNode =
+      MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, 
internalRpcPort)
   }
 
   private def createSocketAddr(host: String, port: Int): InetSocketAddress = {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index d7cf715a0..9dbcff797 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
+import scala.Tuple2;
+
 import org.junit.*;
 import org.mockito.Mockito;
 
@@ -127,6 +129,7 @@ public class RatisMasterStatusSystemSuiteJ {
                 .setHost(Utils.localHostName(conf1))
                 .setRatisPort(ratisPort1)
                 .setRpcPort(ratisPort1)
+                .setInternalRpcPort(ratisPort1)
                 .setNodeId(id1)
                 .build();
         MasterNode masterNode2 =
@@ -134,6 +137,7 @@ public class RatisMasterStatusSystemSuiteJ {
                 .setHost(Utils.localHostName(conf2))
                 .setRatisPort(ratisPort2)
                 .setRpcPort(ratisPort2)
+                .setInternalRpcPort(ratisPort2)
                 .setNodeId(id2)
                 .build();
         MasterNode masterNode3 =
@@ -141,6 +145,7 @@ public class RatisMasterStatusSystemSuiteJ {
                 .setHost(Utils.localHostName(conf3))
                 .setRatisPort(ratisPort3)
                 .setRpcPort(ratisPort3)
+                .setInternalRpcPort(ratisPort3)
                 .setNodeId(id3)
                 .build();
 
@@ -179,6 +184,26 @@ public class RatisMasterStatusSystemSuiteJ {
     boolean hasLeader =
         RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || 
RATISSERVER3.isLeader();
     Assert.assertTrue(hasLeader);
+
+    // Check if the rpc endpoint and internal rpc endpoint of the leader is as 
expected.
+
+    HARaftServer leader =
+        RATISSERVER1.isLeader()
+            ? RATISSERVER1
+            : (RATISSERVER2.isLeader() ? RATISSERVER2 : RATISSERVER3);
+    // one of them must be the follower given the three servers we have
+    HARaftServer follower = RATISSERVER1.isLeader() ? RATISSERVER2 : 
RATISSERVER1;
+
+    // This is expected to be false, but as a side effect, updates 
getCachedLeaderPeerRpcEndpoint
+    boolean isFollowerCurrentLeader = follower.isLeader();
+    Assert.assertFalse(isFollowerCurrentLeader);
+
+    Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint =
+        follower.getCachedLeaderPeerRpcEndpoint();
+
+    Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
+    Assert.assertEquals(leader.getRpcEndpoint(), 
cachedLeaderPeerRpcEndpoint.get()._1());
+    Assert.assertEquals(leader.getInternalRpcEndpoint(), 
cachedLeaderPeerRpcEndpoint.get()._2());
   }
 
   private static final String HOSTNAME1 = "host1";

Reply via email to