This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new ffcdbb4  [FLINK-25959] Add micro-benchmarks for the sort-based 
blocking shuffle
ffcdbb4 is described below

commit ffcdbb45c88e9453bd815229eebaec1e776722cb
Author: kevin.cyj <[email protected]>
AuthorDate: Thu Feb 10 15:28:04 2022 +0800

    [FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle
---
 .../benchmark/BlockingPartitionBenchmark.java      | 45 ++++++++++++++++++----
 .../BlockingPartitionRemoteChannelBenchmark.java   | 38 +++++++++++++++---
 2 files changed, 70 insertions(+), 13 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 3bf055c..bcef395 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -66,6 +66,16 @@ public class BlockingPartitionBenchmark extends 
BenchmarkBase {
         executeBenchmark(context.env);
     }
 
+    @Benchmark
+    public void compressedSortPartition(CompressedSortEnvironmentContext 
context) throws Exception {
+        executeBenchmark(context.env);
+    }
+
+    @Benchmark
+    public void uncompressedSortPartition(UncompressedSortEnvironmentContext 
context) throws Exception {
+        executeBenchmark(context.env);
+    }
+
     private void executeBenchmark(StreamExecutionEnvironment env) throws 
Exception {
         StreamGraph streamGraph =
                 StreamGraphUtils.buildGraphForBatchJob(env, 
RECORDS_PER_INVOCATION);
@@ -92,12 +102,17 @@ public class BlockingPartitionBenchmark extends 
BenchmarkBase {
         }
 
         protected Configuration createConfiguration(
-                boolean compressionEnabled, String subpartitionType) {
+                boolean compressionEnabled, String subpartitionType, boolean 
isSortShuffle) {
             Configuration configuration = super.createConfiguration();
 
-            configuration.setInteger(
-                    
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
-                    Integer.MAX_VALUE);
+            if (isSortShuffle) {
+                configuration.setInteger(
+                        
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+            } else {
+                configuration.setInteger(
+                        
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                        Integer.MAX_VALUE);
+            }
             configuration.setBoolean(
                     
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
                     compressionEnabled);
@@ -114,7 +129,7 @@ public class BlockingPartitionBenchmark extends 
BenchmarkBase {
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(false, "file");
+            return createConfiguration(false, "file", false);
         }
     }
 
@@ -122,7 +137,7 @@ public class BlockingPartitionBenchmark extends 
BenchmarkBase {
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(true, "file");
+            return createConfiguration(true, "file", false);
         }
     }
 
@@ -130,7 +145,23 @@ public class BlockingPartitionBenchmark extends 
BenchmarkBase {
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(false, "mmap");
+            return createConfiguration(false, "mmap", false);
+        }
+    }
+
+    public static class CompressedSortEnvironmentContext
+            extends BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(true, "file", true);
+        }
+    }
+
+    public static class UncompressedSortEnvironmentContext
+            extends BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(false, "file", true);
         }
     }
 }
diff --git 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 09fa02b..ee69eb7 100644
--- 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -49,7 +49,15 @@ public class BlockingPartitionRemoteChannelBenchmark extends 
RemoteBenchmarkBase
     }
 
     @Benchmark
-    public void remoteFilePartition(BlockingPartitionEnvironmentContext 
context) throws Exception {
+    public void remoteFilePartition(RemoteFileEnvironmentContext context) 
throws Exception {
+        StreamGraph streamGraph =
+                StreamGraphUtils.buildGraphForBatchJob(context.env, 
RECORDS_PER_INVOCATION);
+        context.miniCluster.executeJobBlocking(
+                StreamingJobGraphGenerator.createJobGraph(streamGraph));
+    }
+
+    @Benchmark
+    public void remoteSortPartition(RemoteSortEnvironmentContext context) 
throws Exception {
         StreamGraph streamGraph =
                 StreamGraphUtils.buildGraphForBatchJob(context.env, 
RECORDS_PER_INVOCATION);
         context.miniCluster.executeJobBlocking(
@@ -67,13 +75,17 @@ public class BlockingPartitionRemoteChannelBenchmark 
extends RemoteBenchmarkBase
             env.setBufferTimeout(-1);
         }
 
-        @Override
-        protected Configuration createConfiguration() {
+        protected Configuration createConfiguration(boolean isSortShuffle) {
             Configuration configuration = super.createConfiguration();
 
-            configuration.setInteger(
-                    
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
-                    Integer.MAX_VALUE);
+            if (isSortShuffle) {
+                configuration.setInteger(
+                        
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+            } else {
+                configuration.setInteger(
+                        
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                        Integer.MAX_VALUE);
+            }
             configuration.setString(
                     
NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
             configuration.setString(
@@ -87,4 +99,18 @@ public class BlockingPartitionRemoteChannelBenchmark extends 
RemoteBenchmarkBase
             return NUM_VERTICES;
         }
     }
+
+    public static class RemoteFileEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(false);
+        }
+    }
+
+    public static class RemoteSortEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(true);
+        }
+    }
 }

Reply via email to