This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d9b7b  [FLINK-23392] Exclude startup/shutdown time from benchmarks 
(#22)
70d9b7b is described below

commit 70d9b7b4927fc38ecf0950e55a47325b71e2dd63
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jul 20 10:00:41 2021 +0200

    [FLINK-23392] Exclude startup/shutdown time from benchmarks (#22)
    
    Sets up a MiniCluster in the FlinkEnvironmentContext, which all jobs will 
be run against.
    This avoids including MiniCluster setup costs in the benchmarked code.
---
 pom.xml                                            |  7 ++++
 .../flink/benchmark/FlinkEnvironmentContext.java   | 49 +++++++++++++++++++---
 .../UnalignedCheckpointTimeBenchmark.java          | 13 +++++-
 3 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index 33caf48..fbfc080 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,13 @@ under the License.
                </dependency>
 
                <dependency>
+                       <!-- required for using a pre-defined MiniCluster -->
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
diff --git 
a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java 
b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index f206141..82201f9 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -20,12 +20,18 @@ package org.apache.flink.benchmark;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 
 import java.io.IOException;
 
@@ -37,14 +43,34 @@ public class FlinkEnvironmentContext {
     public static final int NUM_NETWORK_BUFFERS = 1000;
 
     public StreamExecutionEnvironment env;
+    public MiniCluster miniCluster;
 
     protected final int parallelism = 1;
     protected final boolean objectReuse = true;
 
     @Setup
     public void setUp() throws IOException {
+        if (miniCluster != null) {
+            throw new RuntimeException("setUp was called multiple times!");
+        }
+        final Configuration clusterConfig = createConfiguration();
+        miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder()
+            .setNumSlotsPerTaskManager(getNumberOfSlotsPerTaskManager())
+            .setNumTaskManagers(getNumberOfTaskManagers())
+            .setConfiguration(clusterConfig)
+            .build());
+
+        try {
+            miniCluster.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
         // set up the execution environment
-        env = getStreamExecutionEnvironment();
+        env = new StreamExecutionEnvironment(
+            new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+            clusterConfig,
+            null);
+
         env.setParallelism(parallelism);
         if (objectReuse) {
             env.getConfig().enableObjectReuse();
@@ -53,17 +79,30 @@ public class FlinkEnvironmentContext {
         env.setStateBackend(new MemoryStateBackend());
     }
 
+    @TearDown
+    public void tearDown() throws Exception {
+        miniCluster.close();
+        miniCluster = null;
+    }
+
+    protected int getNumberOfTaskManagers() {
+        return 1;
+    }
+
+    protected int getNumberOfSlotsPerTaskManager() {
+        return 4;
+    }
+
     public void execute() throws Exception {
         env.execute();
     }
 
     protected Configuration createConfiguration() {
         final Configuration configuration = new Configuration();
+        configuration.setString(RestOptions.BIND_PORT, "0");
         
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
NUM_NETWORK_BUFFERS);
+        configuration.set(DeploymentOptions.TARGET, 
MiniClusterPipelineExecutorServiceLoader.NAME);
+        configuration.set(DeploymentOptions.ATTACHED, true);
         return configuration;
     }
-
-    private StreamExecutionEnvironment getStreamExecutionEnvironment() {
-        return StreamExecutionEnvironment.createLocalEnvironment(1, 
createConfiguration());
-    }
 }
diff --git 
a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index 58b3e1c..e328b85 100644
--- 
a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -109,11 +109,20 @@ public class UnalignedCheckpointTimeBenchmark extends 
BenchmarkBase {
             }
         }
 
+        @Override
+        protected int getNumberOfSlotsPerTaskManager() {
+            return 1;
+        }
+
+        @Override
+        protected int getNumberOfTaskManagers() {
+            // why is this using PARALLELISM when we don't actually use it?
+            return NUM_VERTICES * PARALLELISM;
+        }
+
         protected Configuration createConfiguration() {
             Configuration conf = super.createConfiguration();
-            conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
             conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, new 
MemorySize(1024 * 4));
-            conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_VERTICES * PARALLELISM);
             return conf;
         }
     }

Reply via email to