This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new 70d9b7b [FLINK-23392] Exclude startup/shutdown time from benchmarks (#22)
70d9b7b is described below
commit 70d9b7b4927fc38ecf0950e55a47325b71e2dd63
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jul 20 10:00:41 2021 +0200
[FLINK-23392] Exclude startup/shutdown time from benchmarks (#22)
Sets up a MiniCluster in the FlinkEnvironmentContext that all benchmark jobs
run against. This keeps MiniCluster startup and shutdown costs out of the
benchmarked code.
---
pom.xml | 7 ++++
.../flink/benchmark/FlinkEnvironmentContext.java | 49 +++++++++++++++++++---
.../UnalignedCheckpointTimeBenchmark.java | 13 +++++-
3 files changed, 62 insertions(+), 7 deletions(-)
diff --git a/pom.xml b/pom.xml
index 33caf48..fbfc080 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,13 @@ under the License.
</dependency>
<dependency>
+ <!-- required for using a pre-defined MiniCluster -->
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index f206141..82201f9 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -20,12 +20,18 @@ package org.apache.flink.benchmark;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
import java.io.IOException;
@@ -37,14 +43,34 @@ public class FlinkEnvironmentContext {
public static final int NUM_NETWORK_BUFFERS = 1000;
public StreamExecutionEnvironment env;
+ public MiniCluster miniCluster;
protected final int parallelism = 1;
protected final boolean objectReuse = true;
@Setup
public void setUp() throws IOException {
+ if (miniCluster != null) {
+ throw new RuntimeException("setUp was called multiple times!");
+ }
+ final Configuration clusterConfig = createConfiguration();
+ miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder()
+ .setNumSlotsPerTaskManager(getNumberOfSlotsPerTaskManager())
+ .setNumTaskManagers(getNumberOfTaskManagers())
+ .setConfiguration(clusterConfig)
+ .build());
+
+ try {
+ miniCluster.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
// set up the execution environment
- env = getStreamExecutionEnvironment();
+ env = new StreamExecutionEnvironment(
+ new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+ clusterConfig,
+ null);
+
env.setParallelism(parallelism);
if (objectReuse) {
env.getConfig().enableObjectReuse();
@@ -53,17 +79,30 @@ public class FlinkEnvironmentContext {
env.setStateBackend(new MemoryStateBackend());
}
+ @TearDown
+ public void tearDown() throws Exception {
+ miniCluster.close();
+ miniCluster = null;
+ }
+
+ protected int getNumberOfTaskManagers() {
+ return 1;
+ }
+
+ protected int getNumberOfSlotsPerTaskManager() {
+ return 4;
+ }
+
public void execute() throws Exception {
env.execute();
}
protected Configuration createConfiguration() {
final Configuration configuration = new Configuration();
+ configuration.setString(RestOptions.BIND_PORT, "0");
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS,
NUM_NETWORK_BUFFERS);
+ configuration.set(DeploymentOptions.TARGET,
MiniClusterPipelineExecutorServiceLoader.NAME);
+ configuration.set(DeploymentOptions.ATTACHED, true);
return configuration;
}
-
- private StreamExecutionEnvironment getStreamExecutionEnvironment() {
- return StreamExecutionEnvironment.createLocalEnvironment(1,
createConfiguration());
- }
}
diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index 58b3e1c..e328b85 100644
--- a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -109,11 +109,20 @@ public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
}
}
+ @Override
+ protected int getNumberOfSlotsPerTaskManager() {
+ return 1;
+ }
+
+ @Override
+ protected int getNumberOfTaskManagers() {
+ // why is this using PARALLELISM when we don't actually use it?
+ return NUM_VERTICES * PARALLELISM;
+ }
+
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
- conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, new
MemorySize(1024 * 4));
- conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
NUM_VERTICES * PARALLELISM);
return conf;
}
}