This is an automated email from the ASF dual-hosted git repository.
csingh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 186899f53 [CELEBORN-1371] Update ratis with internal port endpoint address as well (#2446)
186899f53 is described below
commit 186899f53f8fa807b429684a7703c9b5ee797ec2
Author: Mridul Muralidharan <[email protected]>
AuthorDate: Fri Apr 5 13:42:03 2024 -0500
[CELEBORN-1371] Update ratis with internal port endpoint address as well (#2446)
* Update ratis with internal port endpoint address as well, and propagate
it to workers, while keeping existing path for applications the same
---------
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
---
.../celeborn/common/client/MasterClient.java | 5 +-
.../common/client/MasterNotLeaderException.java | 18 ++++++-
.../deploy/master/clustermeta/ha/HAHelper.java | 9 +++-
.../deploy/master/clustermeta/ha/HARaftServer.java | 57 +++++++++++++++++-----
.../master/clustermeta/ha/MasterClusterInfo.scala | 4 +-
.../deploy/master/clustermeta/ha/MasterNode.scala | 14 +++++-
.../ha/RatisMasterStatusSystemSuiteJ.java | 25 ++++++++++
7 files changed, 114 insertions(+), 18 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index c34e1050b..94e4201a9 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -175,7 +175,10 @@ public class MasterClient {
// 'CelebornException: Exception thrown in awaitResult'
if (e.getCause() instanceof MasterNotLeaderException) {
MasterNotLeaderException exception = (MasterNotLeaderException) e.getCause();
- String leaderAddr = exception.getSuggestedLeaderAddress();
+ String leaderAddr =
+ isWorker
+ ? exception.getSuggestedInternalLeaderAddress()
+ : exception.getSuggestedLeaderAddress();
if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) {
setRpcEndpointRef(leaderAddr);
} else {
diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index 38198cf78..b613e1041 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -28,26 +28,42 @@ public class MasterNotLeaderException extends IOException {
private static final long serialVersionUID = -2552475565785098271L;
private final String leaderPeer;
+ private final String internalLeaderPeer;
public static final String LEADER_NOT_PRESENTED = "leader is not present";
public MasterNotLeaderException(
String currentPeer, String suggestedLeaderPeer, @Nullable Throwable cause) {
+ this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause);
+ }
+
+ public MasterNotLeaderException(
+ String currentPeer,
+ String suggestedLeaderPeer,
+ String suggestedInternalLeaderPeer,
+ @Nullable Throwable cause) {
super(
String.format(
"Master:%s is not the leader.%s%s",
currentPeer,
currentPeer.equals(suggestedLeaderPeer)
? StringUtils.EMPTY
- : String.format(" Suggested leader is Master:%s.", suggestedLeaderPeer),
+ : String.format(
+ " Suggested leader is Master:%s (%s).",
+ suggestedLeaderPeer, suggestedInternalLeaderPeer),
cause == null
? StringUtils.EMPTY
: String.format(" Exception:%s.", cause.getMessage())),
cause);
this.leaderPeer = suggestedLeaderPeer;
+ this.internalLeaderPeer = suggestedInternalLeaderPeer;
}
public String getSuggestedLeaderAddress() {
return leaderPeer;
}
+
+ public String getSuggestedInternalLeaderAddress() {
+ return internalLeaderPeer;
+ }
}
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index fdd22443e..8d2669f20 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -19,6 +19,9 @@ package org.apache.celeborn.service.deploy.master.clustermeta.ha;
import java.io.File;
import java.io.IOException;
+import java.util.Optional;
+
+import scala.Tuple2;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.protocol.Message;
@@ -50,11 +53,13 @@ public class HAHelper {
RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
if (context != null) {
if (ratisServer != null) {
- if (ratisServer.getCachedLeaderPeerRpcEndpoint().isPresent()) {
+ Optional<Tuple2<String, String>> leaderPeer = ratisServer.getCachedLeaderPeerRpcEndpoint();
+ if (leaderPeer.isPresent()) {
context.sendFailure(
new MasterNotLeaderException(
ratisServer.getRpcEndpoint(),
- ratisServer.getCachedLeaderPeerRpcEndpoint().get(),
+ leaderPeer.get()._1(),
+ leaderPeer.get()._2(),
cause));
} else {
context.sendFailure(
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index 9e502058f..b9d690d33 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -74,6 +74,7 @@ public class HARaftServer {
private final InetSocketAddress ratisAddr;
private final String rpcEndpoint;
+ private final String internalRpcEndpoint;
private final RaftServer server;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
@@ -89,7 +90,7 @@ public class HARaftServer {
private long roleCheckIntervalMs;
private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
- private Optional<String> cachedLeaderPeerRpcEndpoint = Optional.empty();
+ private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints = Optional.empty();
private final CelebornConf conf;
private long workerTimeoutDeadline;
private long appTimeoutDeadline;
@@ -109,11 +110,13 @@ public class HARaftServer {
RaftPeerId localRaftPeerId,
InetSocketAddress ratisAddr,
String rpcEndpoint,
+ String internalRpcEndpoint,
List<RaftPeer> raftPeers)
throws IOException {
this.metaHandler = metaHandler;
this.ratisAddr = ratisAddr;
this.rpcEndpoint = rpcEndpoint;
+ this.internalRpcEndpoint = internalRpcEndpoint;
this.raftPeerId = localRaftPeerId;
this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
this.masterStateMachine = getStateMachine();
@@ -163,6 +166,8 @@ public class HARaftServer {
.setId(localRaftPeerId)
.setAddress(ratisAddr)
.setClientAddress(localNode.rpcEndpoint())
+ // We use admin address to host the internal rpc address
+ .setAdminAddress(localNode.internalRpcEndpoint())
.build();
List<RaftPeer> raftPeers = new ArrayList<>();
// Add this Ratis server to the Ratis ring
@@ -178,6 +183,8 @@ public class HARaftServer {
.setId(raftPeerId)
.setAddress(peer.ratisEndpoint())
.setClientAddress(peer.rpcEndpoint())
+ // We use admin address to host the internal rpc address
+ .setAdminAddress(peer.internalRpcEndpoint())
.build();
} else {
InetSocketAddress peerRatisAddr = peer.ratisAddr();
@@ -186,6 +193,8 @@ public class HARaftServer {
.setId(raftPeerId)
.setAddress(peerRatisAddr)
.setClientAddress(peer.rpcEndpoint())
+ // We use admin address to host the internal rpc address
+ .setAdminAddress(peer.internalRpcEndpoint())
.build();
}
@@ -193,7 +202,13 @@ public class HARaftServer {
raftPeers.add(raftPeer);
});
return new HARaftServer(
- metaHandler, conf, localRaftPeerId, ratisAddr, localNode.rpcEndpoint(), raftPeers);
+ metaHandler,
+ conf,
+ localRaftPeerId,
+ ratisAddr,
+ localNode.rpcEndpoint(),
+ localNode.internalRpcEndpoint(),
+ raftPeers);
}
public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
@@ -421,12 +436,12 @@ public class HARaftServer {
/**
* Get the suggested leader peer id.
*
- * @return RaftPeerId of the suggested leader node.
+ * @return RaftPeerId of the suggested leader node - Tuple2(rpc endpoint, internal rpc endpoint)
*/
- public Optional<String> getCachedLeaderPeerRpcEndpoint() {
+ public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() {
this.roleCheckLock.readLock().lock();
try {
- return cachedLeaderPeerRpcEndpoint;
+ return cachedLeaderPeerRpcEndpoints;
} finally {
this.roleCheckLock.readLock().unlock();
}
@@ -442,22 +457,30 @@ public class HARaftServer {
RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();
if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
- setServerRole(thisNodeRole, getRpcEndpoint());
+ setServerRole(thisNodeRole, getRpcEndpoint(), getInternalRpcEndpoint());
} else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node Ratis, if 2 nodes are down, there will
// be no leader.
String leaderPeerRpcEndpoint = null;
+ String leaderPeerInternalRpcEndpoint = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
leaderPeerRpcEndpoint =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
+ // We use admin address to host the internal rpc address
+ if (conf.internalPortEnabled()) {
+ leaderPeerInternalRpcEndpoint =
+ roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress();
+ } else {
+ leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint;
+ }
}
- setServerRole(thisNodeRole, leaderPeerRpcEndpoint);
+ setServerRole(thisNodeRole, leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint);
} else {
- setServerRole(thisNodeRole, null);
+ setServerRole(thisNodeRole, null, null);
}
} catch (IOException e) {
LOG.error(
@@ -465,12 +488,15 @@ public class HARaftServer {
+ "{} and resetting leader info.",
RaftProtos.RaftPeerRole.UNRECOGNIZED,
e);
- setServerRole(null, null);
+ setServerRole(null, null, null);
}
}
/** Set the current server role and the leader peer rpc endpoint. */
- private void setServerRole(RaftProtos.RaftPeerRole currentRole, String leaderPeerRpcEndpoint) {
+ private void setServerRole(
+ RaftProtos.RaftPeerRole currentRole,
+ String leaderPeerRpcEndpoint,
+ String leaderPeerInternalRpcEndpoint) {
this.roleCheckLock.writeLock().lock();
try {
boolean leaderChanged = false;
@@ -490,7 +516,12 @@ public class HARaftServer {
}
this.cachedPeerRole = Optional.ofNullable(currentRole);
- this.cachedLeaderPeerRpcEndpoint = Optional.ofNullable(leaderPeerRpcEndpoint);
+ if (null != leaderPeerRpcEndpoint) {
+ this.cachedLeaderPeerRpcEndpoints =
+ Optional.of(Tuple2.apply(leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint));
+ } else {
+ this.cachedLeaderPeerRpcEndpoints = Optional.empty();
+ }
} finally {
this.roleCheckLock.writeLock().unlock();
}
@@ -510,6 +541,10 @@ public class HARaftServer {
return this.rpcEndpoint;
}
+ public String getInternalRpcEndpoint() {
+ return this.internalRpcEndpoint;
+ }
+
void stepDown() {
try {
TransferLeadershipRequest request =
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
index a661c6aca..1042f4627 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala
@@ -43,7 +43,9 @@ object MasterClusterInfo extends Logging {
val ratisPort = conf.haMasterRatisPort(nodeId)
val rpcHost = conf.haMasterNodeHost(nodeId)
val rpcPort = conf.haMasterNodePort(nodeId)
- MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort)
+ val internalPort =
+ if (conf.internalPortEnabled) conf.haMasterNodeInternalPort(nodeId) else rpcPort
+ MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalPort)
}
val (localNodes, peerNodes) = localNodeIdOpt match {
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index a1ac2b67e..b60b76eda 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -29,7 +29,8 @@ case class MasterNode(
ratisHost: String,
ratisPort: Int,
rpcHost: String,
- rpcPort: Int) {
+ rpcPort: Int,
+ internalRpcPort: Int) {
def isRatisHostUnresolved: Boolean = ratisAddr.isUnresolved
@@ -39,6 +40,8 @@ case class MasterNode(
def rpcEndpoint: String = rpcHost + ":" + rpcPort
+ def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort
+
lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)
lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
@@ -52,6 +55,7 @@ object MasterNode extends Logging {
private var ratisPort = 0
private var rpcHost: String = _
private var rpcPort = 0
+ private var internalRpcPort = 0
def setNodeId(nodeId: String): this.type = {
this.nodeId = nodeId
@@ -84,7 +88,13 @@ object MasterNode extends Logging {
this
}
- def build: MasterNode = MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort)
+ def setInternalRpcPort(internalRpcPort: Int): this.type = {
+ this.internalRpcPort = internalRpcPort
+ this
+ }
+
+ def build: MasterNode =
+ MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalRpcPort)
}
private def createSocketAddr(host: String, port: Int): InetSocketAddress = {
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index d7cf715a0..9dbcff797 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
+import scala.Tuple2;
+
import org.junit.*;
import org.mockito.Mockito;
@@ -127,6 +129,7 @@ public class RatisMasterStatusSystemSuiteJ {
.setHost(Utils.localHostName(conf1))
.setRatisPort(ratisPort1)
.setRpcPort(ratisPort1)
+ .setInternalRpcPort(ratisPort1)
.setNodeId(id1)
.build();
MasterNode masterNode2 =
@@ -134,6 +137,7 @@ public class RatisMasterStatusSystemSuiteJ {
.setHost(Utils.localHostName(conf2))
.setRatisPort(ratisPort2)
.setRpcPort(ratisPort2)
+ .setInternalRpcPort(ratisPort2)
.setNodeId(id2)
.build();
MasterNode masterNode3 =
@@ -141,6 +145,7 @@ public class RatisMasterStatusSystemSuiteJ {
.setHost(Utils.localHostName(conf3))
.setRatisPort(ratisPort3)
.setRpcPort(ratisPort3)
+ .setInternalRpcPort(ratisPort3)
.setNodeId(id3)
.build();
@@ -179,6 +184,26 @@ public class RatisMasterStatusSystemSuiteJ {
boolean hasLeader =
RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || RATISSERVER3.isLeader();
Assert.assertTrue(hasLeader);
+
+ // Check if the rpc endpoint and internal rpc endpoint of the leader is as expected.
+
+ HARaftServer leader =
+ RATISSERVER1.isLeader()
+ ? RATISSERVER1
+ : (RATISSERVER2.isLeader() ? RATISSERVER2 : RATISSERVER3);
+ // one of them must be the follower given the three servers we have
+ HARaftServer follower = RATISSERVER1.isLeader() ? RATISSERVER2 : RATISSERVER1;
+
+ // This is expected to be false, but as a side effect, updates getCachedLeaderPeerRpcEndpoint
+ boolean isFollowerCurrentLeader = follower.isLeader();
+ Assert.assertFalse(isFollowerCurrentLeader);
+
+ Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint =
+ follower.getCachedLeaderPeerRpcEndpoint();
+
+ Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
+ Assert.assertEquals(leader.getRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._1());
+ Assert.assertEquals(leader.getInternalRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._2());
}
private static final String HOSTNAME1 = "host1";