This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 993d3f258 [CELEBORN-1398] Support return leader ip to client
993d3f258 is described below
commit 993d3f2587b1c9cd8acd1a593a186d8044487c14
Author: Shuang <[email protected]>
AuthorDate: Wed May 8 15:01:55 2024 +0800
[CELEBORN-1398] Support return leader ip to client
### What changes were proposed in this pull request?
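Allow the master to return the leader's IP address, instead of its hostname, to clients when `celeborn.network.bind.preferIpAddress` is enabled.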
### Why are the changes needed?
Currently, clients that access a Celeborn cluster from a different Kubernetes
cluster may run into DNS resolution failures. Such clients can still connect by
IP address when the Kubernetes setting hostNetwork=true is used. However, the
`celeborn.network.bind.preferIpAddress` configuration is currently honored only
by worker nodes. This PR makes the master honor it as well: when a client
reaches a master that is not the leader, the leader's IP address (rather than
its hostname) is returned to the client.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed GitHub Actions (GA) CI.
Closes #2489 from RexXiong/CELEBORN-1398.
Authored-by: Shuang <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../common/client/MasterNotLeaderException.java | 21 +++++--
.../org/apache/celeborn/common/util/Utils.scala | 21 +++++++
.../deploy/master/clustermeta/ha/HAHelper.java | 16 ++---
.../deploy/master/clustermeta/ha/HARaftServer.java | 71 ++++++++++++++--------
.../celeborn/service/deploy/master/Master.scala | 5 +-
.../deploy/master/clustermeta/ha/MasterNode.scala | 4 ++
.../ha/RatisMasterStatusSystemSuiteJ.java | 18 ++++--
7 files changed, 109 insertions(+), 47 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index b613e1041..e1f931418 100644
---
a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
+++
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import javax.annotation.Nullable;
+import scala.Tuple2;
+
import org.apache.commons.lang3.StringUtils;
public class MasterNotLeaderException extends IOException {
@@ -34,19 +36,25 @@ public class MasterNotLeaderException extends IOException {
public MasterNotLeaderException(
String currentPeer, String suggestedLeaderPeer, @Nullable Throwable
cause) {
- this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause);
+ this(
+ currentPeer,
+ Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
+ Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
+ false,
+ cause);
}
public MasterNotLeaderException(
String currentPeer,
- String suggestedLeaderPeer,
- String suggestedInternalLeaderPeer,
+ Tuple2<String, String> suggestedLeaderPeer,
+ Tuple2<String, String> suggestedInternalLeaderPeer,
+ boolean bindPreferIp,
@Nullable Throwable cause) {
super(
String.format(
"Master:%s is not the leader.%s%s",
currentPeer,
- currentPeer.equals(suggestedLeaderPeer)
+ currentPeer.equals(suggestedLeaderPeer._1)
? StringUtils.EMPTY
: String.format(
" Suggested leader is Master:%s (%s).",
@@ -55,8 +63,9 @@ public class MasterNotLeaderException extends IOException {
? StringUtils.EMPTY
: String.format(" Exception:%s.", cause.getMessage())),
cause);
- this.leaderPeer = suggestedLeaderPeer;
- this.internalLeaderPeer = suggestedInternalLeaderPeer;
+ this.leaderPeer = bindPreferIp ? suggestedLeaderPeer._1 :
suggestedLeaderPeer._2;
+ this.internalLeaderPeer =
+ bindPreferIp ? suggestedInternalLeaderPeer._1 :
suggestedInternalLeaderPeer._2;
}
public String getSuggestedLeaderAddress() {
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 7eef1f4f0..c5af4abf8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -432,6 +432,27 @@ object Utils extends Logging {
}
}
+ private def getIpHostAddressPair(host: String): (String, String) = { // returns (ip, hostname) for a hostname or IP literal
+ try {
+ val inetAddress = InetAddress.getByName(host) // accepts either a machine name or a textual IP address
+ val hostAddress = inetAddress.getHostAddress
+ if (host.equals(hostAddress)) { // input was already an IP: obtain a hostname via best-effort reverse lookup
+ (hostAddress, inetAddress.getCanonicalHostName)
+ } else {
+ (hostAddress, host)
+ }
+ } catch {
+ case _: Exception => (host, host) // resolution failed (e.g. unknown host): return original input; fatal errors still propagate
+ }
+ }
+
+ // Split an "ip:port" or "host:port" address and return it as an ("ip:port", "host:port") pair.
+ def addressToIpHostAddressPair(address: String): (String, String) = {
+ val (hostPart, portPart) = Utils.parseHostPort(address)
+ val (resolvedIp, resolvedHost) = Utils.getIpHostAddressPair(hostPart)
+ (s"$resolvedIp:$portPart", s"$resolvedHost:$portPart")
+ }
+
def checkHostPort(hostPort: String): Unit = {
if (hostPort != null && hostPort.split(":").length > 2) {
assert(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 8d2669f20..71bcaf064 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -21,8 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Optional;
-import scala.Tuple2;
-
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -37,29 +35,31 @@ import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
public class HAHelper {
public static boolean checkShouldProcess(
- RpcCallContext context, AbstractMetaManager masterStatusSystem) {
+ RpcCallContext context, AbstractMetaManager masterStatusSystem, boolean
bindPreferIp) {
HARaftServer ratisServer = getRatisServer(masterStatusSystem);
if (ratisServer != null) {
if (ratisServer.isLeader()) {
return true;
}
- sendFailure(context, ratisServer, null);
+ sendFailure(context, ratisServer, null, bindPreferIp);
return false;
}
return true;
}
public static void sendFailure(
- RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
+ RpcCallContext context, HARaftServer ratisServer, Throwable cause,
boolean bindPreferIp) {
if (context != null) {
if (ratisServer != null) {
- Optional<Tuple2<String, String>> leaderPeer =
ratisServer.getCachedLeaderPeerRpcEndpoint();
+ Optional<HARaftServer.LeaderPeerEndpoints> leaderPeer =
+ ratisServer.getCachedLeaderPeerRpcEndpoint();
if (leaderPeer.isPresent()) {
context.sendFailure(
new MasterNotLeaderException(
ratisServer.getRpcEndpoint(),
- leaderPeer.get()._1(),
- leaderPeer.get()._2(),
+ leaderPeer.get().rpcEndpoints,
+ leaderPeer.get().rpcInternalEndpoints,
+ bindPreferIp,
cause));
} else {
context.sendFailure(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index b9d690d33..5a5694d86 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -55,6 +55,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -72,6 +73,7 @@ public class HARaftServer {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
+ private final MasterNode localNode;
private final InetSocketAddress ratisAddr;
private final String rpcEndpoint;
private final String internalRpcEndpoint;
@@ -90,7 +92,8 @@ public class HARaftServer {
private long roleCheckIntervalMs;
private final ReentrantReadWriteLock roleCheckLock = new
ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
- private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints =
Optional.empty();
+ private Optional<LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoints =
Optional.empty();
+
private final CelebornConf conf;
private long workerTimeoutDeadline;
private long appTimeoutDeadline;
@@ -100,7 +103,7 @@ public class HARaftServer {
*
* @param conf configuration
* @param localRaftPeerId raft peer id of this Ratis server
- * @param ratisAddr address of the ratis server
+ * @param localNode local node of this Ratis server
* @param raftPeers peer nodes in the raft ring
* @throws IOException
*/
@@ -108,15 +111,14 @@ public class HARaftServer {
MetaHandler metaHandler,
CelebornConf conf,
RaftPeerId localRaftPeerId,
- InetSocketAddress ratisAddr,
- String rpcEndpoint,
- String internalRpcEndpoint,
+ MasterNode localNode,
List<RaftPeer> raftPeers)
throws IOException {
this.metaHandler = metaHandler;
- this.ratisAddr = ratisAddr;
- this.rpcEndpoint = rpcEndpoint;
- this.internalRpcEndpoint = internalRpcEndpoint;
+ this.localNode = localNode;
+ this.ratisAddr = localNode.ratisAddr();
+ this.rpcEndpoint = localNode.rpcEndpoint();
+ this.internalRpcEndpoint = localNode.internalRpcEndpoint();
this.raftPeerId = localRaftPeerId;
this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
this.masterStateMachine = getStateMachine();
@@ -201,14 +203,8 @@ public class HARaftServer {
// Add other nodes belonging to the same service to the Ratis ring
raftPeers.add(raftPeer);
});
- return new HARaftServer(
- metaHandler,
- conf,
- localRaftPeerId,
- ratisAddr,
- localNode.rpcEndpoint(),
- localNode.internalRpcEndpoint(),
- raftPeers);
+
+ return new HARaftServer(metaHandler, conf, localRaftPeerId, localNode,
raftPeers);
}
public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
@@ -436,9 +432,9 @@ public class HARaftServer {
/**
* Get the suggested leader peer id.
*
- * @return RaftPeerId of the suggested leader node - Tuple2(rpc endpoint,
internal rpc endpoint)
+ * @return RaftPeerId of the suggested leader node -
Optional<LeaderPeerEndpoints>
*/
- public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() {
+ public Optional<LeaderPeerEndpoints> getCachedLeaderPeerRpcEndpoint() {
this.roleCheckLock.readLock().lock();
try {
return cachedLeaderPeerRpcEndpoints;
@@ -455,23 +451,29 @@ public class HARaftServer {
GroupInfoReply groupInfo = getGroupInfo();
RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();
-
+ Tuple2<String, String> leaderPeerRpcEndpoint = null;
+ Tuple2<String, String> leaderPeerInternalRpcEndpoint = null;
if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
- setServerRole(thisNodeRole, getRpcEndpoint(),
getInternalRpcEndpoint());
+ // Current Node always uses original rpcEndpoint/internalRpcEndpoint,
as if something wrong
+ // they would never return to client.
+ setServerRole(
+ thisNodeRole,
+ Tuple2.apply(this.rpcEndpoint, this.rpcEndpoint),
+ Tuple2.apply(this.internalRpcEndpoint, this.internalRpcEndpoint));
} else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node Ratis, if 2 nodes are down, there will
// be no leader.
- String leaderPeerRpcEndpoint = null;
- String leaderPeerInternalRpcEndpoint = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
- leaderPeerRpcEndpoint =
+ String clientAddress =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
+ leaderPeerRpcEndpoint =
Utils.addressToIpHostAddressPair(clientAddress);
// We use admin address to host the internal rpc address
if (conf.internalPortEnabled()) {
- leaderPeerInternalRpcEndpoint =
+ String adminAddress =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress();
+ leaderPeerInternalRpcEndpoint =
Utils.addressToIpHostAddressPair(adminAddress);
} else {
leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint;
}
@@ -495,8 +497,8 @@ public class HARaftServer {
/** Set the current server role and the leader peer rpc endpoint. */
private void setServerRole(
RaftProtos.RaftPeerRole currentRole,
- String leaderPeerRpcEndpoint,
- String leaderPeerInternalRpcEndpoint) {
+ Tuple2<String, String> leaderPeerRpcEndpoint,
+ Tuple2<String, String> leaderPeerInternalRpcEndpoint) {
this.roleCheckLock.writeLock().lock();
try {
boolean leaderChanged = false;
@@ -518,7 +520,8 @@ public class HARaftServer {
this.cachedPeerRole = Optional.ofNullable(currentRole);
if (null != leaderPeerRpcEndpoint) {
this.cachedLeaderPeerRpcEndpoints =
- Optional.of(Tuple2.apply(leaderPeerRpcEndpoint,
leaderPeerInternalRpcEndpoint));
+ Optional.of(
+ new LeaderPeerEndpoints(leaderPeerRpcEndpoint,
leaderPeerInternalRpcEndpoint));
} else {
this.cachedLeaderPeerRpcEndpoints = Optional.empty();
}
@@ -578,4 +581,18 @@ public class HARaftServer {
public long getAppTimeoutDeadline() {
return appTimeoutDeadline;
}
+
+ public static class LeaderPeerEndpoints {
+ // Leader's rpc endpoint, held as the pair (ip:port, host:port).
+ public final Tuple2<String, String> rpcEndpoints;
+
+ // Leader's internal rpc endpoint, held as the pair (ip:port, host:port).
+ public final Tuple2<String, String> rpcInternalEndpoints;
+
+ public LeaderPeerEndpoints(
+ Tuple2<String, String> endpoints, Tuple2<String, String> internalEndpoints) {
+ rpcInternalEndpoints = internalEndpoints;
+ rpcEndpoints = endpoints;
+ }
+ }
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 0d8bd3bae..3cdc8c512 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -76,6 +76,7 @@ private[celeborn] class Master(
metricsSystem.registerSource(new JVMCPUSource(conf,
MetricsSystem.ROLE_MASTER))
metricsSystem.registerSource(new SystemMiscSource(conf,
MetricsSystem.ROLE_MASTER))
+ private val bindPreferIP: Boolean = conf.bindPreferIP
private val authEnabled = conf.authEnabled
private val secretRegistry = new MasterSecretRegistryImpl()
private val sendApplicationMetaThreads =
conf.masterSendApplicationMetaThreads
@@ -358,12 +359,12 @@ private[celeborn] class Master(
}
def executeWithLeaderChecker[T](context: RpcCallContext, f: => T): Unit =
- if (HAHelper.checkShouldProcess(context, statusSystem)) {
+ if (HAHelper.checkShouldProcess(context, statusSystem, bindPreferIP)) {
try {
f
} catch {
case e: Exception =>
- HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem),
e)
+ HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem),
e, bindPreferIP)
}
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index b60b76eda..0f2b09ca9 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -40,8 +40,12 @@ case class MasterNode(
def rpcEndpoint: String = rpcHost + ":" + rpcPort
+ def rpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" + rpcPort // "ip:port" form of rpcEndpoint
+
def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort
+ def internalRpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" + internalRpcPort // "ip:port" form of internalRpcEndpoint; must use internalRpcPort, not rpcPort
+
lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)
lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 75ba1a298..8f7307115 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -183,7 +183,7 @@ public class RatisMasterStatusSystemSuiteJ {
}
@Test
- public void testLeaderAvaiable() {
+ public void testLeaderAvailable() {
boolean hasLeader =
RATISSERVER1.isLeader() || RATISSERVER2.isLeader() ||
RATISSERVER3.isLeader();
Assert.assertTrue(hasLeader);
@@ -201,12 +201,22 @@ public class RatisMasterStatusSystemSuiteJ {
boolean isFollowerCurrentLeader = follower.isLeader();
Assert.assertFalse(isFollowerCurrentLeader);
- Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint =
+ Optional<HARaftServer.LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoint =
follower.getCachedLeaderPeerRpcEndpoint();
Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
- Assert.assertEquals(leader.getRpcEndpoint(),
cachedLeaderPeerRpcEndpoint.get()._1());
- Assert.assertEquals(leader.getInternalRpcEndpoint(),
cachedLeaderPeerRpcEndpoint.get()._2());
+
+ Tuple2<String, String> rpcEndpointsPair =
cachedLeaderPeerRpcEndpoint.get().rpcEndpoints;
+ Tuple2<String, String> rpcInternalEndpointsPair =
+ cachedLeaderPeerRpcEndpoint.get().rpcInternalEndpoints;
+
+ // rpc endpoint may use custom host name then this ut need check ever
ip/host
+ Assert.assertTrue(
+ leader.getRpcEndpoint().equals(rpcEndpointsPair._1)
+ || leader.getRpcEndpoint().equals(rpcEndpointsPair._2));
+ Assert.assertTrue(
+ leader.getInternalRpcEndpoint().equals(rpcInternalEndpointsPair._1)
+ || leader.getRpcEndpoint().equals(rpcInternalEndpointsPair._2));
}
private static final String HOSTNAME1 = "host1";