This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c3616c2 Send commit concurrently in client side (#59)
c3616c2 is described below
commit c3616c27a545ffccf86c81d700346557bcaa9058
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Jul 17 10:53:21 2022 +0800
Send commit concurrently in client side (#59)
### What changes were proposed in this pull request?
Send commit concurrently in client side
### Why are the changes needed?
I found that when using the `LOCALFILE` storageType, waiting for the commit
costs too much time. To speed this up, commits can be sent concurrently by
using a thread pool.
**Performance Test Case**
Using 1000 executors of Spark, single executor 1g/1core to run TeraSort 1TB.
When using the `LOCALFILE` storageType mode, it cost 7.3 min.
After applying this PR, it cost 6.1 min.
### Does this PR introduce _any_ user-facing change?
1. Introduces the conf `rss.client.data.commit.pool.size`; the default
value is the number of assigned shuffle servers.
### How was this patch tested?
No need
---
.../org/apache/hadoop/mapreduce/RssMRConfig.java | 5 ++
.../org/apache/hadoop/mapreduce/RssMRUtils.java | 4 +-
.../org/apache/spark/shuffle/RssSparkConfig.java | 5 ++
.../apache/spark/shuffle/RssShuffleManager.java | 8 +-
.../apache/spark/shuffle/RssShuffleManager.java | 11 ++-
.../client/factory/ShuffleClientFactory.java | 5 +-
.../client/impl/ShuffleWriteClientImpl.java | 96 ++++++++++++++--------
.../uniffle/client/util/RssClientConfig.java | 2 +
.../client/impl/ShuffleWriteClientImplTest.java | 2 +-
docs/client_guide.md | 1 +
.../uniffle/test/AssignmentWithTagsTest.java | 2 +-
.../java/org/apache/uniffle/test/QuorumTest.java | 2 +-
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 2 +-
.../uniffle/test/ShuffleWithRssClientTest.java | 2 +-
14 files changed, 102 insertions(+), 45 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index b1b9507..ed1f90f 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -55,6 +55,11 @@ public class RssMRConfig {
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+ public static final String RSS_DATA_COMMIT_POOL_SIZE =
+ MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+ public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
+
public static final String RSS_CLIENT_SEND_THREAD_NUM =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 684a5ec..740de51 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -91,11 +91,13 @@ public class RssMRUtils {
RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
int dataTransferPoolSize =
jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+ int dataCommitPoolSize =
jobConf.getInt(RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE,
+ RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
ShuffleWriteClient client = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum, replica, replicaWrite, replicaRead,
replicaSkipEnabled,
- dataTransferPoolSize);
+ dataTransferPoolSize, dataCommitPoolSize);
return client;
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 041e21f..a9e845a 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -109,6 +109,11 @@ public class RssSparkConfig {
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+ public static final String RSS_DATA_COMMIT_POOL_SIZE =
+ SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+ public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
+
public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
public static final String RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE =
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 705d66b..4198460 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
private final int dataReplicaRead;
private final boolean dataReplicaSkipEnabled;
private final int dataTransferPoolSize;
+ private final int dataCommitPoolSize;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private RemoteStorageInfo remoteStorage;
@@ -168,10 +169,13 @@ public class RssShuffleManager implements ShuffleManager {
RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
int heartBeatThreadNum =
sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
- shuffleWriteClient = ShuffleClientFactory
+ this.dataCommitPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
+ RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
+ this.shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
+ dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize,
+ dataCommitPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b71f1b3..124ae61 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
private final int dataReplicaRead;
private final boolean dataReplicaSkipEnabled;
private final int dataTransferPoolSize;
+ private final int dataCommitPoolSize;
private ShuffleWriteClient shuffleWriteClient;
private final Map<String, Set<Long>> taskToSuccessBlockIds;
private final Map<String, Set<Long>> taskToFailedBlockIds;
@@ -169,11 +170,14 @@ public class RssShuffleManager implements ShuffleManager {
this.dataTransferPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+ this.dataCommitPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
+ RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
+ dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize,
+ dataCommitPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
@@ -238,11 +242,14 @@ public class RssShuffleManager implements ShuffleManager {
RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
this.dataTransferPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+ this.dataCommitPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
+ RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
+ dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize,
+ dataCommitPoolSize);
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockIds = taskToFailedBlockIds;
if (loop != null) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 5afa43d..d11a07f 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -36,9 +36,10 @@ public class ShuffleClientFactory {
public ShuffleWriteClient createShuffleWriteClient(
String clientType, int retryMax, long retryIntervalMax, int
heartBeatThreadNum,
- int replica, int replicaWrite, int replicaRead, boolean
replicaSkipEnabled, int dataTransferPoolSize) {
+ int replica, int replicaWrite, int replicaRead, boolean
replicaSkipEnabled, int dataTransferPoolSize,
+ int dataCommitPoolSize) {
return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- replica, replicaWrite, replicaRead, replicaSkipEnabled,
dataTransferPoolSize);
+ replica, replicaWrite, replicaRead, replicaSkipEnabled,
dataTransferPoolSize, dataCommitPoolSize);
}
public ShuffleReadClient
createShuffleReadClient(CreateShuffleReadClientRequest request) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index ce4c247..b078d42 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -88,11 +88,20 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
private int replicaRead;
private boolean replicaSkipEnabled;
private int dataTranferPoolSize;
+ private int dataCommitPoolSize = -1;
private final ForkJoinPool dataTransferPool;
- public ShuffleWriteClientImpl(String clientType, int retryMax, long
retryIntervalMax, int heartBeatThreadNum,
- int replica, int replicaWrite, int
replicaRead, boolean replicaSkipEnabled,
- int dataTranferPoolSize) {
+ public ShuffleWriteClientImpl(
+ String clientType,
+ int retryMax,
+ long retryIntervalMax,
+ int heartBeatThreadNum,
+ int replica,
+ int replicaWrite,
+ int replicaRead,
+ boolean replicaSkipEnabled,
+ int dataTranferPoolSize,
+ int dataCommitPoolSize) {
this.clientType = clientType;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
@@ -105,6 +114,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
this.replicaSkipEnabled = replicaSkipEnabled;
this.dataTranferPoolSize = dataTranferPoolSize;
this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
+ this.dataCommitPoolSize = dataCommitPoolSize;
}
private boolean sendShuffleDataAsync(
@@ -247,43 +257,62 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
return new SendShuffleDataResult(successBlockIds, failedBlockIds);
}
+ /**
+ * This method will wait until all shuffle data have been flushed
+ * to durable storage in assigned shuffle servers.
+ * @param shuffleServerInfoSet
+ * @param appId
+ * @param shuffleId
+ * @param numMaps
+ * @return
+ */
@Override
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId, int shuffleId, int numMaps) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(
+ dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() :
dataCommitPoolSize
+ );
AtomicInteger successfulCommit = new AtomicInteger(0);
- shuffleServerInfoSet.stream().forEach(ssi -> {
- RssSendCommitRequest request = new RssSendCommitRequest(appId,
shuffleId);
- String errorMsg = "Failed to commit shuffle data to " + ssi + " for
shuffleId[" + shuffleId + "]";
- long startTime = System.currentTimeMillis();
- try {
- RssSendCommitResponse response =
getShuffleServerClient(ssi).sendCommit(request);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
- int commitCount = response.getCommitCount();
- LOG.info("Successfully sendCommit for appId[" + appId + "],
shuffleId[" + shuffleId
- + "] to ShuffleServer[" + ssi.getId() + "], cost "
- + (System.currentTimeMillis() - startTime) + " ms, got committed
maps["
- + commitCount + "], map number of stage is " + numMaps);
- if (commitCount >= numMaps) {
- RssFinishShuffleResponse rfsResponse =
- getShuffleServerClient(ssi).finishShuffle(new
RssFinishShuffleRequest(appId, shuffleId));
- if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
- String msg = "Failed to finish shuffle to " + ssi + " for
shuffleId[" + shuffleId
- + "] with statusCode " + rfsResponse.getStatusCode();
+ try {
+ forkJoinPool.submit(() -> {
+ shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+ RssSendCommitRequest request = new RssSendCommitRequest(appId,
shuffleId);
+ String errorMsg = "Failed to commit shuffle data to " + ssi + " for
shuffleId[" + shuffleId + "]";
+ long startTime = System.currentTimeMillis();
+ try {
+ RssSendCommitResponse response =
getShuffleServerClient(ssi).sendCommit(request);
+ if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ int commitCount = response.getCommitCount();
+ LOG.info("Successfully sendCommit for appId[" + appId + "],
shuffleId[" + shuffleId
+ + "] to ShuffleServer[" + ssi.getId() + "], cost "
+ + (System.currentTimeMillis() - startTime) + " ms, got
committed maps["
+ + commitCount + "], map number of stage is " + numMaps);
+ if (commitCount >= numMaps) {
+ RssFinishShuffleResponse rfsResponse =
+ getShuffleServerClient(ssi).finishShuffle(new
RssFinishShuffleRequest(appId, shuffleId));
+ if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS)
{
+ String msg = "Failed to finish shuffle to " + ssi + " for
shuffleId[" + shuffleId
+ + "] with statusCode " + rfsResponse.getStatusCode();
+ LOG.error(msg);
+ throw new Exception(msg);
+ } else {
+ LOG.info("Successfully finish shuffle to " + ssi + " for
shuffleId[" + shuffleId + "]");
+ }
+ }
+ } else {
+ String msg = errorMsg + " with statusCode " +
response.getStatusCode();
LOG.error(msg);
throw new Exception(msg);
- } else {
- LOG.info("Successfully finish shuffle to " + ssi + " for
shuffleId[" + shuffleId + "]");
}
+ successfulCommit.incrementAndGet();
+ } catch (Exception e) {
+ LOG.error(errorMsg, e);
}
- } else {
- String msg = errorMsg + " with statusCode " +
response.getStatusCode();
- LOG.error(msg);
- throw new Exception(msg);
- }
- successfulCommit.incrementAndGet();
- } catch (Exception e) {
- LOG.error(errorMsg, e);
- }
- });
+ });
+ }).join();
+ } finally {
+ forkJoinPool.shutdownNow();
+ }
+
// check if every commit/finish call is successful
return successfulCommit.get() == shuffleServerInfoSet.size();
}
@@ -508,6 +537,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
public void close() {
heartBeatExecutorService.shutdownNow();
coordinatorClients.forEach(CoordinatorClient::close);
+ dataTransferPool.shutdownNow();
}
private void throwExceptionIfNecessary(ClientResponse response, String
errorMsg) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index 0c519a8..22f662b 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -36,6 +36,8 @@ public class RssClientConfig {
public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
true;
public static final String RSS_DATA_TRANSFER_POOL_SIZE =
"rss.client.data.transfer.pool.size";
public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE =
Runtime.getRuntime().availableProcessors();
+ public static final String RSS_DATA_COMMIT_POOL_SIZE =
"rss.client.data.commit.pool.size";
+ public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE = -1;
public static final String RSS_HEARTBEAT_INTERVAL = "rss.heartbeat.interval";
public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE = 10 * 1000L;
public static final String RSS_HEARTBEAT_TIMEOUT = "rss.heartbeat.timeout";
diff --git
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 67efea7..d7f539e 100644
---
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -42,7 +42,7 @@ public class ShuffleWriteClientImplTest {
@Test
public void testSendData() {
ShuffleWriteClientImpl shuffleWriteClient =
- new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1);
+ new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1);
ShuffleServerClient mockShuffleServerClient =
mock(ShuffleServerClient.class);
ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 14b914d..216b76a 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -87,6 +87,7 @@ These configurations are shared by all types of clients.
|<client_type>.rss.client.read.buffer.size|14m|The max data size read from
storage|
|<client_type>.rss.client.send.threadPool.size|5|The thread size for send
shuffle data to shuffle server|
|<client_type>.rss.client.assignment.tags|-|The comma-separated list of tags
for deciding assignment shuffle servers. Notice that the SHUFFLE_SERVER_VERSION
will always as the assignment tag whether this conf is set or not|
+|<client_type>.rss.client.data.commit.pool.size|The number of assigned shuffle
servers|The thread size for sending commit to shuffle servers|
Notice:
1. `<client_type>` should be `spark` or `mapreduce`
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index 416af72..9ab84d4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -147,7 +147,7 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
@Test
public void testTags() throws Exception {
ShuffleWriteClientImpl shuffleWriteClient = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1);
+ 1, 1, 1, true, 1, 1);
shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
// Case1 : only set the single default shuffle version tag
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 0148f69..3dc71f4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -258,7 +258,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {
shuffleWriteClientImpl = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- replica, replicaWrite, replicaRead, replicaSkip, 1);
+ replica, replicaWrite, replicaRead, replicaSkip, 1, 1);
List<ShuffleServerInfo> allServers =
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 4339239..a1cc6a1 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -101,7 +101,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
public void clearResourceTest() throws Exception {
final ShuffleWriteClient shuffleWriteClient =
ShuffleClientFactory.getInstance().createShuffleWriteClient(
- "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1);
+ "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1);
shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
shuffleWriteClient.registerShuffle(
new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 53f2869..f2e35c1 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -88,7 +88,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
@BeforeEach
public void createClient() {
shuffleWriteClientImpl = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1);
+ 1, 1, 1, true, 1, 1);
}
@AfterEach