This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 51a2639c5cc4b418e8c4b62d3f816bd4d767f508 Author: RickyMa <[email protected]> AuthorDate: Thu Apr 25 11:06:51 2024 +0800 [#1662] fix(test): Fix Netty related flaky tests (#1663) ### What changes were proposed in this pull request? Fix some flaky tests. ### Why are the changes needed? Fix: https://github.com/apache/incubator-uniffle/issues/1662. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unnecessary. --- .../manager/ShuffleManagerServerFactory.java | 2 +- .../shuffle/reader/RssShuffleDataIteratorTest.java | 2 ++ .../manager/ShuffleManagerServerFactoryTest.java | 25 +++++++++-------- .../uniffle/client/impl/ShuffleReadClientImpl.java | 9 ++++--- .../client/impl/ShuffleReadClientImplTest.java | 2 ++ .../uniffle/test/DiskErrorToleranceTest.java | 6 +---- .../test/HybridStorageFaultToleranceBase.java | 22 +++++++-------- .../test/HybridStorageHadoopFallbackTest.java | 6 ----- .../test/HybridStorageLocalFileFallbackTest.java | 6 ----- .../java/org/apache/uniffle/test/QuorumTest.java | 1 + .../apache/uniffle/test/RpcClientRetryTest.java | 1 + .../ShuffleServerConcurrentWriteOfHadoopTest.java | 2 ++ .../test/ShuffleServerFaultToleranceTest.java | 6 +---- .../apache/uniffle/test/ShuffleServerGrpcTest.java | 6 ----- .../uniffle/test/ShuffleServerWithHadoopTest.java | 18 +++++-------- .../ShuffleServerWithKerberizedHadoopTest.java | 23 +++++----------- .../ShuffleServerWithLocalOfExceptionTest.java | 1 - .../ShuffleServerWithLocalOfLocalOrderTest.java | 6 ----- .../uniffle/test/ShuffleServerWithLocalTest.java | 6 ----- .../test/ShuffleServerWithMemLocalHadoopTest.java | 15 +++++------ .../uniffle/test/ShuffleServerWithMemoryTest.java | 6 ----- .../uniffle/test/ShuffleWithRssClientTest.java | 2 ++ .../org/apache/uniffle/test/DynamicConfTest.java | 1 + .../test/RepartitionWithLocalFileRssTest.java | 31 +++++++++++++++++----- .../uniffle/test/RepartitionWithMemoryRssTest.java | 2 +- .../apache/uniffle/test/RssShuffleManagerTest.java | 6 ++--- .../uniffle/test/SparkClientWithLocalTest.java | 6 +---- .../uniffle/test/SparkIntegrationTestBase.java | 23 +++++++++------- .../ContinuousSelectPartitionStrategyTest.java | 15 +++++++++-- .../test/GetShuffleReportForMultiPartTest.java | 4 +-- .../impl/grpc/ShuffleServerGrpcNettyClient.java | 8 +++++- .../TopNShuffleDataSizeOfAppCalcTaskTest.java | 6 ----- 32 files changed, 129 insertions(+), 146 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java index 982d9f77f..d139a5319 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java @@ -43,7 +43,7 @@ public class ShuffleManagerServerFactory { public GrpcServer getServer(ShuffleManagerGrpcService service) { ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE); - if (type == ServerType.GRPC) { + if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) { if (service == null) { service = new ShuffleManagerGrpcService(shuffleManager); } diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java index b7a44bf21..3f6993c82 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java +++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java @@ -44,6 +44,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.client.api.ShuffleReadClient; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.impl.ShuffleReadClientImpl; +import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.BlockIdLayout; @@ -124,6 +125,7 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest { boolean compress) { ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.HDFS.name()) .appId("appId") .shuffleId(0) diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java index e17d4ef00..34a1f2402 100644 --- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java @@ -17,26 +17,29 @@ package org.apache.uniffle.shuffle.manager; -import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.rpc.ServerType; -import static org.junit.jupiter.api.Assertions.assertThrows; - public class ShuffleManagerServerFactoryTest { - @Test - public void testShuffleManagerServerType() { + private static Stream<Arguments> shuffleManagerServerTypeProvider() { + return Arrays.stream(ServerType.values()).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("shuffleManagerServerTypeProvider") + public void testShuffleManagerServerType(ServerType serverType) { // add code to generate tests that check the server type RssBaseConf conf = new RssBaseConf(); - conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC); + conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType); ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf); // this should execute normally; factory.getServer(); - - // other types should raise an exception - conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY); - factory = new ShuffleManagerServerFactory(null, conf); - assertThrows(UnsupportedOperationException.class, factory::getServer); } } diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index 4a789bfa2..e1aa0f958 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -102,12 +102,13 @@ public class ShuffleReadClientImpl implements ShuffleReadClient { readBufferSize = Integer.MAX_VALUE; } boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE); - builder.indexReadLimit(indexReadLimit); builder.storageType(storageType); builder.readBufferSize(readBufferSize); builder.offHeapEnable(offHeapEnabled); - builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE)); + if (builder.getClientType() == null) { + builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE)); + } } else { // most for test RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf(); @@ -131,7 +132,9 @@ public class ShuffleReadClientImpl implements ShuffleReadClient { builder.rssConf(rssConf); builder.offHeapEnable(false); builder.expectedTaskIdsBitmapFilterEnable(false); - builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE)); + if (builder.getClientType() == null) { + builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE)); + } } if (builder.getIdHelper() == null) { builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf()))); diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java index 4c0679a11..5d0d4b410 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java @@ -36,6 +36,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.client.TestUtils; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.response.CompressedShuffleBlock; +import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssClientConf; @@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase { private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { return ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.HDFS.name()) .appId("appId") .shuffleId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java index 066135155..4029d1ca6 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java @@ -51,8 +51,6 @@ import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -99,11 +97,8 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -193,6 +188,7 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase { isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList; ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .appId(appId) .shuffleId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java index eb44a9b69..8a2fe2635 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java @@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.apache.uniffle.client.api.ShuffleServerClient; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.factory.ShuffleServerClientFactory; import org.apache.uniffle.client.impl.ShuffleReadClientImpl; @@ -49,8 +50,6 @@ import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.storage.util.StorageType; @@ -63,7 +62,7 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa protected ShuffleServerGrpcNettyClient nettyShuffleServerClient; protected static ShuffleServerConf grpcShuffleServerConfig; protected static ShuffleServerConf nettyShuffleServerConfig; - private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault"; + private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault_%s"; @BeforeEach public void createClient() throws Exception { @@ -71,11 +70,8 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -100,7 +96,7 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa Map<Long, byte[]> expectedData = Maps.newHashMap(); Map<Integer, List<Integer>> map = Maps.newHashMap(); map.put(0, Lists.newArrayList(0)); - registerShuffle(appId, map); + registerShuffle(appId, map, isNettyMode); Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf(); final List<ShuffleBlockInfo> blocks = createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData); @@ -110,7 +106,10 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0), expectedData, isNettyMode); } - private void registerShuffle(String appId, Map<Integer, List<Integer>> registerMap) { + private void registerShuffle( + String appId, Map<Integer, List<Integer>> registerMap, boolean isNettyMode) { + ShuffleServerClient shuffleServerClient = + isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient; for (Map.Entry<Integer, List<Integer>> entry : registerMap.entrySet()) { for (int partition : entry.getValue()) { RssRegisterShuffleRequest rr = @@ -118,8 +117,8 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa appId, entry.getKey(), Lists.newArrayList(new PartitionRange(partition, partition)), - REMOTE_STORAGE); - grpcShuffleServerClient.registerShuffle(rr); + String.format(REMOTE_STORAGE, isNettyMode)); + shuffleServerClient.registerShuffle(rr); } } } @@ -171,6 +170,7 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.LOCALFILE_HDFS.name()) .appId(appId) .shuffleId(shuffleId) @@ -179,7 +179,7 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa .partitionNumPerRange(1) .partitionNum(10) .readBufferSize(1000) - .basePath(REMOTE_STORAGE) + .basePath(String.format(REMOTE_STORAGE, isNettyMode)) .blockIdBitmap(blockBitmap) .taskIdBitmap(taskBitmap) .shuffleServerInfoList(Lists.newArrayList(ssi)) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java index 7b53fe606..4ec4fff32 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java @@ -27,9 +27,6 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.uniffle.client.factory.ShuffleServerClientFactory; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; -import org.apache.uniffle.common.ClientType; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; @@ -83,11 +80,8 @@ public class HybridStorageHadoopFallbackTest extends HybridStorageFaultTolerance grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java index 6c63f44ea..15680573b 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java @@ -27,9 +27,6 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.uniffle.client.factory.ShuffleServerClientFactory; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; -import org.apache.uniffle.common.ClientType; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; @@ -68,11 +65,8 @@ public class HybridStorageLocalFileFallbackTest extends HybridStorageFaultTolera grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java index da9dd7158..6662a3670 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java @@ -75,6 +75,7 @@ public class QuorumTest extends ShuffleReadWriteBase { private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { return ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.MEMORY_LOCALFILE.name()) .shuffleId(0) .partitionId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java index abefb1b0d..d082ae025 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java @@ -63,6 +63,7 @@ public class RpcClientRetryTest extends ShuffleReadWriteBase { private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) { return ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(storageType.name()) .shuffleId(0) .partitionId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java index ec5222919..7d5d11481 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java @@ -46,6 +46,7 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest; import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; @@ -177,6 +178,7 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends ShuffleServerWithH }); ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.HDFS.name()) .appId(appId) .shuffleId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java index 631cd7fe9..2eee21227 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java @@ -47,8 +47,6 @@ import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.util.ByteBufUtils; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -88,8 +86,6 @@ public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase { startServers(); grpcShuffleServerClients = new ArrayList<>(); nettyShuffleServerClients = new ArrayList<>(); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); for (ShuffleServer shuffleServer : grpcShuffleServers) { grpcShuffleServerClients.add( new ShuffleServerGrpcClient(shuffleServer.getIp(), shuffleServer.getGrpcPort())); @@ -97,7 +93,7 @@ public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase { for (ShuffleServer shuffleServer : nettyShuffleServers) { nettyShuffleServerClients.add( new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, shuffleServer.getGrpcPort(), shuffleServer.getNettyPort())); + LOCALHOST, shuffleServer.getGrpcPort(), shuffleServer.getNettyPort())); } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index 39e3deefb..9b486629c 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -62,15 +62,12 @@ import org.apache.uniffle.client.response.RssGetShuffleResultResponse; import org.apache.uniffle.client.response.RssRegisterShuffleResponse; import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssBaseConf; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.metrics.TestUtils; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; @@ -149,11 +146,8 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java index c899b8eac..67a9dc05c 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java @@ -46,8 +46,6 @@ import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -93,11 +91,8 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -109,8 +104,9 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { nettyShuffleServerClient.close(); } - private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { + private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) { return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.HDFS.name()) .shuffleId(0) .partitionId(0) @@ -172,7 +168,7 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); ShuffleReadClientImpl readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -208,7 +204,7 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { shuffleServerClient.finishShuffle(rfsr); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -218,7 +214,7 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { validateResult(readClient, expectedData, bitmaps[0]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(1) .basePath(dataBasePath) @@ -229,7 +225,7 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { validateResult(readClient, expectedData, bitmaps[1]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(2) .basePath(dataBasePath) @@ -240,7 +236,7 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase { validateResult(readClient, expectedData, bitmaps[2]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(3) .basePath(dataBasePath) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java index 036120499..77330fe2b 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import com.google.common.collect.Lists; @@ -55,8 +54,6 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.RssUtils; @@ -95,10 +92,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase private static ShuffleServerConf grpcShuffleServerConfig; private static ShuffleServerConf nettyShuffleServerConfig; - private static AtomicInteger serverRpcPortCounter = new AtomicInteger(); - private static AtomicInteger nettyPortCounter = new AtomicInteger(); - private static AtomicInteger jettyPortCounter = new AtomicInteger(); - static @TempDir File tempDir; private static ShuffleServerConf getShuffleServerConf(ServerType serverType) throws Exception { @@ -173,11 +166,8 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase new ShuffleServerGrpcClient( LOCALHOST, getShuffleServerConf(ServerType.GRPC).getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, getShuffleServerConf(ServerType.GRPC_NETTY) .getInteger(ShuffleServerConf.RPC_SERVER_PORT), @@ -225,8 +215,9 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase return partitionToBlocks; } - private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { + private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) { return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.HDFS.name()) .shuffleId(0) .partitionId(0) @@ -305,7 +296,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase : new ShuffleServerInfo( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); ShuffleReadClientImpl readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -341,7 +332,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase shuffleServerClient.finishShuffle(rfsr); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -351,7 +342,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase validateResult(readClient, expectedData, bitmaps[0]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(1) .basePath(dataBasePath) @@ -362,7 +353,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase validateResult(readClient, expectedData, bitmaps[1]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(2) .basePath(dataBasePath) @@ -373,7 +364,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase validateResult(readClient, expectedData, bitmaps[2]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(3) .basePath(dataBasePath) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java index 3f93da969..f51bc3649 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java @@ -40,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.fail; public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase { private ShuffleServerGrpcClient shuffleServerClient; - private static String REMOTE_STORAGE = HDFS_URI + "rss/test"; private static int rpcPort; diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java index 2803f05e2..b0ed66d7f 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java @@ -48,13 +48,10 @@ import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.client.util.DefaultIdHelper; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter; @@ -116,11 +113,8 @@ public class ShuffleServerWithLocalOfLocalOrderTest extends ShuffleReadWriteBase grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java index 29321b3d5..a77472e6a 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java @@ -45,12 +45,9 @@ import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ChecksumUtils; @@ -110,11 +107,8 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java index bae77cfec..561c4716d 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java @@ -43,13 +43,10 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ByteBufUtils; @@ -76,7 +73,7 @@ public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { LoggerFactory.getLogger(ShuffleServerWithMemLocalHadoopTest.class); private ShuffleServerGrpcClient grpcShuffleServerClient; private ShuffleServerGrpcNettyClient nettyShuffleServerClient; - private static String REMOTE_STORAGE = HDFS_URI + "rss/test"; + private static String REMOTE_STORAGE = HDFS_URI + "rss/test_%s"; private static ShuffleServerConf grpcShuffleServerConfig; private static ShuffleServerConf nettyShuffleServerConfig; @@ -107,11 +104,8 @@ public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -167,7 +161,10 @@ public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { int partitionId = 0; RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest( - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), REMOTE_STORAGE); + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + String.format(REMOTE_STORAGE, isNettyMode)); shuffleServerClient.registerShuffle(rrsr); Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf(); Map<Long, byte[]> dataMap = Maps.newHashMap(); @@ -216,7 +213,7 @@ public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { 500, expectBlockIds, processBlockIds, - REMOTE_STORAGE, + String.format(REMOTE_STORAGE, isNettyMode), conf); ClientReadHandler[] handlers = new ClientReadHandler[3]; handlers[0] = memoryClientReadHandler; diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java index fb01071f0..a2de8f761 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java @@ -41,13 +41,10 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ByteBufUtils; @@ -112,11 +109,8 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index c905968b3..de96781c8 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -357,6 +357,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase { ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .appId(testAppId) .shuffleId(0) @@ -380,6 +381,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase { assertTrue(commitResult); readClient = ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .appId(testAppId) .shuffleId(0) diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java index ab1b9de1f..c5421d113 100644 --- a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java +++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java @@ -52,6 +52,7 @@ public class DynamicConfTest extends MRIntegrationTestBase { @Override protected void updateRssConfiguration(Configuration jobConf) { + jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name()); jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1); jobConf.setInt(LargeSorter.MBS_PER_MAP, 256); } diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java index 29066247e..4f30c4997 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java @@ -47,14 +47,25 @@ public class RepartitionWithLocalFileRssTest extends RepartitionTest { dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); addDynamicConf(coordinatorConf, dynamicConf); createCoordinatorServer(coordinatorConf); - ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); + + ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC); File dataDir1 = new File(tmpDir, "data1"); File dataDir2 = new File(tmpDir, "data2"); - String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); - shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); - shuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); - shuffleServerConf.setString("rss.storage.basePath", basePath); - createShuffleServer(shuffleServerConf); + String grpcBasePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); + grpcShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + grpcShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + grpcShuffleServerConf.setString("rss.storage.basePath", grpcBasePath); + createShuffleServer(grpcShuffleServerConf); + + ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY); + File dataDir3 = new File(tmpDir, "data3"); + File dataDir4 = new File(tmpDir, "data4"); + String nettyBasePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath(); + nettyShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + nettyShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + nettyShuffleServerConf.setString("rss.storage.basePath", nettyBasePath); + createShuffleServer(nettyShuffleServerConf); + startServers(); } @@ -76,13 +87,19 @@ public class RepartitionWithLocalFileRssTest extends RepartitionTest { Map resultWithoutRss = runSparkApp(sparkConf, fileName); results.add(resultWithoutRss); - updateSparkConfWithRss(sparkConf); + updateSparkConfWithRssGrpc(sparkConf); updateSparkConfCustomer(sparkConf); for (Codec.Type type : new Codec.Type[] {Codec.Type.NOOP, Codec.Type.ZSTD, Codec.Type.LZ4}) { sparkConf.set("spark." + COMPRESSION_TYPE.key().toLowerCase(), type.name()); Map resultWithRss = runSparkApp(sparkConf, fileName); results.add(resultWithRss); } + updateSparkConfWithRssNetty(sparkConf); + for (Codec.Type type : new Codec.Type[] {Codec.Type.NOOP, Codec.Type.ZSTD, Codec.Type.LZ4}) { + sparkConf.set("spark." + COMPRESSION_TYPE.key().toLowerCase(), type.name()); + Map resultWithRss = runSparkApp(sparkConf, fileName); + results.add(resultWithRss); + } for (int i = 1; i < results.size(); i++) { verifyTestResult(results.get(0), results.get(i)); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java index 571907408..3444a1cee 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java @@ -63,7 +63,7 @@ public class RepartitionWithMemoryRssTest extends RepartitionTest { public void testMemoryRelease() throws Exception { final String fileName = generateTextFile(10000, 10000); SparkConf sparkConf = createSparkConf(); - updateSparkConfWithRss(sparkConf); + updateSparkConfWithRssGrpc(sparkConf); sparkConf.set("spark.executor.memory", "500m"); sparkConf.set("spark.unsafe.exceptionOnMemoryLeak", "true"); updateRssStorage(sparkConf); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java index 8414cd0b9..3cf1736c5 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java @@ -141,16 +141,16 @@ public class RssShuffleManagerTest extends SparkIntegrationTestBase { BlockIdLayout clientConfLayout, BlockIdLayout dynamicConfLayout, BlockIdLayout expectedLayout, - boolean enableDynamicCLientConf) + boolean enableDynamicClientConf) throws Exception { Map<String, String> dynamicConf = startServers(dynamicConfLayout); SparkConf conf = createSparkConf(); - updateSparkConfWithRss(conf); + updateSparkConfWithRssGrpc(conf); // enable stage recompute conf.set("spark." + RssClientConfig.RSS_RESUBMIT_STAGE, "true"); // enable dynamic client conf - conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, enableDynamicCLientConf); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, enableDynamicClientConf); // configure storage type conf.set("spark." + RssClientConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name()); // restarting the coordinator may cause RssException: There isn't enough shuffle servers diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java index c134ced35..11e60540e 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java @@ -47,8 +47,6 @@ import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.util.BlockId; import org.apache.uniffle.common.util.BlockIdLayout; @@ -111,11 +109,8 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -140,6 +135,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase { LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT))); return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .shuffleId(0) .partitionId(0) diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java index 6d48b901c..e1095e260 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java @@ -29,6 +29,8 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ClientType; + import static org.junit.jupiter.api.Assertions.assertEquals; public abstract class SparkIntegrationTestBase extends IntegrationTestBase { @@ -54,24 +56,24 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase { final long durationWithoutRss = System.currentTimeMillis() - start; Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - updateSparkConfWithRss(sparkConf); + updateSparkConfWithRssGrpc(sparkConf); updateSparkConfCustomer(sparkConf); start = System.currentTimeMillis(); - Map resultWithRss = runSparkApp(sparkConf, fileName); - final long durationWithRss = System.currentTimeMillis() - start; + Map resultWithRssGrpc = runSparkApp(sparkConf, fileName); + final long durationWithRssGrpc = System.currentTimeMillis() - start; updateSparkConfWithRssNetty(sparkConf); start = System.currentTimeMillis(); Map resultWithRssNetty = runSparkApp(sparkConf, fileName); final long durationWithRssNetty = System.currentTimeMillis() - start; - verifyTestResult(resultWithoutRss, resultWithRss); + verifyTestResult(resultWithoutRss, resultWithRssGrpc); verifyTestResult(resultWithoutRss, resultWithRssNetty); LOG.info( "Test: durationWithoutRss[" + durationWithoutRss - + "], durationWithRss[" - + durationWithRss + + "], durationWithRssGrpc[" + + durationWithRssGrpc + "]" + "], durationWithRssNetty[" + durationWithRssNetty @@ -90,16 +92,16 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase { spark.close(); } spark = SparkSession.builder().config(sparkConf).getOrCreate(); - Map resultWithRss = runTest(spark, testFileName); + Map result = runTest(spark, testFileName); spark.stop(); - return resultWithRss; + return result; } protected SparkConf createSparkConf() { return new SparkConf().setAppName(this.getClass().getSimpleName()).setMaster("local[4]"); } - public void updateSparkConfWithRss(SparkConf sparkConf) { + public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); sparkConf.set( "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo"); @@ -118,10 +120,11 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase { sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "1m"); sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000"); sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true"); + sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name()); } public void updateSparkConfWithRssNetty(SparkConf sparkConf) { - sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, "GRPC_NETTY"); + sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY.name()); } protected void verifyTestResult(Map expected, Map actual) { diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java index 3b02caff2..1b46e6e95 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java @@ -140,8 +140,19 @@ public class ContinuousSelectPartitionStrategyTest extends SparkIntegrationTestB } @Override - public void updateSparkConfWithRss(SparkConf sparkConf) { - super.updateSparkConfWithRss(sparkConf); + public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { + super.updateSparkConfWithRssGrpc(sparkConf); + addMultiReplicaConf(sparkConf); + } + + @Override + public void updateSparkConfWithRssNetty(SparkConf sparkConf) { + super.updateSparkConfWithRssNetty(sparkConf); + // Add multi replica conf + addMultiReplicaConf(sparkConf); + } + + private static void addMultiReplicaConf(SparkConf sparkConf) { // Add multi replica conf sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(), String.valueOf(replicateWrite)); sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(), String.valueOf(replicateWrite)); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java index 75b43e756..462ee8dab 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java @@ -140,8 +140,8 @@ public class GetShuffleReportForMultiPartTest extends SparkIntegrationTestBase { } @Override - public void updateSparkConfWithRss(SparkConf sparkConf) { - super.updateSparkConfWithRss(sparkConf); + public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { + super.updateSparkConfWithRssGrpc(sparkConf); // Add multi replica conf sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(), String.valueOf(replicateWrite)); sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(), String.valueOf(replicateWrite)); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index f677b6385..0c6860ad1 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,9 +59,14 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { private int nettyPort; private TransportClientFactory clientFactory; + @VisibleForTesting + public ShuffleServerGrpcNettyClient(String host, int grpcPort, int nettyPort) { + this(new RssConf(), host, grpcPort, nettyPort); + } + public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int grpcPort, int nettyPort) { this( - rssConf, + rssConf == null ? new RssConf() : rssConf, host, grpcPort, nettyPort, diff --git a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java index ab9e8ad02..e4b2559aa 100644 --- a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java +++ b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java @@ -48,11 +48,8 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssRegisterShuffleResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.metrics.TestUtils; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; @@ -168,12 +165,9 @@ public class TopNShuffleDataSizeOfAppCalcTaskTest { private void registerAndRequireBuffer(String appId, int length, boolean isNettyMode) throws Exception { - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); ShuffleServerGrpcClient shuffleServerClient = isNettyMode ? new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT))
