This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch exclude_startup in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 610c0c06795850cdb51659058cd0534782e917ef Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Jul 14 11:39:28 2021 +0200 Exclude startup/shutdown from benchmarks --- pom.xml | 6 +++++ .../benchmark/BlockingPartitionBenchmark.java | 2 +- .../BlockingPartitionRemoteChannelBenchmark.java | 2 +- .../flink/benchmark/FlinkEnvironmentContext.java | 30 +++++++++++++++++----- .../benchmark/MemoryStateBackendBenchmark.java | 2 +- .../benchmark/RocksStateBackendBenchmark.java | 2 +- .../flink/benchmark/StateBackendBenchmarkBase.java | 2 +- .../UnalignedCheckpointTimeBenchmark.java | 2 +- .../benchmark/full/PojoSerializationBenchmark.java | 2 +- .../full/StringSerializationBenchmark.java | 2 +- 10 files changed, 38 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 9c0778d..ee44284 100644 --- a/pom.xml +++ b/pom.xml @@ -191,6 +191,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-examples-batch_${scala.binary.version}</artifactId> <version>${flink.version}</version> <type>test-jar</type> diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java index 598c8aa..bff9f5c 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java @@ -88,7 +88,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase { private final int parallelism = 4; @Setup - public void setUp() throws IOException { + public void setUp() throws Exception { super.setUp(); env.setParallelism(parallelism); diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java index b8b9572..d9e6319 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java @@ -67,7 +67,7 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase public static class BlockingPartitionEnvironmentContext extends FlinkEnvironmentContext { @Setup - public void setUp() throws IOException { + public void setUp() throws Exception { super.setUp(); env.setParallelism(PARALLELISM); diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java index f206141..6224395 100644 --- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java +++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java @@ -20,12 +20,18 @@ package org.apache.flink.benchmark; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import java.io.IOException; @@ -37,14 +43,23 @@ public class FlinkEnvironmentContext { public static final int NUM_NETWORK_BUFFERS = 1000; public StreamExecutionEnvironment env; + public MiniCluster miniCluster; protected final int parallelism = 1; protected final boolean objectReuse = true; @Setup - public void setUp() throws IOException { + public void setUp() throws Exception { + final Configuration clusterConfig = createConfiguration(); + miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(clusterConfig).build()); + miniCluster.start(); + // set up the execution environment - env = getStreamExecutionEnvironment(); + env = new StreamExecutionEnvironment( + new MiniClusterPipelineExecutorServiceLoader(miniCluster), + clusterConfig, + null); + env.setParallelism(parallelism); if (objectReuse) { env.getConfig().enableObjectReuse(); @@ -53,17 +68,20 @@ public class FlinkEnvironmentContext { env.setStateBackend(new MemoryStateBackend()); } + @TearDown + public void tearDown() throws Exception { + miniCluster.close(); + } + public void execute() throws Exception { env.execute(); } protected Configuration createConfiguration() { final Configuration configuration = new Configuration(); + configuration.set(RestOptions.BIND_PORT, "0"); configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS); + configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME); return configuration; } - - private StreamExecutionEnvironment getStreamExecutionEnvironment() { - return StreamExecutionEnvironment.createLocalEnvironment(1, createConfiguration()); - } } diff --git a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java index 09c0095..578b1a8 100644 --- a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java @@ -64,7 +64,7 @@ public class MemoryStateBackendBenchmark extends StateBackendBenchmarkBase { public StateBackend stateBackend = StateBackend.MEMORY; @Setup - public void setUp() throws IOException { + public void setUp() throws Exception { super.setUp(stateBackend, RECORDS_PER_INVOCATION); } diff --git a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java index 4dc4421..84ed6ed 100644 --- a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java @@ -67,7 +67,7 @@ public class RocksStateBackendBenchmark extends StateBackendBenchmarkBase { public StateBackend stateBackend = StateBackend.MEMORY; @Setup - public void setUp() throws IOException { + public void setUp() throws Exception { super.setUp(stateBackend, RECORDS_PER_INVOCATION); } diff --git a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java index cbbbd05..60ba8bf 100644 --- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java +++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java @@ -56,7 +56,7 @@ public class StateBackendBenchmarkBase extends BenchmarkBase { } } - public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws IOException { + public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws Exception { super.setUp(); final AbstractStateBackend backend; diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java index 093ad53..741dc4e 100644 --- a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java @@ -95,7 +95,7 @@ public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase { public String timeout = "0"; @Setup - public void setUp() throws IOException { + public void setUp() throws Exception { super.setUp(); env.setParallelism(parallelism); diff --git a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java index af77931..8951217 100644 --- a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java @@ -63,7 +63,7 @@ public class PojoSerializationBenchmark extends BenchmarkBase { @Setup - public void setup() throws IOException { + public void setUp() throws Exception { pojo = new SerializationFrameworkMiniBenchmarks.MyPojo( 0, "myName", diff --git a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java index ef7ca5d..729bf7b 100644 --- a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java @@ -72,7 +72,7 @@ public class StringSerializationBenchmark extends BenchmarkBase { DataInputView serializedStream; @Setup - public void setup() throws IOException { + public void setUp() throws Exception { length = Integer.parseInt(lengthStr); switch (type) { case "ascii":
