This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ffaf3beb7 [#1994] improvement(netty): Add Netty support for TEZ tasks 
in Uniffle. (#1995)
ffaf3beb7 is described below

commit ffaf3beb75d19ad101ae2260dd216e044f63e209
Author: QI Jiale <[email protected]>
AuthorDate: Tue Aug 6 16:30:53 2024 +0800

    [#1994] improvement(netty): Add Netty support for TEZ tasks in Uniffle. 
(#1995)
    
    ### What changes were proposed in this pull request?
    
    Add Netty support for TEZ tasks in Uniffle.
    
    ### Why are the changes needed?
    
    Fix: #1994
    
    ### Does this PR introduce _any_ user-facing change?
    
    Running TEZ tasks with Netty mode is now supported.
    
    ### How was this patch tested?
    
    Testing has been conducted using the modified integration tests that now 
support Netty.
---
 .../tez/common/ShuffleAssignmentsInfoWritable.java |  43 ++++---
 .../tez/dag/app/TezRemoteShuffleManager.java       |   5 +
 .../apache/tez/dag/app/RssDAGAppMasterTest.java    |  93 +++++++-------
 .../org/apache/uniffle/test/TezHashJoinTest.java   |  23 ++--
 .../uniffle/test/TezIntegrationTestBase.java       |  64 +++++-----
 .../uniffle/test/TezJoinIntegrationTestBase.java   |  18 +--
 .../uniffle/test/TezSimpleSessionExampleTest.java  |  13 +-
 .../apache/uniffle/test/TezSortMergeJoinTest.java  |   6 +-
 .../uniffle/test/TezWordCountWithFailuresTest.java | 139 +++++++++------------
 9 files changed, 197 insertions(+), 207 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
 
b/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
index 01c9cf7ad..7e4ce89ad 100644
--- 
a/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
+++ 
b/client-tez/src/main/java/org/apache/tez/common/ShuffleAssignmentsInfoWritable.java
@@ -68,9 +68,7 @@ public class ShuffleAssignmentsInfoWritable implements 
Writable {
         } else {
           dataOutput.writeInt(entry.getValue().size());
           for (ShuffleServerInfo serverInfo : entry.getValue()) {
-            dataOutput.writeUTF(serverInfo.getId());
-            dataOutput.writeUTF(serverInfo.getHost());
-            dataOutput.writeInt(serverInfo.getGrpcPort());
+            writeShuffleServerInfo(dataOutput, serverInfo);
           }
         }
       }
@@ -84,9 +82,7 @@ public class ShuffleAssignmentsInfoWritable implements 
Writable {
       dataOutput.writeInt(serverToPartitionRanges.size());
       for (Map.Entry<ShuffleServerInfo, List<PartitionRange>> entry :
           serverToPartitionRanges.entrySet()) {
-        dataOutput.writeUTF(entry.getKey().getId());
-        dataOutput.writeUTF(entry.getKey().getHost());
-        dataOutput.writeInt(entry.getKey().getGrpcPort());
+        writeShuffleServerInfo(dataOutput, entry.getKey());
         if (CollectionUtils.isEmpty(entry.getValue())) {
           dataOutput.writeInt(-1);
         } else {
@@ -110,18 +106,13 @@ public class ShuffleAssignmentsInfoWritable implements 
Writable {
     Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
     int partitionToServersSize = dataInput.readInt();
     if (partitionToServersSize != -1) {
-      Integer partitionId;
       for (int i = 0; i < partitionToServersSize; i++) {
-        partitionId = dataInput.readInt();
+        int partitionId = dataInput.readInt();
         List<ShuffleServerInfo> shuffleServerInfoList = new ArrayList<>();
         int shuffleServerInfoListSize = dataInput.readInt();
         if (shuffleServerInfoListSize != -1) {
           for (int i1 = 0; i1 < shuffleServerInfoListSize; i1++) {
-            String id = dataInput.readUTF();
-            String host = dataInput.readUTF();
-            int port = dataInput.readInt();
-            ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo(id, 
host, port);
-            shuffleServerInfoList.add(shuffleServerInfo);
+            shuffleServerInfoList.add(getShuffleServerInfo(dataInput));
           }
         }
 
@@ -133,14 +124,8 @@ public class ShuffleAssignmentsInfoWritable implements 
Writable {
     int serverToPartitionRangesSize = dataInput.readInt();
     if (serverToPartitionRangesSize != -1) {
       for (int i = 0; i < serverToPartitionRangesSize; i++) {
-        ShuffleServerInfo shuffleServerInfo;
         List<PartitionRange> partitionRangeList = new ArrayList<>();
-
-        String id = dataInput.readUTF();
-        String host = dataInput.readUTF();
-        int port = dataInput.readInt();
-        shuffleServerInfo = new ShuffleServerInfo(id, host, port);
-
+        ShuffleServerInfo shuffleServerInfo = getShuffleServerInfo(dataInput);
         int partitionRangeListSize = dataInput.readInt();
         if (partitionRangeListSize != -1) {
           for (int i1 = 0; i1 < partitionRangeListSize; i1++) {
@@ -158,6 +143,24 @@ public class ShuffleAssignmentsInfoWritable implements 
Writable {
         new ShuffleAssignmentsInfo(partitionToServers, 
serverToPartitionRanges);
   }
 
+  private ShuffleServerInfo getShuffleServerInfo(DataInput dataInput) throws 
IOException {
+    ShuffleServerInfo shuffleServerInfo;
+    String id = dataInput.readUTF();
+    String host = dataInput.readUTF();
+    int grpcPort = dataInput.readInt();
+    int nettyPort = dataInput.readInt();
+    shuffleServerInfo = new ShuffleServerInfo(id, host, grpcPort, nettyPort);
+    return shuffleServerInfo;
+  }
+
+  private void writeShuffleServerInfo(DataOutput dataOutput, ShuffleServerInfo 
shuffleServerInfo)
+      throws IOException {
+    dataOutput.writeUTF(shuffleServerInfo.getId());
+    dataOutput.writeUTF(shuffleServerInfo.getHost());
+    dataOutput.writeInt(shuffleServerInfo.getGrpcPort());
+    dataOutput.writeInt(shuffleServerInfo.getNettyPort());
+  }
+
   public ShuffleAssignmentsInfo getShuffleAssignmentsInfo() {
     return shuffleAssignmentsInfo;
   }
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index b37d9486f..bf376814e 100644
--- 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -243,6 +244,10 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
       assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
     }
     assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
+    String clientType =
+        conf.get(RssTezConfig.RSS_CLIENT_TYPE, 
RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+    ClientUtils.validateClientType(clientType);
+    assignmentTags.add(clientType);
 
     try {
       shuffleAssignmentsInfo =
diff --git 
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java 
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index ed631e534..563c5735b 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,7 +39,6 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.AsyncDispatcher;
@@ -86,9 +86,13 @@ import 
org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.storage.util.StorageType;
@@ -115,6 +119,11 @@ public class RssDAGAppMasterTest {
               RssDAGAppMasterTest.class.getSimpleName())
           .getAbsoluteFile();
 
+  static Stream<Arguments> clientTypeProvider() {
+    return Stream.of(
+        Arguments.of(ClientType.GRPC.name()), 
Arguments.of(ClientType.GRPC_NETTY.name()));
+  }
+
   @Test
   public void testDagStateChangeCallback() throws Exception {
     // 1 Init and mock some basic module
@@ -142,7 +151,7 @@ public class RssDAGAppMasterTest {
     clientConf.set(RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
     clientConf.set("tez.config1", "value1");
     clientConf.set("config2", "value2");
-    Map<String, String> dynamicConf = new HashMap();
+    Map<String, String> dynamicConf = new HashMap<>();
     dynamicConf.put(RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
     dynamicConf.put("tez.config3", "value3");
     when(appMaster.getClusterClientConf()).thenReturn(dynamicConf);
@@ -174,14 +183,7 @@ public class RssDAGAppMasterTest {
     RssDAGAppMaster.registerStateEnteredCallback(dagImpl, appMaster);
 
     // 5 register DAGEvent, init and start dispatcher
-    EventHandler<DAGEvent> dagEventDispatcher =
-        new EventHandler<DAGEvent>() {
-          @Override
-          public void handle(DAGEvent event) {
-            dagImpl.handle(event);
-          }
-        };
-    dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    dispatcher.register(DAGEventType.class, dagImpl);
     dispatcher.init(conf);
     dispatcher.start();
 
@@ -209,6 +211,19 @@ public class RssDAGAppMasterTest {
     verify(shuffleManager, times(1)).unregisterShuffleByDagId(dagId);
   }
 
+  private static void verifyCommonAssertions(
+      Configuration conf, int expectedSourceVertexId, int 
expectedDestinationVertexId) {
+    Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
+    Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
+    Assertions.assertEquals(StorageType.LOCALFILE.name(), 
conf.get(RSS_STORAGE_TYPE));
+    Assertions.assertEquals("value1", conf.get("tez.config1"));
+    Assertions.assertEquals("value3", conf.get("tez.config3"));
+    Assertions.assertNull(conf.get("tez.config2"));
+    Assertions.assertEquals(expectedSourceVertexId, 
conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1));
+    Assertions.assertEquals(
+        expectedDestinationVertexId, 
conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1));
+  }
+
   public static void verifyInput(
       DAGImpl dag,
       String name,
@@ -216,21 +231,13 @@ public class RssDAGAppMasterTest {
       int expectedSourceVertexId,
       int expectedDestinationVertexId)
       throws Exception {
-    // 1 verfiy rename rss io class name
     List<InputSpec> inputSpecs = dag.getVertex(name).getInputSpecList(0);
     Assertions.assertEquals(1, inputSpecs.size());
     Assertions.assertEquals(
         expectedInputClassName, 
inputSpecs.get(0).getInputDescriptor().getClassName());
-    // 2 verfiy the address and port of shuffle manager
     UserPayload payload = 
inputSpecs.get(0).getInputDescriptor().getUserPayload();
     Configuration conf = TezUtils.createConfFromUserPayload(payload);
-    Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
-    Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
-    // 3 verfiy the config
-    Assertions.assertEquals(StorageType.LOCALFILE.name(), 
conf.get(RSS_STORAGE_TYPE));
-    Assertions.assertEquals("value1", conf.get("tez.config1"));
-    Assertions.assertEquals("value3", conf.get("tez.config3"));
-    Assertions.assertNull(conf.get("tez.config2"));
+    verifyCommonAssertions(conf, expectedSourceVertexId, 
expectedDestinationVertexId);
     // TEZ_RUNTIME_IFILE_READAHEAD_BYTES is in getConfigurationKeySet, so the 
config from client
     // should deliver
     // to Input/Output. But tez.config.from.client is not in 
getConfigurationKeySet, so the config
@@ -238,10 +245,6 @@ public class RssDAGAppMasterTest {
     // should not deliver to Input/Output.
     Assertions.assertEquals(12345, 
conf.getInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, -1));
     Assertions.assertNull(conf.get("tez.config.from.client"));
-    // 4 verfiy vertex id
-    Assertions.assertEquals(expectedSourceVertexId, 
conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1));
-    Assertions.assertEquals(
-        expectedDestinationVertexId, 
conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1));
   }
 
   public static void verifyOutput(
@@ -251,28 +254,16 @@ public class RssDAGAppMasterTest {
       int expectedSourceVertexId,
       int expectedDestinationVertexId)
       throws Exception {
-    // 1 verfiy rename rss io class name
     List<OutputSpec> outputSpecs = dag.getVertex(name).getOutputSpecList(0);
     Assertions.assertEquals(1, outputSpecs.size());
     Assertions.assertEquals(
         expectedOutputClassName, 
outputSpecs.get(0).getOutputDescriptor().getClassName());
-    // 2 verfiy the address and port of shuffle manager
     UserPayload payload = 
outputSpecs.get(0).getOutputDescriptor().getUserPayload();
     Configuration conf = TezUtils.createConfFromUserPayload(payload);
-    Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
-    Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
-    // 3 verfiy the config
-    Assertions.assertEquals(StorageType.LOCALFILE.name(), 
conf.get(RSS_STORAGE_TYPE));
-    Assertions.assertEquals("value1", conf.get("tez.config1"));
-    Assertions.assertEquals("value3", conf.get("tez.config3"));
-    Assertions.assertNull(conf.get("tez.config2"));
-    // 4 verfiy vertex id
-    Assertions.assertEquals(expectedSourceVertexId, 
conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1));
-    Assertions.assertEquals(
-        expectedDestinationVertexId, 
conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1));
+    verifyCommonAssertions(conf, expectedSourceVertexId, 
expectedDestinationVertexId);
   }
 
-  private static DAG createDAG(String dageName, Configuration conf) {
+  private static DAG createDAG(String dagName, Configuration conf) {
     conf.setInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 12345);
     conf.set("tez.config.from.client", "value.from.client");
 
@@ -308,7 +299,7 @@ public class RssDAGAppMasterTest {
             .setFromConfiguration(conf)
             .build();
 
-    DAG dag = DAG.create(dageName);
+    DAG dag = DAG.create(dagName);
     dag.addVertex(vertex1)
         .addVertex(vertex2)
         .addVertex(vertex3)
@@ -387,8 +378,9 @@ public class RssDAGAppMasterTest {
     }
   }
 
-  @Test
-  public void testFetchRemoteStorageFromDynamicConf() throws Exception {
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void testFetchRemoteStorageFromDynamicConf(String clientType) throws 
Exception {
     final ApplicationId appId = ApplicationId.newInstance(1, 1);
     final ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 1);
     TezConfiguration conf = new TezConfiguration();
@@ -396,7 +388,7 @@ public class RssDAGAppMasterTest {
     Credentials amCreds = new Credentials();
     JobTokenSecretManager jtsm = new JobTokenSecretManager();
     JobTokenIdentifier identifier = new JobTokenIdentifier(new 
Text(appId.toString()));
-    Token<JobTokenIdentifier> sessionToken = new 
Token<JobTokenIdentifier>(identifier, jtsm);
+    Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jtsm);
     sessionToken.setService(identifier.getJobId());
     TokenCache.setSessionToken(sessionToken, amCreds);
 
@@ -424,7 +416,7 @@ public class RssDAGAppMasterTest {
             amCreds,
             "someuser",
             null);
-    appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(1));
+    appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(1, 
clientType));
     appMaster.init(conf);
 
     Configuration mergedConf = new Configuration(false);
@@ -437,8 +429,9 @@ public class RssDAGAppMasterTest {
     Assertions.assertEquals("testvalue", 
mergedConf.get("tez.rss.test.config"));
   }
 
-  @Test
-  public void testFetchRemoteStorageFromCoordinator() throws Exception {
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void testFetchRemoteStorageFromCoordinator(String clientType) throws 
Exception {
     final ApplicationId appId = ApplicationId.newInstance(1, 1);
     final ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 1);
     TezConfiguration conf = new TezConfiguration();
@@ -446,7 +439,7 @@ public class RssDAGAppMasterTest {
     Credentials amCreds = new Credentials();
     JobTokenSecretManager jtsm = new JobTokenSecretManager();
     JobTokenIdentifier identifier = new JobTokenIdentifier(new 
Text(appId.toString()));
-    Token<JobTokenIdentifier> sessionToken = new 
Token<JobTokenIdentifier>(identifier, jtsm);
+    Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jtsm);
     sessionToken.setService(identifier.getJobId());
     TokenCache.setSessionToken(sessionToken, amCreds);
 
@@ -474,7 +467,7 @@ public class RssDAGAppMasterTest {
             amCreds,
             "someuser",
             null);
-    appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(2));
+    appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(2, 
clientType));
     appMaster.init(conf);
 
     Configuration mergedConf = new Configuration(false);
@@ -495,12 +488,12 @@ public class RssDAGAppMasterTest {
      * Mode 2: rss.remote.storage.path and rss.remote.storage.conf is not set 
by dynamic config,
      *         appMaster will fetch remote storage conf from coordinator.
      * */
-    private int mode;
+    private final int mode;
 
-    FakedShuffleWriteClient(int mode) {
+    FakedShuffleWriteClient(int mode, String clientType) {
       super(
           ShuffleClientFactory.newWriteBuilder()
-              .clientType("GRPC")
+              .clientType(clientType)
               .retryMax(1)
               .retryIntervalMax(1)
               .heartBeatThreadNum(10)
@@ -521,7 +514,7 @@ public class RssDAGAppMasterTest {
 
     @Override
     public Map<String, String> fetchClientConf(int timeoutMs) {
-      Map<String, String> clientConf = new HashMap();
+      Map<String, String> clientConf = new HashMap<>();
       if (mode == 1) {
         clientConf.put("rss.remote.storage.path", "hdfs://ns1/rss/");
         clientConf.put("rss.remote.storage.conf", "key1=value1,key2=value2");
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
index 56cc55553..2576d926b 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezHashJoinTest.java
@@ -30,20 +30,15 @@ public class TezHashJoinTest extends 
TezJoinIntegrationTestBase {
   public void hashJoinTest() throws Exception {
     generateInputFile();
     fs.delete(new Path(HASH_JOIN_OUTPUT_PATH), true);
-    run(getTestArgs(HASH_JOIN_OUTPUT_PATH));
+    run(HASH_JOIN_OUTPUT_PATH);
   }
 
   @Test
   public void hashJoinDoBroadcastTest() throws Exception {
     generateInputFile();
-    String[] orignal = getTestArgs(HASH_JOIN_OUTPUT_PATH);
-    String[] args = new String[orignal.length + 1];
-    for (int i = 0; i < orignal.length; i++) {
-      args[i] = orignal[i];
-    }
-    args[orignal.length] = "doBroadcast";
-    fs.delete(new Path(HASH_JOIN_OUTPUT_PATH), true);
-    run(args);
+    String path = HASH_JOIN_OUTPUT_PATH + "_broadcast";
+    fs.delete(new Path(path), true);
+    run(path);
   }
 
   @Override
@@ -53,11 +48,17 @@ public class TezHashJoinTest extends 
TezJoinIntegrationTestBase {
 
   @Override
   public String[] getTestArgs(String uniqueOutputName) {
-    return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2", 
HASH_JOIN_OUTPUT_PATH};
+    if (uniqueOutputName.contains("broadcast")) {
+      return new String[] {
+        STREAM_INPUT_PATH, HASH_INPUT_PATH, "2", uniqueOutputName, 
"doBroadcast"
+      };
+    } else {
+      return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2", 
uniqueOutputName};
+    }
   }
 
   @Override
   public String getOutputDir(String uniqueOutputName) {
-    return HASH_JOIN_OUTPUT_PATH;
+    return uniqueOutputName;
   }
 }
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
index befa50c58..cdaffffb1 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
@@ -61,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class TezIntegrationTestBase extends IntegrationTestBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TezIntegrationTestBase.class);
-  private static String TEST_ROOT_DIR =
+  private static final String TEST_ROOT_DIR =
       "target" + Path.SEPARATOR + TezIntegrationTestBase.class.getName() + 
"-tmpDir";
 
   private Path remoteStagingDir = null;
@@ -75,20 +75,22 @@ public class TezIntegrationTestBase extends 
IntegrationTestBase {
       miniTezCluster.init(conf);
       miniTezCluster.start();
     }
-    LOG.info("Starting corrdinators and shuffer servers");
+    LOG.info("Starting coordinators and shuffle servers");
     CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = new HashMap();
+    Map<String, String> dynamicConf = new HashMap<>();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
     dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(shuffleServerConf);
+    ShuffleServerConf grpcShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
+    createShuffleServer(grpcShuffleServerConf);
+    ShuffleServerConf nettyShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC_NETTY);
+    createShuffleServer(nettyShuffleServerConf);
     startServers();
   }
 
   @AfterAll
-  public static void tearDown() throws Exception {
+  public static void tearDown() {
     if (miniTezCluster != null) {
       LOG.info("Stopping MiniTezCluster");
       miniTezCluster.stop();
@@ -117,25 +119,25 @@ public class TezIntegrationTestBase extends 
IntegrationTestBase {
     runTezApp(appConf, getTestTool(), getTestArgs("origin"));
     final String originPath = getOutputDir("origin");
 
-    // 2 Run Tez examples based on rss
-    appConf = new TezConfiguration(miniTezCluster.getConfig());
-    updateRssConfiguration(appConf);
-    appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestTool(), getTestArgs("rss"));
-    final String rssPath = getOutputDir("rss");
+    // Run RSS tests with different configurations
+    runRssTest(ClientType.GRPC, null, "rss-grpc", originPath);
+    runRssTest(ClientType.GRPC, "/tmp/spill-grpc", "rss-spill-grpc", 
originPath);
+    runRssTest(ClientType.GRPC_NETTY, null, "rss-netty", originPath);
+    runRssTest(ClientType.GRPC_NETTY, "/tmp/spill-netty", "rss-spill-netty", 
originPath);
+  }
 
-    // 3 Run Tez examples base on rss with remote spill enable
-    appConf = new TezConfiguration(miniTezCluster.getConfig());
-    appConf.setBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, true);
-    appConf.set(RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH, "/tmp/spill");
-    updateRssConfiguration(appConf);
+  private void runRssTest(
+      ClientType clientType, String spillPath, String testName, String 
originPath)
+      throws Exception {
+    TezConfiguration appConf = new 
TezConfiguration(miniTezCluster.getConfig());
+    if (spillPath != null) {
+      appConf.setBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, true);
+      appConf.set(RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH, spillPath);
+    }
+    updateRssConfiguration(appConf, clientType);
     appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestTool(), getTestArgs("rss-spill"));
-    final String rssPathSpill = getOutputDir("rss-spill");
-
-    // 4 verify the results
-    verifyResults(originPath, rssPath);
-    verifyResults(originPath, rssPathSpill);
+    runTezApp(appConf, getTestTool(), getTestArgs(testName));
+    verifyResults(originPath, getOutputDir(testName));
   }
 
   public Tool getTestTool() {
@@ -165,14 +167,14 @@ public class TezIntegrationTestBase extends 
IntegrationTestBase {
     appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
   }
 
-  public void updateRssConfiguration(Configuration appConf) throws Exception {
+  public void updateRssConfiguration(Configuration appConf, ClientType 
clientType) {
     appConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
     appConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 512);
     appConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx384m");
     appConf.setInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, 512);
     appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
     appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
-    appConf.set(RssTezConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+    appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
     appConf.set(
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT + " " + 
RssDAGAppMaster.class.getName());
@@ -221,8 +223,8 @@ public class TezIntegrationTestBase extends 
IntegrationTestBase {
     FileStatus[] rssFiles = fs.listStatus(rssPathFs);
     long originLen = 0;
     long rssLen = 0;
-    List<String> originFileList = new ArrayList();
-    List<String> rssFileList = new ArrayList();
+    List<String> originFileList = new ArrayList<>();
+    List<String> rssFileList = new ArrayList<>();
     for (FileStatus file : originFiles) {
       originLen += file.getLen();
       String name = file.getPath().getName();
@@ -297,8 +299,8 @@ public class TezIntegrationTestBase extends 
IntegrationTestBase {
 
     // 2 Load original result and rss result to hashmap
     Map<String, Integer> originalResults = new HashMap<>();
-    for (int i = 0; i < originFileList.size(); i++) {
-      Path path = new Path(originPath, originFileList.get(i));
+    for (String file : originFileList) {
+      Path path = new Path(originPath, file);
       LineReader lineReader = new LineReader(fs.open(path));
       Text line = new Text();
       while (lineReader.readLine(line) > 0) {
@@ -311,8 +313,8 @@ public class TezIntegrationTestBase extends 
IntegrationTestBase {
     }
 
     Map<String, Integer> rssResults = new HashMap<>();
-    for (int i = 0; i < rssFileList.size(); i++) {
-      Path path = new Path(rssPath, rssFileList.get(i));
+    for (String file : rssFileList) {
+      Path path = new Path(rssPath, file);
       LineReader lineReader = new LineReader(fs.open(path));
       Text line = new Text();
       while (lineReader.readLine(line) > 0) {
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
index e7e78b37b..641e40d34 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
@@ -23,6 +23,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.examples.JoinDataGen;
 import org.apache.tez.examples.JoinValidate;
 
+import org.apache.uniffle.common.ClientType;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TezJoinIntegrationTestBase extends TezIntegrationTestBase {
@@ -61,14 +63,16 @@ public class TezJoinIntegrationTestBase extends 
TezIntegrationTestBase {
     assertEquals(0, ToolRunner.run(appConf, validate, args), "JoinValidate 
failed");
   }
 
-  public void run(String[] overrideArgs) throws Exception {
-    // 1 Run Tez examples based on rss
+  public void run(String path) throws Exception {
+    runForClientType(ClientType.GRPC, path);
+    runForClientType(ClientType.GRPC_NETTY, path + "_netty");
+  }
+
+  private void runForClientType(ClientType clientType, String path) throws 
Exception {
     TezConfiguration appConf = new 
TezConfiguration(miniTezCluster.getConfig());
-    updateRssConfiguration(appConf);
+    updateRssConfiguration(appConf, clientType);
     appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestTool(), overrideArgs);
-
-    // 2 check the result
-    verifyResults(JOIN_EXPECTED_PATH, getOutputDir(""));
+    runTezApp(appConf, getTestTool(), getTestArgs(path));
+    verifyResults(JOIN_EXPECTED_PATH, getOutputDir(path));
   }
 }
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
index 4add6e887..788465d61 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
@@ -33,13 +33,14 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.examples.SimpleSessionExample;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.exception.RssException;
 
 public class TezSimpleSessionExampleTest extends TezIntegrationTestBase {
 
-  private String inputPath = "simple_session_input";
-  private String outputPath = "simple_session_output";
-  private List<String> wordTable =
+  private final String inputPath = "simple_session_input";
+  private final String outputPath = "simple_session_output";
+  private final List<String> wordTable =
       Lists.newArrayList(
           "apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan", 
"tomato");
 
@@ -56,8 +57,8 @@ public class TezSimpleSessionExampleTest extends 
TezIntegrationTestBase {
   }
 
   @Override
-  public void updateRssConfiguration(Configuration appConf) throws Exception {
-    super.updateRssConfiguration(appConf);
+  public void updateRssConfiguration(Configuration appConf, ClientType 
clientType) {
+    super.updateRssConfiguration(appConf, clientType);
     appConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
   }
 
@@ -74,7 +75,7 @@ public class TezSimpleSessionExampleTest extends 
TezIntegrationTestBase {
     // To keep pace with verifyResults, here make sure summation of word is 
unique number.
     FSDataOutputStream outputStream = fs.create(new Path(inputPath));
     Random random = new Random();
-    Set<Integer> used = new HashSet();
+    Set<Integer> used = new HashSet<>();
     List<String> outputList = new ArrayList<>();
     int index = 0;
     while (index < wordTable.size()) {
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
index b31bfa8f6..e6b5488bb 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezSortMergeJoinTest.java
@@ -28,7 +28,7 @@ public class TezSortMergeJoinTest extends 
TezJoinIntegrationTestBase {
   @Test
   public void sortMergeJoinTest() throws Exception {
     generateInputFile();
-    run(getTestArgs(""));
+    run(SORT_MERGE_JOIN_OUTPUT_PATH);
   }
 
   @Override
@@ -38,11 +38,11 @@ public class TezSortMergeJoinTest extends 
TezJoinIntegrationTestBase {
 
   @Override
   public String[] getTestArgs(String uniqueOutputName) {
-    return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2", 
SORT_MERGE_JOIN_OUTPUT_PATH};
+    return new String[] {STREAM_INPUT_PATH, HASH_INPUT_PATH, "2", 
uniqueOutputName};
   }
 
   @Override
   public String getOutputDir(String uniqueOutputName) {
-    return SORT_MERGE_JOIN_OUTPUT_PATH;
+    return uniqueOutputName;
   }
 }
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
index d31d842ab..4e515eea4 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
@@ -75,14 +75,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TezWordCountWithFailuresTest extends IntegrationTestBase {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TezIntegrationTestBase.class);
-  private static String TEST_ROOT_DIR =
+  private static final Logger LOG = 
LoggerFactory.getLogger(TezWordCountWithFailuresTest.class);
+  private static final String TEST_ROOT_DIR =
       "target" + Path.SEPARATOR + TezWordCountWithFailuresTest.class.getName() 
+ "-tmpDir";
 
   private Path remoteStagingDir = null;
-  private String inputPath = "word_count_input";
-  private String outputPath = "word_count_output";
-  private List<String> wordTable =
+  private final String inputPath = "word_count_input";
+  private final String outputPath = "word_count_output";
+  private final List<String> wordTable =
       Lists.newArrayList(
           "apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan", 
"tomato");
 
@@ -96,20 +96,22 @@ public class TezWordCountWithFailuresTest extends 
IntegrationTestBase {
       miniTezCluster.init(conf);
       miniTezCluster.start();
     }
-    LOG.info("Starting corrdinators and shuffer servers");
+    LOG.info("Starting coordinators and shuffle servers");
     CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = new HashMap();
+    Map<String, String> dynamicConf = new HashMap<>();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
     dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(shuffleServerConf);
+    ShuffleServerConf grpcShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
+    createShuffleServer(grpcShuffleServerConf);
+    ShuffleServerConf nettyShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC_NETTY);
+    createShuffleServer(nettyShuffleServerConf);
     startServers();
   }
 
   @AfterAll
-  public static void tearDown() throws Exception {
+  public static void tearDown() {
     if (miniTezCluster != null) {
       LOG.info("Stopping MiniTezCluster");
       miniTezCluster.stop();
@@ -155,89 +157,68 @@ public class TezWordCountWithFailuresTest extends 
IntegrationTestBase {
 
   @Test
   public void wordCountTestWithTaskFailureWhenAvoidRecomputeEnable() throws 
Exception {
-    // 1 Run Tez examples based on rss
-    TezConfiguration appConf = new 
TezConfiguration(miniTezCluster.getConfig());
-    updateRssConfiguration(appConf, 0, true, false, 1);
-    TezIntegrationTestBase.appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestArgs("rss"), 0);
-    final String rssPath = getOutputDir("rss");
-
-    // 2 Run original Tez examples
-    appConf = new TezConfiguration(miniTezCluster.getConfig());
-    updateCommonConfiguration(appConf);
-    runTezApp(appConf, getTestArgs("origin"), -1);
-    final String originPath = getOutputDir("origin");
-
-    // 3 verify the results
-    TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
+    runWordCountTestForBothClientTypes(true, false, 1);
   }
 
   @Test
   public void wordCountTestWithTaskFailureWhenAvoidRecomputeDisable() throws 
Exception {
-    // 1 Run Tez examples based on rss
-    TezConfiguration appConf = new 
TezConfiguration(miniTezCluster.getConfig());
-    updateRssConfiguration(appConf, 0, false, false, 1);
-    TezIntegrationTestBase.appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestArgs("rss"), 1);
-    final String rssPath = getOutputDir("rss");
-
-    // 2 Run original Tez examples
-    appConf = new TezConfiguration(miniTezCluster.getConfig());
-    updateCommonConfiguration(appConf);
-    runTezApp(appConf, getTestArgs("origin"), -1);
-    final String originPath = getOutputDir("origin");
-
-    // 3 verify the results
-    TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
+    runWordCountTestForBothClientTypes(false, false, 1);
   }
 
   @Test
   public void wordCountTestWithNodeUnhealthyWhenAvoidRecomputeEnable() throws 
Exception {
-    // 1 Run Tez examples based on rss
-    TezConfiguration appConf = new 
TezConfiguration(miniTezCluster.getConfig());
-    updateRssConfiguration(appConf, 1, true, true, 100);
-    TezIntegrationTestBase.appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestArgs("rss"), 0);
-    final String rssPath = getOutputDir("rss");
-
-    // 2 Run original Tez examples
-    appConf = new TezConfiguration(miniTezCluster.getConfig());
-    updateCommonConfiguration(appConf);
-    runTezApp(appConf, getTestArgs("origin"), -1);
-    final String originPath = getOutputDir("origin");
-
-    // 3 verify the results
-    TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
+    runWordCountTestForBothClientTypes(true, true, 100);
   }
 
   @Test
   public void wordCountTestWithNodeUnhealthyWhenAvoidRecomputeDisable() throws 
Exception {
-    // 1 Run Tez examples based on rss
-    TezConfiguration appConf = new 
TezConfiguration(miniTezCluster.getConfig());
-    updateRssConfiguration(appConf, 1, false, true, 100);
-    TezIntegrationTestBase.appendAndUploadRssJars(appConf);
-    runTezApp(appConf, getTestArgs("rss"), 1);
-    final String rssPath = getOutputDir("rss");
-
-    // 2 Run original Tez examples
-    appConf = new TezConfiguration(miniTezCluster.getConfig());
-    updateCommonConfiguration(appConf);
-    runTezApp(appConf, getTestArgs("origin"), -1);
-    final String originPath = getOutputDir("origin");
-
-    // 3 verify the results
+    runWordCountTestForBothClientTypes(false, true, 100);
+  }
+
+  private void runWordCountTestForBothClientTypes(
+      boolean avoidRecompute, boolean rescheduleWhenUnhealthy, int 
maxFailures) throws Exception {
+    String originPath = runOriginalWordCount();
+    runWordCountTest(
+        ClientType.GRPC, avoidRecompute, rescheduleWhenUnhealthy, maxFailures, 
originPath);
+    runWordCountTest(
+        ClientType.GRPC_NETTY, avoidRecompute, rescheduleWhenUnhealthy, 
maxFailures, originPath);
+  }
+
+  private void runWordCountTest(
+      ClientType clientType,
+      boolean avoidRecompute,
+      boolean rescheduleWhenUnhealthy,
+      int maxFailures,
+      String originPath)
+      throws Exception {
+    int testMode = rescheduleWhenUnhealthy ? 1 : 0;
+    int expectedVerifyMode = avoidRecompute ? 0 : 1;
+    TezConfiguration rssConf = new 
TezConfiguration(miniTezCluster.getConfig());
+    updateRssConfiguration(
+        rssConf, testMode, avoidRecompute, rescheduleWhenUnhealthy, 
maxFailures, clientType);
+    TezIntegrationTestBase.appendAndUploadRssJars(rssConf);
+    String testName = "rss-" + clientType.name().toLowerCase();
+    runTezApp(rssConf, getTestArgs(testName), expectedVerifyMode);
+    String rssPath = getOutputDir(testName);
     TezIntegrationTestBase.verifyResultEqual(originPath, rssPath);
   }
 
+  private String runOriginalWordCount() throws Exception {
+    TezConfiguration originalConf = new 
TezConfiguration(miniTezCluster.getConfig());
+    updateCommonConfiguration(originalConf);
+    runTezApp(originalConf, getTestArgs("origin"), -1);
+    return getOutputDir("origin");
+  }
+
   /*
    * Two verify mode are supported:
    * (a) verifyMode 0
    *     tez.rss.avoid.recompute.succeeded.task is enable, should not 
recompute the task when this node is
-   *     blacke-listed for unhealthy.
+   *     black-listed for unhealthy.
    *
    * (b) verifyMode 1
    *     tez.rss.avoid.recompute.succeeded.task is disable, will recompute the 
task when this node is
-   *     blacke-listed for unhealthy.
+   *     black-listed for unhealthy.
    * */
   protected void runTezApp(TezConfiguration tezConf, String[] args, int 
verifyMode)
       throws Exception {
@@ -260,16 +241,16 @@ public class TezWordCountWithFailuresTest extends 
IntegrationTestBase {
   /*
    * In this integration test, mini cluster have three NM with 4G
    * (YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB). The request of 
am is 4G, the request of task is 2G.
-   * It means that one node only runs one am container so that won't lable the 
node which am container runs as
-   * black-list or uhealthy node.
+   * It means that one node only runs one am container so that won't label the 
node which am container runs as
+   * black-list or unhealthy node.
    * */
   public void updateRssConfiguration(
       Configuration appConf,
       int testMode,
       boolean avoidRecompute,
       boolean rescheduleWhenUnhealthy,
-      int maxFailures)
-      throws Exception {
+      int maxFailures,
+      ClientType clientType) {
     appConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
     appConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 4096);
     appConf.setInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, 4096);
@@ -277,7 +258,7 @@ public class TezWordCountWithFailuresTest extends 
IntegrationTestBase {
     appConf.setInt(TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 99);
     appConf.setInt(TEZ_AM_MAX_TASK_FAILURES_PER_NODE, maxFailures);
     appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
-    appConf.set(RssTezConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+    appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
     appConf.set(
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT
@@ -297,11 +278,11 @@ public class TezWordCountWithFailuresTest extends 
IntegrationTestBase {
     appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
   }
 
-  public class WordCountWithFailures extends WordCount {
+  public static class WordCountWithFailures extends WordCount {
 
     TezClient tezClientInternal = null;
     private HadoopShim hadoopShim;
-    int verifyMode = -1;
+    int verifyMode;
 
     WordCountWithFailures(int assertMode) {
       this.verifyMode = assertMode;
@@ -353,7 +334,7 @@ public class TezWordCountWithFailuresTest extends 
IntegrationTestBase {
 
       DAGStatus dagStatus = 
dagClient.waitForCompletionWithStatusUpdates(getOpts);
       if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-        logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+        logger.info("DAG diagnostics: {}", dagStatus.getDiagnostics());
         return -1;
       }
 


Reply via email to