This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch exclude_startup
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 610c0c06795850cdb51659058cd0534782e917ef
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jul 14 11:39:28 2021 +0200

    Exclude startup/shutdown from benchmarks
---
 pom.xml                                            |  6 +++++
 .../benchmark/BlockingPartitionBenchmark.java      |  2 +-
 .../BlockingPartitionRemoteChannelBenchmark.java   |  2 +-
 .../flink/benchmark/FlinkEnvironmentContext.java   | 30 +++++++++++++++++-----
 .../benchmark/MemoryStateBackendBenchmark.java     |  2 +-
 .../benchmark/RocksStateBackendBenchmark.java      |  2 +-
 .../flink/benchmark/StateBackendBenchmarkBase.java |  2 +-
 .../UnalignedCheckpointTimeBenchmark.java          |  2 +-
 .../benchmark/full/PojoSerializationBenchmark.java |  2 +-
 .../full/StringSerializationBenchmark.java         |  2 +-
 10 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9c0778d..ee44284 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,6 +191,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <type>test-jar</type>
diff --git 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 598c8aa..bff9f5c 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -88,7 +88,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase 
{
                private final int parallelism = 4;
 
                @Setup
-               public void setUp() throws IOException {
+               public void setUp() throws Exception {
                        super.setUp();
 
                        env.setParallelism(parallelism);
diff --git 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index b8b9572..d9e6319 100644
--- 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -67,7 +67,7 @@ public class BlockingPartitionRemoteChannelBenchmark extends 
RemoteBenchmarkBase
     public static class BlockingPartitionEnvironmentContext extends 
FlinkEnvironmentContext {
 
         @Setup
-        public void setUp() throws IOException {
+        public void setUp() throws Exception {
             super.setUp();
 
             env.setParallelism(PARALLELISM);
diff --git 
a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java 
b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index f206141..6224395 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -20,12 +20,18 @@ package org.apache.flink.benchmark;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 
 import java.io.IOException;
 
@@ -37,14 +43,23 @@ public class FlinkEnvironmentContext {
     public static final int NUM_NETWORK_BUFFERS = 1000;
 
     public StreamExecutionEnvironment env;
+    public MiniCluster miniCluster;
 
     protected final int parallelism = 1;
     protected final boolean objectReuse = true;
 
     @Setup
-    public void setUp() throws IOException {
+    public void setUp() throws Exception {
+        final Configuration clusterConfig = createConfiguration();
+        miniCluster = new MiniCluster(new 
MiniClusterConfiguration.Builder().setConfiguration(clusterConfig).build());
+        miniCluster.start();
+
         // set up the execution environment
-        env = getStreamExecutionEnvironment();
+        env = new StreamExecutionEnvironment(
+            new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+            clusterConfig,
+            null);
+
         env.setParallelism(parallelism);
         if (objectReuse) {
             env.getConfig().enableObjectReuse();
@@ -53,17 +68,20 @@ public class FlinkEnvironmentContext {
         env.setStateBackend(new MemoryStateBackend());
     }
 
+    @TearDown
+    public void tearDown() throws Exception {
+        miniCluster.close();
+    }
+
     public void execute() throws Exception {
         env.execute();
     }
 
     protected Configuration createConfiguration() {
         final Configuration configuration = new Configuration();
+        configuration.set(RestOptions.BIND_PORT, "0");
         
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
NUM_NETWORK_BUFFERS);
+        configuration.set(DeploymentOptions.TARGET, 
MiniClusterPipelineExecutorServiceLoader.NAME);
         return configuration;
     }
-
-    private StreamExecutionEnvironment getStreamExecutionEnvironment() {
-        return StreamExecutionEnvironment.createLocalEnvironment(1, 
createConfiguration());
-    }
 }
diff --git 
a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
index 09c0095..578b1a8 100644
--- a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
@@ -64,7 +64,7 @@ public class MemoryStateBackendBenchmark extends 
StateBackendBenchmarkBase {
                public StateBackend stateBackend = StateBackend.MEMORY;
 
                @Setup
-               public void setUp() throws IOException {
+               public void setUp() throws Exception {
                        super.setUp(stateBackend, RECORDS_PER_INVOCATION);
                }
 
diff --git 
a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
index 4dc4421..84ed6ed 100644
--- a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
@@ -67,7 +67,7 @@ public class RocksStateBackendBenchmark extends 
StateBackendBenchmarkBase {
                public StateBackend stateBackend = StateBackend.MEMORY;
 
                @Setup
-               public void setUp() throws IOException {
+               public void setUp() throws Exception {
                        super.setUp(stateBackend, RECORDS_PER_INVOCATION);
                }
 
diff --git 
a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java 
b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
index cbbbd05..60ba8bf 100644
--- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
@@ -56,7 +56,7 @@ public class StateBackendBenchmarkBase extends BenchmarkBase {
                        }
                }
 
-               public void setUp(StateBackend stateBackend, long 
recordsPerInvocation) throws IOException {
+               public void setUp(StateBackend stateBackend, long 
recordsPerInvocation) throws Exception {
                        super.setUp();
 
                        final AbstractStateBackend backend;
diff --git 
a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index 093ad53..741dc4e 100644
--- 
a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -95,7 +95,7 @@ public class UnalignedCheckpointTimeBenchmark extends 
BenchmarkBase {
         public String timeout = "0";
 
         @Setup
-        public void setUp() throws IOException {
+        public void setUp() throws Exception {
             super.setUp();
 
             env.setParallelism(parallelism);
diff --git 
a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
index af77931..8951217 100644
--- 
a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
@@ -63,7 +63,7 @@ public class PojoSerializationBenchmark extends BenchmarkBase 
{
 
 
     @Setup
-    public void setup() throws IOException {
+    public void setUp() throws Exception {
         pojo = new SerializationFrameworkMiniBenchmarks.MyPojo(
                 0,
                 "myName",
diff --git 
a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
index ef7ca5d..729bf7b 100644
--- 
a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
@@ -72,7 +72,7 @@ public class StringSerializationBenchmark extends 
BenchmarkBase {
     DataInputView serializedStream;
 
     @Setup
-    public void setup() throws IOException {
+    public void setUp() throws Exception {
         length = Integer.parseInt(lengthStr);
         switch (type) {
             case "ascii":

Reply via email to