reswqa commented on code in PR #23436:
URL: https://github.com/apache/flink/pull/23436#discussion_r1329611272


##########
flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java:
##########
@@ -22,60 +22,75 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
 
 /** Tests for hybrid shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
 class HybridShuffleITCase extends BatchShuffleITCaseBase {
 
-    @Test
+    @Parameter public boolean enableNewHybridMode;
+
+    @Parameters(name = "enableNewHybridMode={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @TestTemplate
     void testHybridFullExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridFullExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
+
+    private Configuration configureHybridOptions(Configuration configuration, 
boolean isSelective) {
+        BatchShuffleMode shuffleMode =
+                isSelective
+                        ? BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE
+                        : BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+        configuration.set(
+                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE,
+                enableNewHybridMode);
+
+        if (enableNewHybridMode && isSelective) {
+            // Note that the memory tier of the new mode need more buffers for 
the selective mode
+            
configuration.setString(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), "256m");

Review Comment:
   How is `256MB` calculated? In the CI environment, memory should be 
relatively tight. 🤔 



##########
flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java:
##########
@@ -22,60 +22,75 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
 
 /** Tests for hybrid shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
 class HybridShuffleITCase extends BatchShuffleITCaseBase {
 
-    @Test
+    @Parameter public boolean enableNewHybridMode;
+
+    @Parameters(name = "enableNewHybridMode={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @TestTemplate
     void testHybridFullExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridFullExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
+
+    private Configuration configureHybridOptions(Configuration configuration, 
boolean isSelective) {
+        BatchShuffleMode shuffleMode =
+                isSelective
+                        ? BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE
+                        : BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+        configuration.set(
+                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE,
+                enableNewHybridMode);
+
+        if (enableNewHybridMode && isSelective) {
+            // Note that the memory tier of the new mode need more buffers for 
the selective mode
+            
configuration.setString(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), "256m");

Review Comment:
   How is `256MB` calculated? In the CI environment, memory should be 
relatively tight. 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to