This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new c47fb26fe [CELEBORN-1398] Support return leader ip to client
c47fb26fe is described below
commit c47fb26fe6ac07c5fc7991f8c2a99677bb6bb952
Author: Shuang <[email protected]>
AuthorDate: Wed May 8 15:01:55 2024 +0800
[CELEBORN-1398] Support return leader ip to client
As title
Currently, if accessing services of a Celeborn cluster across Kubernetes
clusters, one may encounter DNS resolution issues. However, connectivity may be
achieved through IP addresses when combined with the Kubernetes setting
hostNetwork=true for clients from different clusters. At present, the
`celeborn.network.bind.preferIpAddress` configuration is only effective on
worker nodes. This PR will enable the feature of returning the leader's IP when
accessing the master node.
No
Pass GA
Closes #2489 from RexXiong/CELEBORN-1398.
Authored-by: Shuang <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 993d3f2587b1c9cd8acd1a593a186d8044487c14)
Signed-off-by: Shuang <[email protected]>
---
.../common/client/MasterNotLeaderException.java | 14 +++++-
.../org/apache/celeborn/common/util/Utils.scala | 21 +++++++++
.../deploy/master/clustermeta/ha/HAHelper.java | 14 ++++--
.../deploy/master/clustermeta/ha/HARaftServer.java | 54 +++++++++++++++-------
.../celeborn/service/deploy/master/Master.scala | 6 ++-
.../deploy/master/clustermeta/ha/MasterNode.scala | 2 +
.../ha/RatisMasterStatusSystemSuiteJ.java | 29 +++++++++++-
7 files changed, 113 insertions(+), 27 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index 38198cf78..4f9057a19 100644
---
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
+++
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import javax.annotation.Nullable;
+import scala.Tuple2;
+
import org.apache.commons.lang3.StringUtils;
public class MasterNotLeaderException extends IOException {
@@ -33,18 +35,26 @@ public class MasterNotLeaderException extends IOException {
public MasterNotLeaderException(
String currentPeer, String suggestedLeaderPeer, @Nullable Throwable
cause) {
+ this(currentPeer, Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
false, cause);
+ }
+
+ public MasterNotLeaderException(
+ String currentPeer,
+ Tuple2<String, String> suggestedLeaderPeer,
+ boolean bindPreferIp,
+ @Nullable Throwable cause) {
super(
String.format(
"Master:%s is not the leader.%s%s",
currentPeer,
- currentPeer.equals(suggestedLeaderPeer)
+ currentPeer.equals(suggestedLeaderPeer._1)
? StringUtils.EMPTY
: String.format(" Suggested leader is Master:%s.",
suggestedLeaderPeer),
cause == null
? StringUtils.EMPTY
: String.format(" Exception:%s.", cause.getMessage())),
cause);
- this.leaderPeer = suggestedLeaderPeer;
+ this.leaderPeer = bindPreferIp ? suggestedLeaderPeer._1 :
suggestedLeaderPeer._2;
}
public String getSuggestedLeaderAddress() {
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index d4531adbd..96e57f8cd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -427,6 +427,27 @@ object Utils extends Logging {
}
}
+ private def getIpHostAddressPair(host: String): (String, String) = {
+ try {
+ val inetAddress = InetAddress.getByName(host)
+ val hostAddress = inetAddress.getHostAddress
+ if (host.equals(hostAddress)) {
+ (hostAddress, inetAddress.getCanonicalHostName)
+ } else {
+ (hostAddress, host)
+ }
+ } catch {
+ case _: Throwable => (host, host) // return original input
+ }
+ }
+
+ // Convert address (ip:port or host:port) to (ip:port, host:port) pair
+ def addressToIpHostAddressPair(address: String): (String, String) = {
+ val (host, port) = Utils.parseHostPort(address)
+ val (_ip, _host) = Utils.getIpHostAddressPair(host)
+ (_ip + ":" + port, _host + ":" + port)
+ }
+
def checkHostPort(hostPort: String): Unit = {
if (hostPort != null && hostPort.split(":").length > 2) {
assert(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index fdd22443e..b0afab4b1 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -19,6 +19,7 @@ package
org.apache.celeborn.service.deploy.master.clustermeta.ha;
import java.io.File;
import java.io.IOException;
+import java.util.Optional;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.protocol.Message;
@@ -34,27 +35,30 @@ import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
public class HAHelper {
public static boolean checkShouldProcess(
- RpcCallContext context, AbstractMetaManager masterStatusSystem) {
+ RpcCallContext context, AbstractMetaManager masterStatusSystem, boolean
bindPreferIp) {
HARaftServer ratisServer = getRatisServer(masterStatusSystem);
if (ratisServer != null) {
if (ratisServer.isLeader()) {
return true;
}
- sendFailure(context, ratisServer, null);
+ sendFailure(context, ratisServer, null, bindPreferIp);
return false;
}
return true;
}
public static void sendFailure(
- RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
+ RpcCallContext context, HARaftServer ratisServer, Throwable cause,
boolean bindPreferIp) {
if (context != null) {
if (ratisServer != null) {
- if (ratisServer.getCachedLeaderPeerRpcEndpoint().isPresent()) {
+ Optional<HARaftServer.LeaderPeerEndpoints> leaderPeer =
+ ratisServer.getCachedLeaderPeerRpcEndpoint();
+ if (leaderPeer.isPresent()) {
context.sendFailure(
new MasterNotLeaderException(
ratisServer.getRpcEndpoint(),
- ratisServer.getCachedLeaderPeerRpcEndpoint().get(),
+ leaderPeer.get().rpcEndpoints,
+ bindPreferIp,
cause));
} else {
context.sendFailure(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index 9e502058f..61222549c 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -55,6 +55,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -72,6 +73,7 @@ public class HARaftServer {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
+ private final MasterNode localNode;
private final InetSocketAddress ratisAddr;
private final String rpcEndpoint;
private final RaftServer server;
@@ -89,7 +91,8 @@ public class HARaftServer {
private long roleCheckIntervalMs;
private final ReentrantReadWriteLock roleCheckLock = new
ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
- private Optional<String> cachedLeaderPeerRpcEndpoint = Optional.empty();
+ private Optional<LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoints =
Optional.empty();
+
private final CelebornConf conf;
private long workerTimeoutDeadline;
private long appTimeoutDeadline;
@@ -99,7 +102,7 @@ public class HARaftServer {
*
* @param conf configuration
* @param localRaftPeerId raft peer id of this Ratis server
- * @param ratisAddr address of the ratis server
+ * @param localNode local node of this Ratis server
* @param raftPeers peer nodes in the raft ring
* @throws IOException
*/
@@ -107,13 +110,13 @@ public class HARaftServer {
MetaHandler metaHandler,
CelebornConf conf,
RaftPeerId localRaftPeerId,
- InetSocketAddress ratisAddr,
- String rpcEndpoint,
+ MasterNode localNode,
List<RaftPeer> raftPeers)
throws IOException {
this.metaHandler = metaHandler;
- this.ratisAddr = ratisAddr;
- this.rpcEndpoint = rpcEndpoint;
+ this.localNode = localNode;
+ this.ratisAddr = localNode.ratisAddr();
+ this.rpcEndpoint = localNode.rpcEndpoint();
this.raftPeerId = localRaftPeerId;
this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
this.masterStateMachine = getStateMachine();
@@ -192,8 +195,8 @@ public class HARaftServer {
// Add other nodes belonging to the same service to the Ratis ring
raftPeers.add(raftPeer);
});
- return new HARaftServer(
- metaHandler, conf, localRaftPeerId, ratisAddr,
localNode.rpcEndpoint(), raftPeers);
+
+ return new HARaftServer(metaHandler, conf, localRaftPeerId, localNode,
raftPeers);
}
public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
@@ -421,12 +424,12 @@ public class HARaftServer {
/**
* Get the suggested leader peer id.
*
- * @return RaftPeerId of the suggested leader node.
+ * @return RaftPeerId of the suggested leader node -
Optional<LeaderPeerEndpoints>
*/
- public Optional<String> getCachedLeaderPeerRpcEndpoint() {
+ public Optional<LeaderPeerEndpoints> getCachedLeaderPeerRpcEndpoint() {
this.roleCheckLock.readLock().lock();
try {
- return cachedLeaderPeerRpcEndpoint;
+ return cachedLeaderPeerRpcEndpoints;
} finally {
this.roleCheckLock.readLock().unlock();
}
@@ -440,18 +443,20 @@ public class HARaftServer {
GroupInfoReply groupInfo = getGroupInfo();
RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();
-
+ Tuple2<String, String> leaderPeerRpcEndpoint = null;
if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
- setServerRole(thisNodeRole, getRpcEndpoint());
+ // Current Node always uses original rpcEndpoint/internalRpcEndpoint,
as if something wrong
+ // they would never return to client.
+ setServerRole(thisNodeRole, Tuple2.apply(this.rpcEndpoint,
this.rpcEndpoint));
} else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node Ratis, if 2 nodes are down, there will
// be no leader.
- String leaderPeerRpcEndpoint = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
- leaderPeerRpcEndpoint =
+ String clientAddress =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
+ leaderPeerRpcEndpoint =
Utils.addressToIpHostAddressPair(clientAddress);
}
setServerRole(thisNodeRole, leaderPeerRpcEndpoint);
@@ -470,7 +475,8 @@ public class HARaftServer {
}
/** Set the current server role and the leader peer rpc endpoint. */
- private void setServerRole(RaftProtos.RaftPeerRole currentRole, String
leaderPeerRpcEndpoint) {
+ private void setServerRole(
+ RaftProtos.RaftPeerRole currentRole, Tuple2<String, String>
leaderPeerRpcEndpoint) {
this.roleCheckLock.writeLock().lock();
try {
boolean leaderChanged = false;
@@ -490,7 +496,12 @@ public class HARaftServer {
}
this.cachedPeerRole = Optional.ofNullable(currentRole);
- this.cachedLeaderPeerRpcEndpoint =
Optional.ofNullable(leaderPeerRpcEndpoint);
+ if (null != leaderPeerRpcEndpoint) {
+ this.cachedLeaderPeerRpcEndpoints =
+ Optional.of(new LeaderPeerEndpoints(leaderPeerRpcEndpoint));
+ } else {
+ this.cachedLeaderPeerRpcEndpoints = Optional.empty();
+ }
} finally {
this.roleCheckLock.writeLock().unlock();
}
@@ -543,4 +554,13 @@ public class HARaftServer {
public long getAppTimeoutDeadline() {
return appTimeoutDeadline;
}
+
+ public static class LeaderPeerEndpoints {
+ // the rpcEndpoints Tuple2 (ip:port, host:port)
+ public final Tuple2<String, String> rpcEndpoints;
+
+ public LeaderPeerEndpoints(Tuple2<String, String> rpcEndpoints) {
+ this.rpcEndpoints = rpcEndpoints;
+ }
+ }
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index d126edb4a..e899990d9 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -70,6 +70,8 @@ private[celeborn] class Master(
metricsSystem.registerSource(new JVMCPUSource(conf,
MetricsSystem.ROLE_MASTER))
metricsSystem.registerSource(new SystemMiscSource(conf,
MetricsSystem.ROLE_MASTER))
+ private val bindPreferIP: Boolean = conf.bindPreferIP
+
override val rpcEnv: RpcEnv = RpcEnv.create(
RpcNameConstants.MASTER_SYS,
masterArgs.host,
@@ -269,12 +271,12 @@ private[celeborn] class Master(
}
def executeWithLeaderChecker[T](context: RpcCallContext, f: => T): Unit =
- if (HAHelper.checkShouldProcess(context, statusSystem)) {
+ if (HAHelper.checkShouldProcess(context, statusSystem, bindPreferIP)) {
try {
f
} catch {
case e: Exception =>
- HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem),
e)
+ HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem),
e, bindPreferIP)
}
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index a1ac2b67e..19a27ad13 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -39,6 +39,8 @@ case class MasterNode(
def rpcEndpoint: String = rpcHost + ":" + rpcPort
+ def rpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" + rpcPort
+
lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)
lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index f16d27bdf..32ec17780 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
+import scala.Tuple2;
+
import org.junit.*;
import org.mockito.Mockito;
@@ -154,10 +156,35 @@ public class RatisMasterStatusSystemSuiteJ {
}
@Test
- public void testLeaderAvaiable() {
+ public void testLeaderAvailable() {
boolean hasLeader =
RATISSERVER1.isLeader() || RATISSERVER2.isLeader() ||
RATISSERVER3.isLeader();
Assert.assertTrue(hasLeader);
+
+ // Check if the rpc endpoint of the leader is as expected.
+
+ HARaftServer leader =
+ RATISSERVER1.isLeader()
+ ? RATISSERVER1
+ : (RATISSERVER2.isLeader() ? RATISSERVER2 : RATISSERVER3);
+ // one of them must be the follower given the three servers we have
+ HARaftServer follower = RATISSERVER1.isLeader() ? RATISSERVER2 :
RATISSERVER1;
+
+ // This is expected to be false, but as a side effect, updates
getCachedLeaderPeerRpcEndpoint
+ boolean isFollowerCurrentLeader = follower.isLeader();
+ Assert.assertFalse(isFollowerCurrentLeader);
+
+ Optional<HARaftServer.LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoint =
+ follower.getCachedLeaderPeerRpcEndpoint();
+
+ Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
+
+ Tuple2<String, String> rpcEndpointsPair =
cachedLeaderPeerRpcEndpoint.get().rpcEndpoints;
+
+ // rpc endpoint may use custom host name then this ut need check ever
ip/host
+ Assert.assertTrue(
+ leader.getRpcEndpoint().equals(rpcEndpointsPair._1)
+ || leader.getRpcEndpoint().equals(rpcEndpointsPair._2));
}
private static final String HOSTNAME1 = "host1";