This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc33ce175c1 [FLINK-33103][network] Hybrid shuffle ITCase supports the 
new mode
cc33ce175c1 is described below

commit cc33ce175c1b3953577cd26e2416cb2eef94e176
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Sep 15 12:26:25 2023 +0800

    [FLINK-33103][network] Hybrid shuffle ITCase supports the new mode
---
 .../flink/test/runtime/HybridShuffleITCase.java    | 69 +++++++++++++---------
 1 file changed, 42 insertions(+), 27 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index e9de7c5eea7..f5c6b0a609e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -22,60 +22,75 @@ import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
 
 /** Tests for hybrid shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
 class HybridShuffleITCase extends BatchShuffleITCaseBase {
 
-    @Test
+    @Parameter public boolean enableNewHybridMode;
+
+    @Parameters(name = "enableNewHybridMode={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @TestTemplate
     void testHybridFullExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridFullExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
+
+    private Configuration configureHybridOptions(Configuration configuration, 
boolean isSelective) {
+        BatchShuffleMode shuffleMode =
+                isSelective
+                        ? BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE
+                        : BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+        configuration.set(
+                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE,
+                enableNewHybridMode);
+
+        if (enableNewHybridMode && isSelective) {
+            // Note that the memory tier of the new mode need more buffers for 
the selective mode
+            
configuration.setString(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), "256m");
+        }
+        return configuration;
+    }
 }

Reply via email to