This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e5b35b  [FLINK-25704] Fix the blocking partition benchmark regression caused by FLINK-25637
5e5b35b is described below

commit 5e5b35b28b1ef9299cafbed1e4e9a715f1027f1e
Author: kevin.cyj <[email protected]>
AuthorDate: Wed Jan 26 15:47:16 2022 +0800

    [FLINK-25704] Fix the blocking partition benchmark regression caused by FLINK-25637
    
    FLINK-25637 changed the default blocking shuffle implementation from
    hash-shuffle to sort-shuffle which caused some benchmark regression. This
    patch tries to fix the regression by switching back to hash-shuffle for
    those benchmark tests with regression
    (NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM is the
    config option deciding which blocking shuffle implementation to use. If
    consumer task parallelism is smaller than this config value, the hash-based
    blocking shu [...]
---
 .../java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java    | 3 +++
 .../flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java       | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 7a30525..3bf055c 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -95,6 +95,9 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
                 boolean compressionEnabled, String subpartitionType) {
             Configuration configuration = super.createConfiguration();
 
+            configuration.setInteger(
+                    NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                    Integer.MAX_VALUE);
             configuration.setBoolean(
                     NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
                     compressionEnabled);
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index aa30de7..09fa02b 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -71,6 +71,9 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
         protected Configuration createConfiguration() {
             Configuration configuration = super.createConfiguration();
 
+            configuration.setInteger(
+                    NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                    Integer.MAX_VALUE);
             configuration.setString(
                     NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
             configuration.setString(
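
Note on the threshold used in this patch: the commit message says the hash-based implementation is chosen when the consumer task parallelism is smaller than NETWORK_SORT_SHUFFLE_MIN_PARALLELISM. The following is a minimal standalone sketch of that rule, not Flink's actual code. The class, enum, and method names are hypothetical, and the assumption that sort-shuffle is used whenever the parallelism is at or above the threshold is inferred from the truncated commit message rather than quoted from it.

```java
// Hypothetical illustration only; none of these names exist in Flink.
public class ShuffleSelectionSketch {

    enum ShuffleType { HASH, SORT }

    // Assumed rule: parallelism below the threshold selects hash-shuffle,
    // anything else selects sort-shuffle.
    static ShuffleType select(int consumerParallelism, int minParallelism) {
        return consumerParallelism < minParallelism ? ShuffleType.HASH : ShuffleType.SORT;
    }

    public static void main(String[] args) {
        // With a low threshold (1 is just an example value), a parallelism of 4 gets sort-shuffle.
        System.out.println(select(4, 1));
        // With the threshold set to Integer.MAX_VALUE, as in this patch, it gets hash-shuffle.
        System.out.println(select(4, Integer.MAX_VALUE));
    }
}
```

Under this reading, setting the option to Integer.MAX_VALUE keeps the affected benchmarks on hash-shuffle for any realistic parallelism.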
