This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new 5e5b35b [FLINK-25704] Fix the blocking partition benchmark regression
caused by FLINK-25637
5e5b35b is described below
commit 5e5b35b28b1ef9299cafbed1e4e9a715f1027f1e
Author: kevin.cyj <[email protected]>
AuthorDate: Wed Jan 26 15:47:16 2022 +0800
[FLINK-25704] Fix the blocking partition benchmark regression caused by
FLINK-25637
FLINK-25637 changed the default blocking shuffle implementation from
hash-shuffle to sort-shuffle, which caused regressions in some benchmarks. This
patch tries to fix them by switching the affected benchmarks back to
hash-shuffle
(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM is the
config option deciding which blocking shuffle implementation to use. If
consumer task parallelism is smaller than this config value, the hash-based
blocking shu [...]
---
.../java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java | 3 +++
.../flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java | 3 +++
2 files changed, 6 insertions(+)
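
For illustration only, here is a minimal sketch of the selection rule the commit message refers to. It assumes that sort-shuffle is chosen when the consumer task parallelism is greater than or equal to the threshold, and hash-shuffle otherwise. The class and method names are hypothetical and are not part of Flink; the real decision is made inside Flink's network stack.

```java
// Illustrative sketch only: ShuffleChoiceSketch and usesSortShuffle are
// hypothetical names, not Flink APIs.
class ShuffleChoiceSketch {

    /**
     * Assumed rule: sort-shuffle is used when the consumer parallelism reaches
     * the configured minimum; below it, hash-shuffle is used.
     */
    static boolean usesSortShuffle(int consumerParallelism, int sortShuffleMinParallelism) {
        return consumerParallelism >= sortShuffleMinParallelism;
    }

    public static void main(String[] args) {
        // The patch sets the threshold to Integer.MAX_VALUE, so any realistic
        // parallelism stays below it and hash-shuffle is selected.
        int threshold = Integer.MAX_VALUE;
        for (int parallelism : new int[] {1, 4, 1024}) {
            String type = usesSortShuffle(parallelism, threshold) ? "sort" : "hash";
            System.out.println("parallelism=" + parallelism + " -> " + type + "-shuffle");
        }
    }
}
```

Under this assumption, setting the threshold to Integer.MAX_VALUE effectively pins the benchmarks to hash-shuffle regardless of their parallelism.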
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 7a30525..3bf055c 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -95,6 +95,9 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
boolean compressionEnabled, String subpartitionType) {
Configuration configuration = super.createConfiguration();
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+ Integer.MAX_VALUE);
configuration.setBoolean(
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
compressionEnabled);
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index aa30de7..09fa02b 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -71,6 +71,9 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
protected Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+ Integer.MAX_VALUE);
configuration.setString(
NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
configuration.setString(