This is an automated email from the ASF dual-hosted git repository.
zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ffaf3beb7 [#1994] improvement(netty): Add Netty support for TEZ tasks
in Uniffle. (#1995)
ffaf3beb7 is described below
commit ffaf3beb75d19ad101ae2260dd216e044f63e209
Author: QI Jiale <[email protected]>
AuthorDate: Tue Aug 6 16:30:53 2024 +0800
[#1994] improvement(netty): Add Netty support for TEZ tasks in Uniffle.
(#1995)
### What changes were proposed in this pull request?
Add Netty support for TEZ tasks in Uniffle.
### Why are the changes needed?
Fix: #1994
### Does this PR introduce _any_ user-facing change?
Running TEZ tasks with Netty mode is now supported.
### How was this patch tested?
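Tested with the updated unit and integration tests: the RssDAGAppMasterTest
cases are parameterized over the GRPC and GRPC_NETTY client types, and the Tez
integration tests now start both a GRPC and a GRPC_NETTY shuffle server and
run the jobs with each client type.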
---
.../tez/common/ShuffleAssignmentsInfoWritable.java | 43 ++++---
.../tez/dag/app/TezRemoteShuffleManager.java | 5 +
.../apache/tez/dag/app/RssDAGAppMasterTest.java | 93 +++++++-------
.../org/apache/uniffle/test/TezHashJoinTest.java | 23 ++--
.../uniffle/test/TezIntegrationTestBase.java | 64 +++++-----
.../uniffle/test/TezJoinIntegrationTestBase.java | 18 +--
.../uniffle/test/TezSimpleSessionExampleTest.java | 13 +-
.../apache/uniffle/test/TezSortMergeJoinTest.java | 6 +-
.../uniffle/test/TezWordCountWithFailuresTest.java | 139 +++++++++------------
9 files changed, 197 insertions(+), 207 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
b/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
index 01c9cf7ad..7e4ce89ad 100644
---
a/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
+++
b/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
@@ -68,9 +68,7 @@ public class ShuffleAssignmentsInfoWritable implements
Writable {
} else {
dataOutput.writeInt(entry.getValue().size());
for (ShuffleServerInfo serverInfo : entry.getValue()) {
- dataOutput.writeUTF(serverInfo.getId());
- dataOutput.writeUTF(serverInfo.getHost());
- dataOutput.writeInt(serverInfo.getGrpcPort());
+ writeShuffleServerInfo(dataOutput, serverInfo);
}
}
}
@@ -84,9 +82,7 @@ public class ShuffleAssignmentsInfoWritable implements
Writable {
dataOutput.writeInt(serverToPartitionRanges.size());
for (Map.Entry<ShuffleServerInfo, List<PartitionRange>> entry :
serverToPartitionRanges.entrySet()) {
- dataOutput.writeUTF(entry.getKey().getId());
- dataOutput.writeUTF(entry.getKey().getHost());
- dataOutput.writeInt(entry.getKey().getGrpcPort());
+ writeShuffleServerInfo(dataOutput, entry.getKey());
if (CollectionUtils.isEmpty(entry.getValue())) {
dataOutput.writeInt(-1);
} else {
@@ -110,18 +106,13 @@ public class ShuffleAssignmentsInfoWritable implements
Writable {
Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
int partitionToServersSize = dataInput.readInt();
if (partitionToServersSize != -1) {
- Integer partitionId;
for (int i = 0; i < partitionToServersSize; i++) {
- partitionId = dataInput.readInt();
+ int partitionId = dataInput.readInt();
List<ShuffleServerInfo> shuffleServerInfoList = new ArrayList<>();
int shuffleServerInfoListSize = dataInput.readInt();
if (shuffleServerInfoListSize != -1) {
for (int i1 = 0; i1 < shuffleServerInfoListSize; i1++) {
- String id = dataInput.readUTF();
- String host = dataInput.readUTF();
- int port = dataInput.readInt();
- ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo(id,
host, port);
- shuffleServerInfoList.add(shuffleServerInfo);
+ shuffleServerInfoList.add(getShuffleServerInfo(dataInput));
}
}
@@ -133,14 +124,8 @@ public class ShuffleAssignmentsInfoWritable implements
Writable {
int serverToPartitionRangesSize = dataInput.readInt();
if (serverToPartitionRangesSize != -1) {
for (int i = 0; i < serverToPartitionRangesSize; i++) {
- ShuffleServerInfo shuffleServerInfo;
List<PartitionRange> partitionRangeList = new ArrayList<>();
-
- String id = dataInput.readUTF();
- String host = dataInput.readUTF();
- int port = dataInput.readInt();
- shuffleServerInfo = new ShuffleServerInfo(id, host, port);
-
+ ShuffleServerInfo shuffleServerInfo = getShuffleServerInfo(dataInput);
int partitionRangeListSize = dataInput.readInt();
if (partitionRangeListSize != -1) {
for (int i1 = 0; i1 < partitionRangeListSize; i1++) {
@@ -158,6 +143,24 @@ public class ShuffleAssignmentsInfoWritable implements
Writable {
new ShuffleAssignmentsInfo(partitionToServers,
serverToPartitionRanges);
}
+ private ShuffleServerInfo getShuffleServerInfo(DataInput dataInput) throws
IOException {
+ ShuffleServerInfo shuffleServerInfo;
+ String id = dataInput.readUTF();
+ String host = dataInput.readUTF();
+ int grpcPort = dataInput.readInt();
+ int nettyPort = dataInput.readInt();
+ shuffleServerInfo = new ShuffleServerInfo(id, host, grpcPort, nettyPort);
+ return shuffleServerInfo;
+ }
+
+ private void writeShuffleServerInfo(DataOutput dataOutput, ShuffleServerInfo
shuffleServerInfo)
+ throws IOException {
+ dataOutput.writeUTF(shuffleServerInfo.getId());
+ dataOutput.writeUTF(shuffleServerInfo.getHost());
+ dataOutput.writeInt(shuffleServerInfo.getGrpcPort());
+ dataOutput.writeInt(shuffleServerInfo.getNettyPort());
+ }
+
public ShuffleAssignmentsInfo getShuffleAssignmentsInfo() {
return shuffleAssignmentsInfo;
}
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index b37d9486f..bf376814e 100644
---
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -243,6 +244,10 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
}
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
+ String clientType =
+ conf.get(RssTezConfig.RSS_CLIENT_TYPE,
RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+ ClientUtils.validateClientType(clientType);
+ assignmentTags.add(clientType);
try {
shuffleAssignmentsInfo =
diff --git
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index ed631e534..563c5735b 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
@@ -38,7 +39,6 @@ import
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.AsyncDispatcher;
@@ -86,9 +86,13 @@ import
org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.util.StorageType;
@@ -115,6 +119,11 @@ public class RssDAGAppMasterTest {
RssDAGAppMasterTest.class.getSimpleName())
.getAbsoluteFile();
+ static Stream<Arguments> clientTypeProvider() {
+ return Stream.of(
+ Arguments.of(ClientType.GRPC.name()),
Arguments.of(ClientType.GRPC_NETTY.name()));
+ }
+
@Test
public void testDagStateChangeCallback() throws Exception {
// 1 Init and mock some basic module
@@ -142,7 +151,7 @@ public class RssDAGAppMasterTest {
clientConf.set(RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
clientConf.set("tez.config1", "value1");
clientConf.set("config2", "value2");
- Map<String, String> dynamicConf = new HashMap();
+ Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
dynamicConf.put("tez.config3", "value3");
when(appMaster.getClusterClientConf()).thenReturn(dynamicConf);
@@ -174,14 +183,7 @@ public class RssDAGAppMasterTest {
RssDAGAppMaster.registerStateEnteredCallback(dagImpl, appMaster);
// 5 register DAGEvent, init and start dispatcher
- EventHandler<DAGEvent> dagEventDispatcher =
- new EventHandler<DAGEvent>() {
- @Override
- public void handle(DAGEvent event) {
- dagImpl.handle(event);
- }
- };
- dispatcher.register(DAGEventType.class, dagEventDispatcher);
+ dispatcher.register(DAGEventType.class, dagImpl);
dispatcher.init(conf);
dispatcher.start();
@@ -209,6 +211,19 @@ public class RssDAGAppMasterTest {
verify(shuffleManager, times(1)).unregisterShuffleByDagId(dagId);
}
+ private static void verifyCommonAssertions(
+ Configuration conf, int expectedSourceVertexId, int
expectedDestinationVertexId) {
+ Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
+ Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
+ Assertions.assertEquals(StorageType.LOCALFILE.name(),
conf.get(RSS_STORAGE_TYPE));
+ Assertions.assertEquals("value1", conf.get("tez.config1"));
+ Assertions.assertEquals("value3", conf.get("tez.config3"));
+ Assertions.assertNull(conf.get("tez.config2"));
+ Assertions.assertEquals(expectedSourceVertexId,
conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1));
+ Assertions.assertEquals(
+ expectedDestinationVertexId,
conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1));
+ }
+
public static void verifyInput(
DAGImpl dag,
String name,
@@ -216,21 +231,13 @@ public class RssDAGAppMasterTest {
int expectedSourceVertexId,
int expectedDestinationVertexId)
throws Exception {
- // 1 verfiy rename rss io class name
List<InputSpec> inputSpecs = dag.getVertex(name).getInputSpecList(0);
Assertions.assertEquals(1, inputSpecs.size());
Assertions.assertEquals(
expectedInputClassName,
inputSpecs.get(0).getInputDescriptor().getClassName());
- // 2 verfiy the address and port of shuffle manager
UserPayload payload =
inputSpecs.get(0).getInputDescriptor().getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(payload);
- Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
- Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
- // 3 verfiy the config
- Assertions.assertEquals(StorageType.LOCALFILE.name(),
conf.get(RSS_STORAGE_TYPE));
- Assertions.assertEquals("value1", conf.get("tez.config1"));
- Assertions.assertEquals("value3", conf.get("tez.config3"));
- Assertions.assertNull(conf.get("tez.config2"));
+ verifyCommonAssertions(conf, expectedSourceVertexId,
expectedDestinationVertexId);
// TEZ_RUNTIME_IFILE_READAHEAD_BYTES is in getConfigurationKeySet, so the
config from client
// should deliver
// to Input/Output. But tez.config.from.client is not in
getConfigurationKeySet, so the config
@@ -238,10 +245,6 @@ public class RssDAGAppMasterTest {
// should not deliver to Input/Output.
Assertions.assertEquals(12345,
conf.getInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, -1));
Assertions.assertNull(conf.get("tez.config.from.client"));
- // 4 verfiy vertex id
- Assertions.assertEquals(expectedSourceVertexId,
conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1));
- Assertions.assertEquals(
- expectedDestinationVertexId,
conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1));
}
public static void verifyOutput(
@@ -251,28 +254,16 @@ public class RssDAGAppMasterTest {
int expectedSourceVertexId,
int expectedDestinationVertexId)
throws Exception {
- // 1 verfiy rename rss io class name
List<OutputSpec> outputSpecs = dag.getVertex(name).getOutputSpecList(0);
Assertions.assertEquals(1, outputSpecs.size());
Assertions.assertEquals(
expectedOutputClassName,
outputSpecs.get(0).getOutputDescriptor().getClassName());
- // 2 verfiy the address and port of shuffle manager
UserPayload payload =
outputSpecs.get(0).getOutputDescriptor().getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(payload);
- Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
- Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
- // 3 verfiy the config
- Assertions.assertEquals(StorageType.LOCALFILE.name(),
conf.get(RSS_STORAGE_TYPE));
- Assertions.assertEquals("value1", conf.get("tez.config1"));
- Assertions.assertEquals("value3", conf.get("tez.config3"));
- Assertions.assertNull(conf.get("tez.config2"));
- // 4 verfiy vertex id
- Assertions.assertEquals(expectedSourceVertexId,
conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1));
- Assertions.assertEquals(
- expectedDestinationVertexId,
conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1));
+ verifyCommonAssertions(conf, expectedSourceVertexId,
expectedDestinationVertexId);
}
- private static DAG createDAG(String dageName, Configuration conf) {
+ private static DAG createDAG(String dagName, Configuration conf) {
conf.setInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 12345);
conf.set("tez.config.from.client", "value.from.client");
@@ -308,7 +299,7 @@ public class RssDAGAppMasterTest {
.setFromConfiguration(conf)
.build();
- DAG dag = DAG.create(dageName);
+ DAG dag = DAG.create(dagName);
dag.addVertex(vertex1)
.addVertex(vertex2)
.addVertex(vertex3)
@@ -387,8 +378,9 @@ public class RssDAGAppMasterTest {
}
}
- @Test
- public void testFetchRemoteStorageFromDynamicConf() throws Exception {
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void testFetchRemoteStorageFromDynamicConf(String clientType) throws
Exception {
final ApplicationId appId = ApplicationId.newInstance(1, 1);
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
TezConfiguration conf = new TezConfiguration();
@@ -396,7 +388,7 @@ public class RssDAGAppMasterTest {
Credentials amCreds = new Credentials();
JobTokenSecretManager jtsm = new JobTokenSecretManager();
JobTokenIdentifier identifier = new JobTokenIdentifier(new
Text(appId.toString()));
- Token<JobTokenIdentifier> sessionToken = new
Token<JobTokenIdentifier>(identifier, jtsm);
+ Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jtsm);
sessionToken.setService(identifier.getJobId());
TokenCache.setSessionToken(sessionToken, amCreds);
@@ -424,7 +416,7 @@ public class RssDAGAppMasterTest {
amCreds,
"someuser",
null);
- appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(1));
+ appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(1,
clientType));
appMaster.init(conf);
Configuration mergedConf = new Configuration(false);
@@ -437,8 +429,9 @@ public class RssDAGAppMasterTest {
Assertions.assertEquals("testvalue",
mergedConf.get("tez.rss.test.config"));
}
- @Test
- public void testFetchRemoteStorageFromCoordinator() throws Exception {
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void testFetchRemoteStorageFromCoordinator(String clientType) throws
Exception {
final ApplicationId appId = ApplicationId.newInstance(1, 1);
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
TezConfiguration conf = new TezConfiguration();
@@ -446,7 +439,7 @@ public class RssDAGAppMasterTest {
Credentials amCreds = new Credentials();
JobTokenSecretManager jtsm = new JobTokenSecretManager();
JobTokenIdentifier identifier = new JobTokenIdentifier(new
Text(appId.toString()));
- Token<JobTokenIdentifier> sessionToken = new
Token<JobTokenIdentifier>(identifier, jtsm);
+ Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jtsm);
sessionToken.setService(identifier.getJobId());
TokenCache.setSessionToken(sessionToken, amCreds);
@@ -474,7 +467,7 @@ public class RssDAGAppMasterTest {
amCreds,
"someuser",
null);
- appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(2));
+ appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(2,
clientType));
appMaster.init(conf);
Configuration mergedConf = new Configuration(false);
@@ -495,12 +488,12 @@ public class RssDAGAppMasterTest {
* Mode 2: rss.remote.storage.path and rss.remote.storage.conf is not set
by dynamic config,
* appMaster will fetch remote storage conf from coordinator.
* */
- private int mode;
+ private final int mode;
- FakedShuffleWriteClient(int mode) {
+ FakedShuffleWriteClient(int mode, String clientType) {
super(
ShuffleClientFactory.newWriteBuilder()
- .clientType("GRPC")
+ .clientType(clientType)
.retryMax(1)
.retryIntervalMax(1)
.heartBeatThreadNum(10)
@@ -521,7 +514,7 @@ public class RssDAGAppMasterTest {
@Override
public Map<String, String> fetchClientConf(int timeoutMs) {
- Map<String, String> clientConf = new HashMap();
+ Map<String, String> clientConf = new HashMap<>();
if (mode == 1) {
clientConf.put("rss.remote.storage.path", "hdfs://ns1/rss/");
clientConf.put("rss.remote.storage.conf", "key1=value1,key2=value2");
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
index 56cc55553..2576d926b 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
@@ -30,20 +30,15 @@ public class TezHashJoinTest extends
TezJoinIntegrationTestBase {
public void hashJoinTest() throws Exception {
generateInputFile();
fs.delete(new Path(HASH_JOIN_OUTPUT_PATH), true);
- run(getTestArgs(HASH_JOIN_OUTPUT_PATH));
+ run(HASH_JOIN_OUTPUT_PATH);
}
@Test
public void hashJoinDoBroadcastTest() throws Exception {
generateInputFile();
- String[] orignal = getTestArgs(HASH_JOIN_OUTPUT_PATH);
- String[] args = new String[orignal.length + 1];
- for (int i = 0; i < orignal.length; i++) {
- args[i] = orignal[i];
- }
- args[orignal.length] = "doBroadcast";
- fs.delete(new Path(HASH_JOIN_OUTPUT_PATH), true);
- run(args);
+ String path = HASH_JOIN_OUTPUT_PATH + "_broadcast";
+ fs.delete(new Path(path), true);
+ run(path);
}
@Override
@@ -53,11 +48,17 @@ public class TezHashJoinTest extends
TezJoinIntegrationTestBase {
@Override
public String[] getTestArgs(String uniqueOutputName) {
- return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2",
HASH_JOIN_OUTPUT_PATH};
+ if (uniqueOutputName.contains("broadcast")) {
+ return new String[] {
+ STREAM_INPUT_PATH, HASH_INPUT_PATH, "2", uniqueOutputName,
"doBroadcast"
+ };
+ } else {
+ return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2",
uniqueOutputName};
+ }
}
@Override
public String getOutputDir(String uniqueOutputName) {
- return HASH_JOIN_OUTPUT_PATH;
+ return uniqueOutputName;
}
}
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
index befa50c58..cdaffffb1 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
@@ -61,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class TezIntegrationTestBase extends IntegrationTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TezIntegrationTestBase.class);
- private static String TEST_ROOT_DIR =
+ private static final String TEST_ROOT_DIR =
"target" + Path.SEPARATOR + TezIntegrationTestBase.class.getName() +
"-tmpDir";
private Path remoteStagingDir = null;
@@ -75,20 +75,22 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
miniTezCluster.init(conf);
miniTezCluster.start();
}
- LOG.info("Starting corrdinators and shuffer servers");
+ LOG.info("Starting coordinators and shuffle servers");
CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = new HashMap();
+ Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(shuffleServerConf);
+ ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ createShuffleServer(grpcShuffleServerConf);
+ ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ createShuffleServer(nettyShuffleServerConf);
startServers();
}
@AfterAll
- public static void tearDown() throws Exception {
+ public static void tearDown() {
if (miniTezCluster != null) {
LOG.info("Stopping MiniTezCluster");
miniTezCluster.stop();
@@ -117,25 +119,25 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
runTezApp(appConf, getTestTool(), getTestArgs("origin"));
final String originPath = getOutputDir("origin");
- // 2 Run Tez examples based on rss
- appConf = new TezConfiguration(miniTezCluster.getConfig());
- updateRssConfiguration(appConf);
- appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestTool(), getTestArgs("rss"));
- final String rssPath = getOutputDir("rss");
+ // Run RSS tests with different configurations
+ runRssTest(ClientType.GRPC, null, "rss-grpc", originPath);
+ runRssTest(ClientType.GRPC, "/tmp/spill-grpc", "rss-spill-grpc",
originPath);
+ runRssTest(ClientType.GRPC_NETTY, null, "rss-netty", originPath);
+ runRssTest(ClientType.GRPC_NETTY, "/tmp/spill-netty", "rss-spill-netty",
originPath);
+ }
- // 3 Run Tez examples base on rss with remote spill enable
- appConf = new TezConfiguration(miniTezCluster.getConfig());
- appConf.setBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, true);
- appConf.set(RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH, "/tmp/spill");
- updateRssConfiguration(appConf);
+ private void runRssTest(
+ ClientType clientType, String spillPath, String testName, String
originPath)
+ throws Exception {
+ TezConfiguration appConf = new
TezConfiguration(miniTezCluster.getConfig());
+ if (spillPath != null) {
+ appConf.setBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, true);
+ appConf.set(RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH, spillPath);
+ }
+ updateRssConfiguration(appConf, clientType);
appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestTool(), getTestArgs("rss-spill"));
- final String rssPathSpill = getOutputDir("rss-spill");
-
- // 4 verify the results
- verifyResults(originPath, rssPath);
- verifyResults(originPath, rssPathSpill);
+ runTezApp(appConf, getTestTool(), getTestArgs(testName));
+ verifyResults(originPath, getOutputDir(testName));
}
public Tool getTestTool() {
@@ -165,14 +167,14 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
}
- public void updateRssConfiguration(Configuration appConf) throws Exception {
+ public void updateRssConfiguration(Configuration appConf, ClientType
clientType) {
appConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
appConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 512);
appConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx384m");
appConf.setInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, 512);
appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
- appConf.set(RssTezConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
appConf.set(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT + " " +
RssDAGAppMaster.class.getName());
@@ -221,8 +223,8 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
FileStatus[] rssFiles = fs.listStatus(rssPathFs);
long originLen = 0;
long rssLen = 0;
- List<String> originFileList = new ArrayList();
- List<String> rssFileList = new ArrayList();
+ List<String> originFileList = new ArrayList<>();
+ List<String> rssFileList = new ArrayList<>();
for (FileStatus file : originFiles) {
originLen += file.getLen();
String name = file.getPath().getName();
@@ -297,8 +299,8 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
// 2 Load original result and rss result to hashmap
Map<String, Integer> originalResults = new HashMap<>();
- for (int i = 0; i < originFileList.size(); i++) {
- Path path = new Path(originPath, originFileList.get(i));
+ for (String file : originFileList) {
+ Path path = new Path(originPath, file);
LineReader lineReader = new LineReader(fs.open(path));
Text line = new Text();
while (lineReader.readLine(line) > 0) {
@@ -311,8 +313,8 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
}
Map<String, Integer> rssResults = new HashMap<>();
- for (int i = 0; i < rssFileList.size(); i++) {
- Path path = new Path(rssPath, rssFileList.get(i));
+ for (String file : rssFileList) {
+ Path path = new Path(rssPath, file);
LineReader lineReader = new LineReader(fs.open(path));
Text line = new Text();
while (lineReader.readLine(line) > 0) {
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
index e7e78b37b..641e40d34 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
@@ -23,6 +23,8 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidate;
+import org.apache.uniffle.common.ClientType;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TezJoinIntegrationTestBase extends TezIntegrationTestBase {
@@ -61,14 +63,16 @@ public class TezJoinIntegrationTestBase extends
TezIntegrationTestBase {
assertEquals(0, ToolRunner.run(appConf, validate, args), "JoinValidate
failed");
}
- public void run(String[] overrideArgs) throws Exception {
- // 1 Run Tez examples based on rss
+ public void run(String path) throws Exception {
+ runForClientType(ClientType.GRPC, path);
+ runForClientType(ClientType.GRPC_NETTY, path + "_netty");
+ }
+
+ private void runForClientType(ClientType clientType, String path) throws
Exception {
TezConfiguration appConf = new
TezConfiguration(miniTezCluster.getConfig());
- updateRssConfiguration(appConf);
+ updateRssConfiguration(appConf, clientType);
appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestTool(), overrideArgs);
-
- // 2 check the result
- verifyResults(JOIN_EXPECTED_PATH, getOutputDir(""));
+ runTezApp(appConf, getTestTool(), getTestArgs(path));
+ verifyResults(JOIN_EXPECTED_PATH, getOutputDir(path));
}
}
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
index 4add6e887..788465d61 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
@@ -33,13 +33,14 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.examples.SimpleSessionExample;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.exception.RssException;
public class TezSimpleSessionExampleTest extends TezIntegrationTestBase {
- private String inputPath = "simple_session_input";
- private String outputPath = "simple_session_output";
- private List<String> wordTable =
+ private final String inputPath = "simple_session_input";
+ private final String outputPath = "simple_session_output";
+ private final List<String> wordTable =
Lists.newArrayList(
"apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan",
"tomato");
@@ -56,8 +57,8 @@ public class TezSimpleSessionExampleTest extends
TezIntegrationTestBase {
}
@Override
- public void updateRssConfiguration(Configuration appConf) throws Exception {
- super.updateRssConfiguration(appConf);
+ public void updateRssConfiguration(Configuration appConf, ClientType
clientType) {
+ super.updateRssConfiguration(appConf, clientType);
appConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
}
@@ -74,7 +75,7 @@ public class TezSimpleSessionExampleTest extends
TezIntegrationTestBase {
// To keep pace with verifyResults, here make sure summation of word is
unique number.
FSDataOutputStream outputStream = fs.create(new Path(inputPath));
Random random = new Random();
- Set<Integer> used = new HashSet();
+ Set<Integer> used = new HashSet<>();
List<String> outputList = new ArrayList<>();
int index = 0;
while (index < wordTable.size()) {
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
index b31bfa8f6..e6b5488bb 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
@@ -28,7 +28,7 @@ public class TezSortMergeJoinTest extends
TezJoinIntegrationTestBase {
@Test
public void sortMergeJoinTest() throws Exception {
generateInputFile();
- run(getTestArgs(""));
+ run(SORT_MERGE_JOIN_OUTPUT_PATH);
}
@Override
@@ -38,11 +38,11 @@ public class TezSortMergeJoinTest extends
TezJoinIntegrationTestBase {
@Override
public String[] getTestArgs(String uniqueOutputName) {
- return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2",
SORT_MERGE_JOIN_OUTPUT_PATH};
+ return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2",
uniqueOutputName};
}
@Override
public String getOutputDir(String uniqueOutputName) {
- return SORT_MERGE_JOIN_OUTPUT_PATH;
+ return uniqueOutputName;
}
}
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
index d31d842ab..4e515eea4 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
@@ -75,14 +75,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TezWordCountWithFailuresTest extends IntegrationTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TezIntegrationTestBase.class);
- private static String TEST_ROOT_DIR =
+ private static final Logger LOG =
LoggerFactory.getLogger(TezWordCountWithFailuresTest.class);
+ private static final String TEST_ROOT_DIR =
"target" + Path.SEPARATOR + TezWordCountWithFailuresTest.class.getName()
+ "-tmpDir";
private Path remoteStagingDir = null;
- private String inputPath = "word_count_input";
- private String outputPath = "word_count_output";
- private List<String> wordTable =
+ private final String inputPath = "word_count_input";
+ private final String outputPath = "word_count_output";
+ private final List<String> wordTable =
Lists.newArrayList(
"apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan",
"tomato");
@@ -96,20 +96,22 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
miniTezCluster.init(conf);
miniTezCluster.start();
}
- LOG.info("Starting corrdinators and shuffer servers");
+ LOG.info("Starting coordinators and shuffle servers");
CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = new HashMap();
+ Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(shuffleServerConf);
+ ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ createShuffleServer(grpcShuffleServerConf);
+ ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ createShuffleServer(nettyShuffleServerConf);
startServers();
}
@AfterAll
- public static void tearDown() throws Exception {
+ public static void tearDown() {
if (miniTezCluster != null) {
LOG.info("Stopping MiniTezCluster");
miniTezCluster.stop();
@@ -155,89 +157,68 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
@Test
public void wordCountTestWithTaskFailureWhenAvoidRecomputeEnable() throws
Exception {
- // 1 Run Tez examples based on rss
- TezConfiguration appConf = new
TezConfiguration(miniTezCluster.getConfig());
- updateRssConfiguration(appConf, 0, true, false, 1);
- TezIntegrationTestBase.appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestArgs("rss"), 0);
- final String rssPath = getOutputDir("rss");
-
- // 2 Run original Tez examples
- appConf = new TezConfiguration(miniTezCluster.getConfig());
- updateCommonConfiguration(appConf);
- runTezApp(appConf, getTestArgs("origin"), -1);
- final String originPath = getOutputDir("origin");
-
- // 3 verify the results
- TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
+ runWordCountTestForBothClientTypes(true, false, 1);
}
@Test
public void wordCountTestWithTaskFailureWhenAvoidRecomputeDisable() throws
Exception {
- // 1 Run Tez examples based on rss
- TezConfiguration appConf = new
TezConfiguration(miniTezCluster.getConfig());
- updateRssConfiguration(appConf, 0, false, false, 1);
- TezIntegrationTestBase.appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestArgs("rss"), 1);
- final String rssPath = getOutputDir("rss");
-
- // 2 Run original Tez examples
- appConf = new TezConfiguration(miniTezCluster.getConfig());
- updateCommonConfiguration(appConf);
- runTezApp(appConf, getTestArgs("origin"), -1);
- final String originPath = getOutputDir("origin");
-
- // 3 verify the results
- TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
+ runWordCountTestForBothClientTypes(false, false, 1);
}
@Test
public void wordCountTestWithNodeUnhealthyWhenAvoidRecomputeEnable() throws
Exception {
- // 1 Run Tez examples based on rss
- TezConfiguration appConf = new
TezConfiguration(miniTezCluster.getConfig());
- updateRssConfiguration(appConf, 1, true, true, 100);
- TezIntegrationTestBase.appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestArgs("rss"), 0);
- final String rssPath = getOutputDir("rss");
-
- // 2 Run original Tez examples
- appConf = new TezConfiguration(miniTezCluster.getConfig());
- updateCommonConfiguration(appConf);
- runTezApp(appConf, getTestArgs("origin"), -1);
- final String originPath = getOutputDir("origin");
-
- // 3 verify the results
- TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
+ runWordCountTestForBothClientTypes(true, true, 100);
}
@Test
public void wordCountTestWithNodeUnhealthyWhenAvoidRecomputeDisable() throws
Exception {
- // 1 Run Tez examples based on rss
- TezConfiguration appConf = new
TezConfiguration(miniTezCluster.getConfig());
- updateRssConfiguration(appConf, 1, false, true, 100);
- TezIntegrationTestBase.appendAndUploadRssJars(appConf);
- runTezApp(appConf, getTestArgs("rss"), 1);
- final String rssPath = getOutputDir("rss");
-
- // 2 Run original Tez examples
- appConf = new TezConfiguration(miniTezCluster.getConfig());
- updateCommonConfiguration(appConf);
- runTezApp(appConf, getTestArgs("origin"), -1);
- final String originPath = getOutputDir("origin");
-
- // 3 verify the results
+ runWordCountTestForBothClientTypes(false, true, 100);
+ }
+
+ private void runWordCountTestForBothClientTypes(
+ boolean avoidRecompute, boolean rescheduleWhenUnhealthy, int
maxFailures) throws Exception {
+ String originPath = runOriginalWordCount();
+ runWordCountTest(
+ ClientType.GRPC, avoidRecompute, rescheduleWhenUnhealthy, maxFailures,
originPath);
+ runWordCountTest(
+ ClientType.GRPC_NETTY, avoidRecompute, rescheduleWhenUnhealthy,
maxFailures, originPath);
+ }
+
+ private void runWordCountTest(
+ ClientType clientType,
+ boolean avoidRecompute,
+ boolean rescheduleWhenUnhealthy,
+ int maxFailures,
+ String originPath)
+ throws Exception {
+ int testMode = rescheduleWhenUnhealthy ? 1 : 0;
+ int expectedVerifyMode = avoidRecompute ? 0 : 1;
+ TezConfiguration rssConf = new
TezConfiguration(miniTezCluster.getConfig());
+ updateRssConfiguration(
+ rssConf, testMode, avoidRecompute, rescheduleWhenUnhealthy,
maxFailures, clientType);
+ TezIntegrationTestBase.appendAndUploadRssJars(rssConf);
+ String testName = "rss-" + clientType.name().toLowerCase();
+ runTezApp(rssConf, getTestArgs(testName), expectedVerifyMode);
+ String rssPath = getOutputDir(testName);
TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
}
+ private String runOriginalWordCount() throws Exception {
+ TezConfiguration originalConf = new
TezConfiguration(miniTezCluster.getConfig());
+ updateCommonConfiguration(originalConf);
+ runTezApp(originalConf, getTestArgs("origin"), -1);
+ return getOutputDir("origin");
+ }
+
/*
* Two verify mode are supported:
* (a) verifyMode 0
* tez.rss.avoid.recompute.succeeded.task is enable, should not
recompute the task when this node is
- * blacke-listed for unhealthy.
+ * black-listed for unhealthy.
*
* (b) verifyMode 1
* tez.rss.avoid.recompute.succeeded.task is disable, will recompute the
task when this node is
- * blacke-listed for unhealthy.
+ * black-listed for unhealthy.
* */
protected void runTezApp(TezConfiguration tezConf, String[] args, int
verifyMode)
throws Exception {
@@ -260,16 +241,16 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
/*
* In this integration test, mini cluster have three NM with 4G
* (YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB). The request of
am is 4G, the request of task is 2G.
- * It means that one node only runs one am container so that won't lable the
node which am container runs as
- * black-list or uhealthy node.
+ * It means that one node only runs one am container so that won't label the
node which am container runs as
+ * black-list or unhealthy node.
* */
public void updateRssConfiguration(
Configuration appConf,
int testMode,
boolean avoidRecompute,
boolean rescheduleWhenUnhealthy,
- int maxFailures)
- throws Exception {
+ int maxFailures,
+ ClientType clientType) {
appConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
appConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 4096);
appConf.setInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, 4096);
@@ -277,7 +258,7 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
appConf.setInt(TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 99);
appConf.setInt(TEZ_AM_MAX_TASK_FAILURES_PER_NODE, maxFailures);
appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
- appConf.set(RssTezConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
appConf.set(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT
@@ -297,11 +278,11 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
}
- public class WordCountWithFailures extends WordCount {
+ public static class WordCountWithFailures extends WordCount {
TezClient tezClientInternal = null;
private HadoopShim hadoopShim;
- int verifyMode = -1;
+ int verifyMode;
WordCountWithFailures(int assertMode) {
this.verifyMode = assertMode;
@@ -353,7 +334,7 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
DAGStatus dagStatus =
dagClient.waitForCompletionWithStatusUpdates(getOpts);
if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+ logger.info("DAG diagnostics: {}", dagStatus.getDiagnostics());
return -1;
}