TanYuxin-tyx commented on code in PR #23436:
URL: https://github.com/apache/flink/pull/23436#discussion_r1329625276


##########
flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java:
##########
@@ -22,60 +22,75 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
 
 /** Tests for hybrid shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
 class HybridShuffleITCase extends BatchShuffleITCaseBase {
 
-    @Test
+    @Parameter public boolean enableNewHybridMode;
+
+    @Parameters(name = "enableNewHybridMode={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @TestTemplate
     void testHybridFullExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchanges() throws Exception {
         final int numRecordsToSend = 10000;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, false, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridFullExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), false);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
-    @Test
+    @TestTemplate
     void testHybridSelectiveExchangesRestart() throws Exception {
         final int numRecordsToSend = 10;
-        Configuration configuration = getConfiguration();
-        configuration.set(
-                ExecutionOptions.BATCH_SHUFFLE_MODE,
-                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        configuration.set(
-                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+        Configuration configuration = 
configureHybridOptions(getConfiguration(), true);
         JobGraph jobGraph = createJobGraph(numRecordsToSend, true, 
configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
+
+    private Configuration configureHybridOptions(Configuration configuration, 
boolean isSelective) {
+        BatchShuffleMode shuffleMode =
+                isSelective
+                        ? BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE
+                        : BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+        configuration.set(
+                
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE,
+                enableNewHybridMode);
+
+        if (enableNewHybridMode && isSelective) {
+            // Note that the memory tier of the new mode need more buffers for 
the selective mode
+            
configuration.setString(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), "256m");

Review Comment:
   The IT case's TM memory is set in `JobGraphRunningUtil#execute` and has a 
size of 1g. I think it is sufficient to run IT Case.
   
   Because the memory tier uses more buffers(at least 100 buffers),  the 
network mem of `256m`(and it is far less than `1g` ) is only to ensure that the 
buffers are enough and ensure the test stability as much as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to