This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 43323bbdb [#2015] improvement(netty): Support Netty for MR integration test. (#2016)
43323bbdb is described below

commit 43323bbdb1e60381ffb029716a742c750f9e8572
Author: QI Jiale <qijial...@foxmail.com>
AuthorDate: Mon Oct 21 14:33:33 2024 +0800

    [#2015] improvement(netty): Support Netty for MR integration test. (#2016)
    
    ### What changes were proposed in this pull request?
    
    Support Netty for MR integration test.
    
    ### Why are the changes needed?
    
    Fix: #2015
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
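    Covered by the MR integration tests themselves, which are now parameterized to run with both the GRPC and GRPC_NETTY client types.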
---
 .../org/apache/uniffle/test/DynamicConfTest.java   | 15 +++----
 .../org/apache/uniffle/test/HadoopConfTest.java    | 14 ++++---
 .../org/apache/uniffle/test/LargeSorterTest.java   | 14 ++++---
 .../apache/uniffle/test/MRIntegrationTestBase.java | 47 +++++++++++++---------
 .../org/apache/uniffle/test/RMWordCountTest.java   | 11 +++--
 .../org/apache/uniffle/test/SecondarySortTest.java | 12 ++++--
 .../org/apache/uniffle/test/WordCountTest.java     | 12 ++++--
 7 files changed, 76 insertions(+), 49 deletions(-)

diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
index c5421d113..aad036794 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.mapreduce.LargeSorter;
 import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.storage.util.StorageType;
@@ -41,18 +42,18 @@ public class DynamicConfTest extends MRIntegrationTestBase {
     Map<String, String> dynamicConf = new HashMap<>();
     dynamicConf.put(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + 
"rss/test");
     dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    dynamicConf.put(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
     return dynamicConf;
   }
 
-  @Test
-  public void dynamicConfTest() throws Exception {
-    run();
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void dynamicConfTest(ClientType clientType) throws Exception {
+    run(clientType);
   }
 
   @Override
-  protected void updateRssConfiguration(Configuration jobConf) {
-    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+  protected void updateRssConfiguration(Configuration jobConf, ClientType 
clientType) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
     jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
     jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
   }
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
index 892b9e19a..be8227bdc 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.mapreduce.LargeSorter;
 import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.storage.util.StorageType;
@@ -41,14 +42,15 @@ public class HadoopConfTest extends MRIntegrationTestBase {
     return new HashMap<>();
   }
 
-  @Test
-  public void hadoopConfTest() throws Exception {
-    run();
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void hadoopConfTest(ClientType clientType) throws Exception {
+    run(clientType);
   }
 
   @Override
-  protected void updateRssConfiguration(Configuration jobConf) {
-    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+  protected void updateRssConfiguration(Configuration jobConf, ClientType 
clientType) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
     jobConf.set(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
     jobConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
     jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
index be7ec8375..a1546b762 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.ClientType;
 
@@ -34,14 +35,15 @@ public class LargeSorterTest extends MRIntegrationTestBase {
     MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
   }
 
-  @Test
-  public void largeSorterTest() throws Exception {
-    run();
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void largeSorterTest(ClientType clientType) throws Exception {
+    run(clientType);
   }
 
   @Override
-  protected void updateRssConfiguration(Configuration jobConf) {
-    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+  protected void updateRssConfiguration(Configuration jobConf, ClientType 
clientType) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
     jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
     jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
     jobConf.set(
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 0387eb9cb..0c6e32652 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -24,6 +24,7 @@ import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.provider.Arguments;
 
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.rpc.ServerType;
@@ -75,6 +77,10 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
   private static final String OUTPUT_ROOT_DIR = "/tmp/" + 
TestMRJobs.class.getSimpleName();
   private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR, 
"localizedResources");
 
+  static Stream<Arguments> clientTypeProvider() {
+    return Stream.of(Arguments.of(ClientType.GRPC), 
Arguments.of(ClientType.GRPC_NETTY));
+  }
+
   @BeforeAll
   public static void setUpMRYarn() throws IOException {
     mrYarnCluster = new MiniMRYarnCluster("test");
@@ -99,29 +105,29 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     }
   }
 
-  public void run() throws Exception {
+  public void run(ClientType clientType) throws Exception {
     JobConf appConf = new JobConf(mrYarnCluster.getConfig());
     updateCommonConfiguration(appConf);
     runOriginApp(appConf);
     final String originPath = 
appConf.get("mapreduce.output.fileoutputformat.outputdir");
     appConf = new JobConf(mrYarnCluster.getConfig());
     updateCommonConfiguration(appConf);
-    runRssApp(appConf);
+    runRssApp(appConf, clientType);
     String rssPath = 
appConf.get("mapreduce.output.fileoutputformat.outputdir");
     verifyResults(originPath, rssPath);
 
     appConf = new JobConf(mrYarnCluster.getConfig());
     appConf.set("mapreduce.rss.reduce.remote.spill.enable", "true");
-    runRssApp(appConf);
+    runRssApp(appConf, clientType);
     String rssRemoteSpillPath = 
appConf.get("mapreduce.output.fileoutputformat.outputdir");
     verifyResults(originPath, rssRemoteSpillPath);
   }
 
-  public void runWithRemoteMerge() throws Exception {
+  public void runWithRemoteMerge(ClientType clientType) throws Exception {
     // 1 run application when remote merge is enable
     JobConf appConf = new JobConf(mrYarnCluster.getConfig());
     updateCommonConfiguration(appConf);
-    runRssApp(appConf, true);
+    runRssApp(appConf, true, clientType);
     final String rssPath1 = 
appConf.get("mapreduce.output.fileoutputformat.outputdir");
 
     // 2 run original application
@@ -142,11 +148,12 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     runMRApp(jobConf, getTestTool(), getTestArgs());
   }
 
-  private void runRssApp(Configuration jobConf) throws Exception {
-    runRssApp(jobConf, false);
+  private void runRssApp(Configuration jobConf, ClientType clientType) throws 
Exception {
+    runRssApp(jobConf, false, clientType);
   }
 
-  private void runRssApp(Configuration jobConf, boolean remoteMerge) throws 
Exception {
+  private void runRssApp(Configuration jobConf, boolean remoteMerge, 
ClientType clientType)
+      throws Exception {
     URL url = MRIntegrationTestBase.class.getResource("/");
     final String parentPath =
         new 
Path(url.getPath()).getParent().getParent().getParent().getParent().toString();
@@ -185,19 +192,19 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     }
     assertNotNull(localFile);
     String props = System.getProperty("java.class.path");
-    String newProps = "";
+    StringBuilder newProps = new StringBuilder();
     String[] splittedProps = props.split(":");
     for (String prop : splittedProps) {
       if (!prop.contains("classes")
           && !prop.contains("grpc")
           && !prop.contains("rss-")
           && !prop.contains("shuffle-storage")) {
-        newProps = newProps + ":" + prop;
+        newProps.append(":").append(prop);
       } else if (prop.contains("mr") && prop.contains("integration-test")) {
-        newProps = newProps + ":" + prop;
+        newProps.append(":").append(prop);
       }
     }
-    System.setProperty("java.class.path", newProps);
+    System.setProperty("java.class.path", newProps.toString());
     Path newPath = new Path(HDFS_URI + "/rss.jar");
     FileUtil.copy(file, fs, newPath, false, jobConf);
     DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), 
jobConf, fs);
@@ -208,8 +215,9 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
             + ","
             + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
     jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
-    updateRssConfiguration(jobConf);
+    updateRssConfiguration(jobConf, clientType);
     runMRApp(jobConf, getTestTool(), getTestArgs());
+    fs.delete(newPath, true);
   }
 
   protected String[] getTestArgs() {
@@ -225,11 +233,14 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     CoordinatorConf coordinatorConf = getCoordinatorConf();
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
+    ShuffleServerConf grpcShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
+    ShuffleServerConf nettyShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC_NETTY);
     if (serverConf != null) {
-      shuffleServerConf.addAll(serverConf);
+      grpcShuffleServerConf.addAll(serverConf);
+      nettyShuffleServerConf.addAll(serverConf);
     }
-    createShuffleServer(shuffleServerConf);
+    createShuffleServer(grpcShuffleServerConf);
+    createShuffleServer(nettyShuffleServerConf);
     startServers();
   }
 
@@ -240,8 +251,8 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     return dynamicConf;
   }
 
-  protected void updateRssConfiguration(Configuration jobConf) {
-    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+  protected void updateRssConfiguration(Configuration jobConf, ClientType 
clientType) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
   }
 
   private void runMRApp(Configuration conf, Tool tool, String[] args) throws 
Exception {
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
index 4ac8fc589..9c2252920 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.mapred.WordCount;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBufferType;
 
@@ -51,10 +53,11 @@ public class RMWordCountTest extends MRIntegrationTestBase {
     MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf(), 
serverConf);
   }
 
-  @Test
-  public void wordCountTest() throws Exception {
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void wordCountTest(ClientType clientType) throws Exception {
     generateInputFile();
-    runWithRemoteMerge();
+    runWithRemoteMerge(clientType);
   }
 
   @Override
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
index 6e0bf5dcd..4882ec5d6 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.uniffle.common.ClientType;
 
 public class SecondarySortTest extends MRIntegrationTestBase {
 
@@ -43,10 +46,11 @@ public class SecondarySortTest extends 
MRIntegrationTestBase {
     MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
   }
 
-  @Test
-  public void secondarySortTest() throws Exception {
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void secondarySortTest(ClientType clientType) throws Exception {
     generateInputFile();
-    run();
+    run(clientType);
   }
 
   private void generateInputFile() throws Exception {
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
index 2ba76f5a9..47aef051e 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapred.WordCount;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.uniffle.common.ClientType;
 
 public class WordCountTest extends MRIntegrationTestBase {
 
@@ -46,10 +49,11 @@ public class WordCountTest extends MRIntegrationTestBase {
     MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
   }
 
-  @Test
-  public void wordCountTest() throws Exception {
+  @ParameterizedTest
+  @MethodSource("clientTypeProvider")
+  public void wordCountTest(ClientType clientType) throws Exception {
     generateInputFile();
-    run();
+    run(clientType);
   }
 
   @Override

Reply via email to