This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cc33ce175c1 [FLINK-33103][network] Hybrid shuffle ITCase supports the
new mode
cc33ce175c1 is described below
commit cc33ce175c1b3953577cd26e2416cb2eef94e176
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Sep 15 12:26:25 2023 +0800
[FLINK-33103][network] Hybrid shuffle ITCase supports the new mode
---
.../flink/test/runtime/HybridShuffleITCase.java | 69 +++++++++++++---------
1 file changed, 42 insertions(+), 27 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index e9de7c5eea7..f5c6b0a609e 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -22,60 +22,75 @@ import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
/** Tests for hybrid shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
class HybridShuffleITCase extends BatchShuffleITCaseBase {
- @Test
+ @Parameter public boolean enableNewHybridMode;
+
+ @Parameters(name = "enableNewHybridMode={0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true);
+ }
+
+ @TestTemplate
void testHybridFullExchanges() throws Exception {
final int numRecordsToSend = 10000;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
- configuration.set(
-
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration =
configureHybridOptions(getConfiguration(), false);
JobGraph jobGraph = createJobGraph(numRecordsToSend, false,
configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
- @Test
+ @TestTemplate
void testHybridSelectiveExchanges() throws Exception {
final int numRecordsToSend = 10000;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE,
- BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
- configuration.set(
-
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration =
configureHybridOptions(getConfiguration(), true);
JobGraph jobGraph = createJobGraph(numRecordsToSend, false,
configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
- @Test
+ @TestTemplate
void testHybridFullExchangesRestart() throws Exception {
final int numRecordsToSend = 10;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
- configuration.set(
-
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration =
configureHybridOptions(getConfiguration(), false);
JobGraph jobGraph = createJobGraph(numRecordsToSend, true,
configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
- @Test
+ @TestTemplate
void testHybridSelectiveExchangesRestart() throws Exception {
final int numRecordsToSend = 10;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE,
- BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
- configuration.set(
-
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration =
configureHybridOptions(getConfiguration(), true);
JobGraph jobGraph = createJobGraph(numRecordsToSend, true,
configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
+
+ private Configuration configureHybridOptions(Configuration configuration,
boolean isSelective) {
+ BatchShuffleMode shuffleMode =
+ isSelective
+ ? BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE
+ : BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+ configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+ configuration.set(
+
NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE,
+ enableNewHybridMode);
+
+ if (enableNewHybridMode && isSelective) {
+ // Note that the memory tier of the new mode need more buffers for
the selective mode
+
configuration.setString(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), "256m");
+ }
+ return configuration;
+ }
}