This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3bc0ee50d [#1662] fix(test): Fix Netty related flaky tests (#1663)
3bc0ee50d is described below
commit 3bc0ee50d1e3f60ba5e42c4c92697f1a6e927496
Author: RickyMa <[email protected]>
AuthorDate: Thu Apr 25 11:06:51 2024 +0800
[#1662] fix(test): Fix Netty related flaky tests (#1663)
### What changes were proposed in this pull request?
Fix some flaky tests.
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/1662.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unnecessary.
---
.../manager/ShuffleManagerServerFactory.java | 2 +-
.../shuffle/reader/RssShuffleDataIteratorTest.java | 2 ++
.../manager/ShuffleManagerServerFactoryTest.java | 25 +++++++++--------
.../uniffle/client/impl/ShuffleReadClientImpl.java | 9 ++++---
.../client/impl/ShuffleReadClientImplTest.java | 2 ++
.../uniffle/test/DiskErrorToleranceTest.java | 6 +----
.../test/HybridStorageFaultToleranceBase.java | 22 +++++++--------
.../test/HybridStorageHadoopFallbackTest.java | 6 -----
.../test/HybridStorageLocalFileFallbackTest.java | 6 -----
.../java/org/apache/uniffle/test/QuorumTest.java | 1 +
.../apache/uniffle/test/RpcClientRetryTest.java | 1 +
.../ShuffleServerConcurrentWriteOfHadoopTest.java | 2 ++
.../test/ShuffleServerFaultToleranceTest.java | 6 +----
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 6 -----
.../uniffle/test/ShuffleServerWithHadoopTest.java | 18 +++++--------
.../ShuffleServerWithKerberizedHadoopTest.java | 23 +++++-----------
.../ShuffleServerWithLocalOfExceptionTest.java | 1 -
.../ShuffleServerWithLocalOfLocalOrderTest.java | 6 -----
.../uniffle/test/ShuffleServerWithLocalTest.java | 6 -----
.../test/ShuffleServerWithMemLocalHadoopTest.java | 15 +++++------
.../uniffle/test/ShuffleServerWithMemoryTest.java | 6 -----
.../uniffle/test/ShuffleWithRssClientTest.java | 2 ++
.../org/apache/uniffle/test/DynamicConfTest.java | 1 +
.../test/RepartitionWithLocalFileRssTest.java | 31 +++++++++++++++++-----
.../uniffle/test/RepartitionWithMemoryRssTest.java | 2 +-
.../apache/uniffle/test/RssShuffleManagerTest.java | 6 ++---
.../uniffle/test/SparkClientWithLocalTest.java | 6 +----
.../uniffle/test/SparkIntegrationTestBase.java | 23 +++++++++-------
.../ContinuousSelectPartitionStrategyTest.java | 15 +++++++++--
.../test/GetShuffleReportForMultiPartTest.java | 4 +--
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 8 +++++-
.../TopNShuffleDataSizeOfAppCalcTaskTest.java | 6 -----
32 files changed, 129 insertions(+), 146 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
index 982d9f77f..d139a5319 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -43,7 +43,7 @@ public class ShuffleManagerServerFactory {
public GrpcServer getServer(ShuffleManagerGrpcService service) {
ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE);
- if (type == ServerType.GRPC) {
+ if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
if (service == null) {
service = new ShuffleManagerGrpcService(shuffleManager);
}
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index b7a44bf21..3f6993c82 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -44,6 +44,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
@@ -124,6 +125,7 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
boolean compress) {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
index e17d4ef00..34a1f2402 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
@@ -17,26 +17,29 @@
package org.apache.uniffle.shuffle.manager;
-import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
public class ShuffleManagerServerFactoryTest {
- @Test
- public void testShuffleManagerServerType() {
+ private static Stream<Arguments> shuffleManagerServerTypeProvider() {
+ return Arrays.stream(ServerType.values()).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("shuffleManagerServerTypeProvider")
+ public void testShuffleManagerServerType(ServerType serverType) {
// add code to generate tests that check the server type
RssBaseConf conf = new RssBaseConf();
- conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
+ conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType);
ShuffleManagerServerFactory factory = new
ShuffleManagerServerFactory(null, conf);
// this should execute normally;
factory.getServer();
-
- // other types should raise an exception
- conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
- factory = new ShuffleManagerServerFactory(null, conf);
- assertThrows(UnsupportedOperationException.class, factory::getServer);
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 4a789bfa2..e1aa0f958 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -102,12 +102,13 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
readBufferSize = Integer.MAX_VALUE;
}
boolean offHeapEnabled =
builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);
-
builder.indexReadLimit(indexReadLimit);
builder.storageType(storageType);
builder.readBufferSize(readBufferSize);
builder.offHeapEnable(offHeapEnabled);
-
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
+ if (builder.getClientType() == null) {
+
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
+ }
} else {
// most for test
RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() :
builder.getRssConf();
@@ -131,7 +132,9 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
builder.rssConf(rssConf);
builder.offHeapEnable(false);
builder.expectedTaskIdsBitmapFilterEnable(false);
- builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
+ if (builder.getClientType() == null) {
+ builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
+ }
}
if (builder.getIdHelper() == null) {
builder.idHelper(new
DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
diff --git
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 4c0679a11..5d0d4b410 100644
---
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -36,6 +36,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
@@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase
{
private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 066135155..4029d1ca6 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -51,8 +51,6 @@ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -99,11 +97,8 @@ public class DiskErrorToleranceTest extends
ShuffleReadWriteBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
@@ -193,6 +188,7 @@ public class DiskErrorToleranceTest extends
ShuffleReadWriteBase {
isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList;
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(appId)
.shuffleId(0)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
index eb44a9b69..8a2fe2635 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
@@ -49,8 +50,6 @@ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -63,7 +62,7 @@ public abstract class HybridStorageFaultToleranceBase extends
ShuffleReadWriteBa
protected ShuffleServerGrpcNettyClient nettyShuffleServerClient;
protected static ShuffleServerConf grpcShuffleServerConfig;
protected static ShuffleServerConf nettyShuffleServerConfig;
- private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";
+ private static String REMOTE_STORAGE = HDFS_URI +
"rss/multi_storage_fault_%s";
@BeforeEach
public void createClient() throws Exception {
@@ -71,11 +70,8 @@ public abstract class HybridStorageFaultToleranceBase
extends ShuffleReadWriteBa
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
@@ -100,7 +96,7 @@ public abstract class HybridStorageFaultToleranceBase
extends ShuffleReadWriteBa
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(0, Lists.newArrayList(0));
- registerShuffle(appId, map);
+ registerShuffle(appId, map, isNettyMode);
Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
final List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap,
expectedData);
@@ -110,7 +106,10 @@ public abstract class HybridStorageFaultToleranceBase
extends ShuffleReadWriteBa
appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0),
expectedData, isNettyMode);
}
- private void registerShuffle(String appId, Map<Integer, List<Integer>>
registerMap) {
+ private void registerShuffle(
+ String appId, Map<Integer, List<Integer>> registerMap, boolean
isNettyMode) {
+ ShuffleServerClient shuffleServerClient =
+ isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient;
for (Map.Entry<Integer, List<Integer>> entry : registerMap.entrySet()) {
for (int partition : entry.getValue()) {
RssRegisterShuffleRequest rr =
@@ -118,8 +117,8 @@ public abstract class HybridStorageFaultToleranceBase
extends ShuffleReadWriteBa
appId,
entry.getKey(),
Lists.newArrayList(new PartitionRange(partition, partition)),
- REMOTE_STORAGE);
- grpcShuffleServerClient.registerShuffle(rr);
+ String.format(REMOTE_STORAGE, isNettyMode));
+ shuffleServerClient.registerShuffle(rr);
}
}
}
@@ -171,6 +170,7 @@ public abstract class HybridStorageFaultToleranceBase
extends ShuffleReadWriteBa
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE_HDFS.name())
.appId(appId)
.shuffleId(shuffleId)
@@ -179,7 +179,7 @@ public abstract class HybridStorageFaultToleranceBase
extends ShuffleReadWriteBa
.partitionNumPerRange(1)
.partitionNum(10)
.readBufferSize(1000)
- .basePath(REMOTE_STORAGE)
+ .basePath(String.format(REMOTE_STORAGE, isNettyMode))
.blockIdBitmap(blockBitmap)
.taskIdBitmap(taskBitmap)
.shuffleServerInfoList(Lists.newArrayList(ssi))
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java
index 7b53fe606..4ec4fff32 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java
@@ -27,9 +27,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
-import org.apache.uniffle.common.ClientType;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -83,11 +80,8 @@ public class HybridStorageHadoopFallbackTest extends
HybridStorageFaultTolerance
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java
index 6c63f44ea..15680573b 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java
@@ -27,9 +27,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
-import org.apache.uniffle.common.ClientType;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -68,11 +65,8 @@ public class HybridStorageLocalFileFallbackTest extends
HybridStorageFaultTolera
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index da9dd7158..6662a3670 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -75,6 +75,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.MEMORY_LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
index abefb1b0d..d082ae025 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
@@ -63,6 +63,7 @@ public class RpcClientRetryTest extends ShuffleReadWriteBase {
private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType
storageType) {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(storageType.name())
.shuffleId(0)
.partitionId(0)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index ec5222919..7d5d11481 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -46,6 +46,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -177,6 +178,7 @@ public class ShuffleServerConcurrentWriteOfHadoopTest
extends ShuffleServerWithH
});
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId(appId)
.shuffleId(0)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 631cd7fe9..2eee21227 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -47,8 +47,6 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -88,8 +86,6 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
startServers();
grpcShuffleServerClients = new ArrayList<>();
nettyShuffleServerClients = new ArrayList<>();
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
for (ShuffleServer shuffleServer : grpcShuffleServers) {
grpcShuffleServerClients.add(
new ShuffleServerGrpcClient(shuffleServer.getIp(),
shuffleServer.getGrpcPort()));
@@ -97,7 +93,7 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
for (ShuffleServer shuffleServer : nettyShuffleServers) {
nettyShuffleServerClients.add(
new ShuffleServerGrpcNettyClient(
- rssConf, LOCALHOST, shuffleServer.getGrpcPort(),
shuffleServer.getNettyPort()));
+ LOCALHOST, shuffleServer.getGrpcPort(),
shuffleServer.getNettyPort()));
}
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 39e3deefb..9b486629c 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -62,15 +62,12 @@ import
org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
@@ -149,11 +146,8 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
index c899b8eac..67a9dc05c 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
@@ -46,8 +46,6 @@ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -93,11 +91,8 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
@@ -109,8 +104,9 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
nettyShuffleServerClient.close();
}
- private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean
isNettyMode) {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.shuffleId(0)
.partitionId(0)
@@ -172,7 +168,7 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -208,7 +204,7 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
shuffleServerClient.finishShuffle(rfsr);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -218,7 +214,7 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
validateResult(readClient, expectedData, bitmaps[0]);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(1)
.basePath(dataBasePath)
@@ -229,7 +225,7 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
validateResult(readClient, expectedData, bitmaps[1]);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(2)
.basePath(dataBasePath)
@@ -240,7 +236,7 @@ public class ShuffleServerWithHadoopTest extends
ShuffleReadWriteBase {
validateResult(readClient, expectedData, bitmaps[2]);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(3)
.basePath(dataBasePath)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 036120499..77330fe2b 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import com.google.common.collect.Lists;
@@ -55,8 +54,6 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RssUtils;
@@ -95,10 +92,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
private static ShuffleServerConf grpcShuffleServerConfig;
private static ShuffleServerConf nettyShuffleServerConfig;
- private static AtomicInteger serverRpcPortCounter = new AtomicInteger();
- private static AtomicInteger nettyPortCounter = new AtomicInteger();
- private static AtomicInteger jettyPortCounter = new AtomicInteger();
-
static @TempDir File tempDir;
private static ShuffleServerConf getShuffleServerConf(ServerType serverType)
throws Exception {
@@ -173,11 +166,8 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
new ShuffleServerGrpcClient(
LOCALHOST,
getShuffleServerConf(ServerType.GRPC).getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
getShuffleServerConf(ServerType.GRPC_NETTY)
.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
@@ -225,8 +215,9 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
return partitionToBlocks;
}
- private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean
isNettyMode) {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.shuffleId(0)
.partitionId(0)
@@ -305,7 +296,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
: new ShuffleServerInfo(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -341,7 +332,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
shuffleServerClient.finishShuffle(rfsr);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -351,7 +342,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
validateResult(readClient, expectedData, bitmaps[0]);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(1)
.basePath(dataBasePath)
@@ -362,7 +353,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
validateResult(readClient, expectedData, bitmaps[1]);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(2)
.basePath(dataBasePath)
@@ -373,7 +364,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
validateResult(readClient, expectedData, bitmaps[2]);
readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(3)
.basePath(dataBasePath)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
index 3f93da969..f51bc3649 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -40,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleServerWithLocalOfExceptionTest extends
ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;
- private static String REMOTE_STORAGE = HDFS_URI + "rss/test";
private static int rpcPort;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
index 2803f05e2..b0ed66d7f 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
@@ -48,13 +48,10 @@ import
org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter;
@@ -116,11 +113,8 @@ public class ShuffleServerWithLocalOfLocalOrderTest
extends ShuffleReadWriteBase
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index 29321b3d5..a77472e6a 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -45,12 +45,9 @@ import
org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ChecksumUtils;
@@ -110,11 +107,8 @@ public class ShuffleServerWithLocalTest extends
ShuffleReadWriteBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
index 549ae21e9..f969397f8 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
@@ -43,13 +43,10 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
@@ -76,7 +73,7 @@ public class ShuffleServerWithMemLocalHadoopTest extends
ShuffleReadWriteBase {
LoggerFactory.getLogger(ShuffleServerWithMemLocalHadoopTest.class);
private ShuffleServerGrpcClient grpcShuffleServerClient;
private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
- private static String REMOTE_STORAGE = HDFS_URI + "rss/test";
+ private static String REMOTE_STORAGE = HDFS_URI + "rss/test_%s";
private static ShuffleServerConf grpcShuffleServerConfig;
private static ShuffleServerConf nettyShuffleServerConfig;
@@ -107,11 +104,8 @@ public class ShuffleServerWithMemLocalHadoopTest extends
ShuffleReadWriteBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
@@ -167,7 +161,10 @@ public class ShuffleServerWithMemLocalHadoopTest extends
ShuffleReadWriteBase {
int partitionId = 0;
RssRegisterShuffleRequest rrsr =
new RssRegisterShuffleRequest(
- testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)),
REMOTE_STORAGE);
+ testAppId,
+ 0,
+ Lists.newArrayList(new PartitionRange(0, 0)),
+ String.format(REMOTE_STORAGE, isNettyMode));
shuffleServerClient.registerShuffle(rrsr);
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
Map<Long, byte[]> dataMap = Maps.newHashMap();
@@ -216,7 +213,7 @@ public class ShuffleServerWithMemLocalHadoopTest extends
ShuffleReadWriteBase {
500,
expectBlockIds,
processBlockIds,
- REMOTE_STORAGE,
+ String.format(REMOTE_STORAGE, isNettyMode),
conf);
ClientReadHandler[] handlers = new ClientReadHandler[3];
handlers[0] = memoryClientReadHandler;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index fb01071f0..a2de8f761 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -41,13 +41,10 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
@@ -112,11 +109,8 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index c905968b3..de96781c8 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -357,6 +357,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
@@ -380,6 +381,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
assertTrue(commitResult);
readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
index ab1b9de1f..c5421d113 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
@@ -52,6 +52,7 @@ public class DynamicConfTest extends MRIntegrationTestBase {
@Override
protected void updateRssConfiguration(Configuration jobConf) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
index 29066247e..4f30c4997 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
@@ -47,14 +47,25 @@ public class RepartitionWithLocalFileRssTest extends
RepartitionTest {
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+
+ ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
- String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
- shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
- shuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
- shuffleServerConf.setString("rss.storage.basePath", basePath);
- createShuffleServer(shuffleServerConf);
+ String grpcBasePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
+ grpcShuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
+ grpcShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE,
true);
+ grpcShuffleServerConf.setString("rss.storage.basePath", grpcBasePath);
+ createShuffleServer(grpcShuffleServerConf);
+
+ ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ File dataDir3 = new File(tmpDir, "data3");
+ File dataDir4 = new File(tmpDir, "data4");
+ String nettyBasePath = dataDir3.getAbsolutePath() + "," +
dataDir4.getAbsolutePath();
+ nettyShuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
+ nettyShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE,
true);
+ nettyShuffleServerConf.setString("rss.storage.basePath", nettyBasePath);
+ createShuffleServer(nettyShuffleServerConf);
+
startServers();
}
@@ -76,13 +87,19 @@ public class RepartitionWithLocalFileRssTest extends
RepartitionTest {
Map resultWithoutRss = runSparkApp(sparkConf, fileName);
results.add(resultWithoutRss);
- updateSparkConfWithRss(sparkConf);
+ updateSparkConfWithRssGrpc(sparkConf);
updateSparkConfCustomer(sparkConf);
for (Codec.Type type : new Codec.Type[] {Codec.Type.NOOP, Codec.Type.ZSTD,
Codec.Type.LZ4}) {
sparkConf.set("spark." + COMPRESSION_TYPE.key().toLowerCase(),
type.name());
Map resultWithRss = runSparkApp(sparkConf, fileName);
results.add(resultWithRss);
}
+ updateSparkConfWithRssNetty(sparkConf);
+ for (Codec.Type type : new Codec.Type[] {Codec.Type.NOOP, Codec.Type.ZSTD,
Codec.Type.LZ4}) {
+ sparkConf.set("spark." + COMPRESSION_TYPE.key().toLowerCase(),
type.name());
+ Map resultWithRss = runSparkApp(sparkConf, fileName);
+ results.add(resultWithRss);
+ }
for (int i = 1; i < results.size(); i++) {
verifyTestResult(results.get(0), results.get(i));
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
index 571907408..3444a1cee 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
@@ -63,7 +63,7 @@ public class RepartitionWithMemoryRssTest extends
RepartitionTest {
public void testMemoryRelease() throws Exception {
final String fileName = generateTextFile(10000, 10000);
SparkConf sparkConf = createSparkConf();
- updateSparkConfWithRss(sparkConf);
+ updateSparkConfWithRssGrpc(sparkConf);
sparkConf.set("spark.executor.memory", "500m");
sparkConf.set("spark.unsafe.exceptionOnMemoryLeak", "true");
updateRssStorage(sparkConf);
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 8414cd0b9..3cf1736c5 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -141,16 +141,16 @@ public class RssShuffleManagerTest extends
SparkIntegrationTestBase {
BlockIdLayout clientConfLayout,
BlockIdLayout dynamicConfLayout,
BlockIdLayout expectedLayout,
- boolean enableDynamicCLientConf)
+ boolean enableDynamicClientConf)
throws Exception {
Map<String, String> dynamicConf = startServers(dynamicConfLayout);
SparkConf conf = createSparkConf();
- updateSparkConfWithRss(conf);
+ updateSparkConfWithRssGrpc(conf);
// enable stage recompute
conf.set("spark." + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
// enable dynamic client conf
- conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
enableDynamicCLientConf);
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
enableDynamicClientConf);
// configure storage type
conf.set("spark." + RssClientConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE.name());
// restarting the coordinator may cause RssException: There isn't enough
shuffle servers
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index c134ced35..11e60540e 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -47,8 +47,6 @@ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.BlockIdLayout;
@@ -111,11 +109,8 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
@@ -140,6 +135,7 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 6d48b901c..e1095e260 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -29,6 +29,8 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ClientType;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public abstract class SparkIntegrationTestBase extends IntegrationTestBase {
@@ -54,24 +56,24 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
final long durationWithoutRss = System.currentTimeMillis() - start;
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
- updateSparkConfWithRss(sparkConf);
+ updateSparkConfWithRssGrpc(sparkConf);
updateSparkConfCustomer(sparkConf);
start = System.currentTimeMillis();
- Map resultWithRss = runSparkApp(sparkConf, fileName);
- final long durationWithRss = System.currentTimeMillis() - start;
+ Map resultWithRssGrpc = runSparkApp(sparkConf, fileName);
+ final long durationWithRssGrpc = System.currentTimeMillis() - start;
updateSparkConfWithRssNetty(sparkConf);
start = System.currentTimeMillis();
Map resultWithRssNetty = runSparkApp(sparkConf, fileName);
final long durationWithRssNetty = System.currentTimeMillis() - start;
- verifyTestResult(resultWithoutRss, resultWithRss);
+ verifyTestResult(resultWithoutRss, resultWithRssGrpc);
verifyTestResult(resultWithoutRss, resultWithRssNetty);
LOG.info(
"Test: durationWithoutRss["
+ durationWithoutRss
- + "], durationWithRss["
- + durationWithRss
+ + "], durationWithRssGrpc["
+ + durationWithRssGrpc
+ "]"
+ "], durationWithRssNetty["
+ durationWithRssNetty
@@ -90,16 +92,16 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
spark.close();
}
spark = SparkSession.builder().config(sparkConf).getOrCreate();
- Map resultWithRss = runTest(spark, testFileName);
+ Map result = runTest(spark, testFileName);
spark.stop();
- return resultWithRss;
+ return result;
}
protected SparkConf createSparkConf() {
return new
SparkConf().setAppName(this.getClass().getSimpleName()).setMaster("local[4]");
}
- public void updateSparkConfWithRss(SparkConf sparkConf) {
+ public void updateSparkConfWithRssGrpc(SparkConf sparkConf) {
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
sparkConf.set(
"spark.shuffle.sort.io.plugin.class",
"org.apache.spark.shuffle.RssShuffleDataIo");
@@ -118,10 +120,11 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "1m");
sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000");
sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
}
public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
- sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, "GRPC_NETTY");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE,
ClientType.GRPC_NETTY.name());
}
protected void verifyTestResult(Map expected, Map actual) {
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
index 3b02caff2..1b46e6e95 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
@@ -140,8 +140,19 @@ public class ContinuousSelectPartitionStrategyTest extends
SparkIntegrationTestB
}
@Override
- public void updateSparkConfWithRss(SparkConf sparkConf) {
- super.updateSparkConfWithRss(sparkConf);
+ public void updateSparkConfWithRssGrpc(SparkConf sparkConf) {
+ super.updateSparkConfWithRssGrpc(sparkConf);
+ addMultiReplicaConf(sparkConf);
+ }
+
+ @Override
+ public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
+ super.updateSparkConfWithRssNetty(sparkConf);
+ // Add multi replica conf
+ addMultiReplicaConf(sparkConf);
+ }
+
+ private static void addMultiReplicaConf(SparkConf sparkConf) {
// Add multi replica conf
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(),
String.valueOf(replicateWrite));
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(),
String.valueOf(replicateWrite));
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index 75b43e756..462ee8dab 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -140,8 +140,8 @@ public class GetShuffleReportForMultiPartTest extends
SparkIntegrationTestBase {
}
@Override
- public void updateSparkConfWithRss(SparkConf sparkConf) {
- super.updateSparkConfWithRss(sparkConf);
+ public void updateSparkConfWithRssGrpc(SparkConf sparkConf) {
+ super.updateSparkConfWithRssGrpc(sparkConf);
// Add multi replica conf
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(),
String.valueOf(replicateWrite));
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(),
String.valueOf(replicateWrite));
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index f677b6385..0c6860ad1 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,9 +59,14 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
private int nettyPort;
private TransportClientFactory clientFactory;
+ @VisibleForTesting
+ public ShuffleServerGrpcNettyClient(String host, int grpcPort, int
nettyPort) {
+ this(new RssConf(), host, grpcPort, nettyPort);
+ }
+
public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int
grpcPort, int nettyPort) {
this(
- rssConf,
+ rssConf == null ? new RssConf() : rssConf,
host,
grpcPort,
nettyPort,
diff --git
a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
index ab9e8ad02..e4b2559aa 100644
---
a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
@@ -48,11 +48,8 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
-import org.apache.uniffle.common.config.RssClientConf;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
@@ -168,12 +165,9 @@ public class TopNShuffleDataSizeOfAppCalcTaskTest {
private void registerAndRequireBuffer(String appId, int length, boolean
isNettyMode)
throws Exception {
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
ShuffleServerGrpcClient shuffleServerClient =
isNettyMode
? new ShuffleServerGrpcNettyClient(
- rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT))