This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c75e727a1 [#2065] improvement(test): Fix QuorumTest to use random port
(#2066)
c75e727a1 is described below
commit c75e727a1f6bbdc0fa95ba9ad2f931c84c68abeb
Author: maobaolong <[email protected]>
AuthorDate: Wed Sep 11 19:09:46 2024 +0800
[#2065] improvement(test): Fix QuorumTest to use random port (#2066)
### What changes were proposed in this pull request?
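As the title says: make QuorumTest start its coordinator and shuffle servers on
random ports (RPC and Jetty HTTP ports set to 0) instead of fixed ones. To
support this, CoordinatorServer now records the port returned by its RPC
server's start() and exposes it via getRpcListenPort(), and
QuorumTest.createServer() takes that port to set "rss.coordinator.quorum".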
### Why are the changes needed?
Fix: #2065
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests (UTs).
---
.../uniffle/coordinator/CoordinatorServer.java | 7 +++-
.../java/org/apache/uniffle/test/QuorumTest.java | 37 +++++++++++++---------
2 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 847afda83..74d34bac6 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -72,6 +72,7 @@ public class CoordinatorServer {
private GRPCMetrics grpcMetrics;
private MetricReporter metricReporter;
private String id;
+ private int rpcListenPort;
public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
this.startTimeMs = System.currentTimeMillis();
@@ -106,7 +107,7 @@ public class CoordinatorServer {
LOG.info(
"{} version: {}", this.getClass().getSimpleName(),
Constants.VERSION_AND_REVISION_SHORT);
jettyServer.start();
- server.start();
+ rpcListenPort = server.start();
if (metricReporter != null) {
metricReporter.start();
}
@@ -280,4 +281,8 @@ public class CoordinatorServer {
public long getStartTimeMs() {
return startTimeMs;
}
+
+ public int getRpcListenPort() {
+ return rpcListenPort;
+ }
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 35037470b..ff0fe3477 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -85,32 +85,45 @@ public class QuorumTest extends ShuffleReadWriteBase {
.readBufferSize(1000);
}
- public static MockedShuffleServer createServer(int id, File tmpDir) throws
Exception {
+ public static MockedShuffleServer createServer(int id, File tmpDir, int
coordinatorRpcPort)
+ throws Exception {
ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
File dataDir1 = new File(tmpDir, id + "_1");
File dataDir2 = new File(tmpDir, id + "_2");
String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE.name());
- shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
+ shuffleServerConf.setInteger("rss.jetty.http.port", 0);
shuffleServerConf.setString("rss.storage.basePath", basePath);
+ shuffleServerConf.setString("rss.coordinator.quorum", LOCALHOST + ":" +
coordinatorRpcPort);
return new MockedShuffleServer(shuffleServerConf);
}
@BeforeEach
public void initCluster(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
+ coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
+ coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0);
createCoordinatorServer(coordinatorConf);
+ for (CoordinatorServer coordinator : coordinators) {
+ coordinator.start();
+ }
+
ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
- grpcShuffleServers.add(createServer(0, tmpDir));
- grpcShuffleServers.add(createServer(1, tmpDir));
- grpcShuffleServers.add(createServer(2, tmpDir));
- grpcShuffleServers.add(createServer(3, tmpDir));
- grpcShuffleServers.add(createServer(4, tmpDir));
+ grpcShuffleServers.add(createServer(0, tmpDir,
coordinators.get(0).getRpcListenPort()));
+ grpcShuffleServers.add(createServer(1, tmpDir,
coordinators.get(0).getRpcListenPort()));
+ grpcShuffleServers.add(createServer(2, tmpDir,
coordinators.get(0).getRpcListenPort()));
+ grpcShuffleServers.add(createServer(3, tmpDir,
coordinators.get(0).getRpcListenPort()));
+ grpcShuffleServers.add(createServer(4, tmpDir,
coordinators.get(0).getRpcListenPort()));
+
+ for (ShuffleServer shuffleServer : grpcShuffleServers) {
+ shuffleServer.start();
+ }
shuffleServerInfo0 =
new ShuffleServerInfo(
@@ -137,12 +150,6 @@ public class QuorumTest extends ShuffleReadWriteBase {
String.format("127.0.0.1-%s",
grpcShuffleServers.get(4).getGrpcPort()),
grpcShuffleServers.get(4).getIp(),
grpcShuffleServers.get(4).getGrpcPort());
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.start();
- }
- for (ShuffleServer shuffleServer : grpcShuffleServers) {
- shuffleServer.start();
- }
// simulator of failed servers
fakedShuffleServerInfo0 =
@@ -643,7 +650,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
// when one server is restarted, getShuffleResult should success
grpcShuffleServers.get(1).stopServer();
- grpcShuffleServers.set(1, createServer(1, tmpDir));
+ grpcShuffleServers.set(1, createServer(1, tmpDir,
coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.get(1).start();
report =
shuffleWriteClientImpl.getShuffleResult(
@@ -656,7 +663,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
// when two servers are restarted, getShuffleResult should fail
grpcShuffleServers.get(2).stopServer();
- grpcShuffleServers.set(2, createServer(2, tmpDir));
+ grpcShuffleServers.set(2, createServer(2, tmpDir,
coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.get(2).start();
try {
report =