This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new ddaafee [FLINK-33122][benchmark] Support null checkpoint data path
for rescale benchmarks
ddaafee is described below
commit ddaafee5469511f9f9b7a8ad5d77cb0f8a3b4b9c
Author: Zakelly <[email protected]>
AuthorDate: Wed Sep 20 14:58:35 2023 +0800
[FLINK-33122][benchmark] Support null checkpoint data path for rescale
benchmarks
---
...hMapStateBackendRescalingBenchmarkExecutor.java | 4 +---
.../state/benchmark/RescalingBenchmarkBase.java | 26 ++++++++++++++++++++++
...ksdbStateBackendRescalingBenchmarkExecutor.java | 4 +---
3 files changed, 28 insertions(+), 6 deletions(-)
diff --git
a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
index 2c5ff84..1ec6032 100644
---
a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
+++
b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
@@ -50,15 +50,13 @@ public class HashMapStateBackendRescalingBenchmarkExecutor
extends RescalingBenc
public void setUp() throws Exception {
// FsStateBackend is deprecated in favor of HashMapStateBackend with
setting checkpointStorage.
HashMapStateBackend stateBackend = new HashMapStateBackend();
- Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
- String stateDataDirPath =
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
benchmark =
new RescalingBenchmarkBuilder<byte[]>()
.setMaxParallelism(128)
.setParallelismBefore(rescaleType.getParallelismBefore())
.setParallelismAfter(rescaleType.getParallelismAfter())
.setCheckpointStorageAccess(
- new FileSystemCheckpointStorage(new
URI("file://" + stateDataDirPath), 0)
+ new FileSystemCheckpointStorage(new
URI("file://" + prepareDirectory("rescaleDb").getAbsolutePath()), 0)
.createCheckpointStorage(new JobID()))
.setStateBackend(stateBackend)
.setStreamRecordGenerator(new
ByteArrayRecordGenerator(numberOfKeys, keyLen))
diff --git
a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
index 5d05d8b..ef03389 100644
--- a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.benchmark.BenchmarkBase;
+import org.apache.flink.config.ConfigUtil;
+import org.apache.flink.config.StateBenchmarkOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.benchmark.RescalingBenchmark;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -36,6 +38,10 @@ import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
@@ -57,6 +63,26 @@ public class RescalingBenchmarkBase extends BenchmarkBase {
new Runner(options).run();
}
+ /**
+ * Creates a fresh, uniquely named directory for a rescaling benchmark run and returns it.
+ *
+ * <p>The parent directory is taken from {@link StateBenchmarkOptions#STATE_DATA_DIR} in the
+ * benchmark configuration. When that option is set, the configured directory is created
+ * (including missing parents) if it does not exist yet. When it is {@code null}, no parent is
+ * passed to {@link File#createTempFile(String, String, File)}, which then falls back to the
+ * JVM's default temporary-file directory.
+ *
+ * @param prefix name prefix handed to {@code File.createTempFile} for the new directory
+ * @return the newly created directory
+ * @throws IOException if the configured parent cannot be created, the placeholder temp file
+ *     cannot be deleted, or the target directory cannot be created
+ */
+ protected static File prepareDirectory(String prefix) throws IOException {
+ Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
+ String stateDataDirPath =
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
+ // Stays null when no data dir is configured; createTempFile accepts a null parent.
+ File dataDir = null;
+ if (stateDataDirPath != null) {
+ dataDir = new File(stateDataDirPath);
+ if (!dataDir.exists()) {
+ Files.createDirectories(Paths.get(stateDataDirPath));
+ }
+ }
+ // createTempFile is used only to reserve a unique name: the empty file it creates is
+ // deleted right below and a directory with the same path is created in its place.
+ // NOTE(review): delete-then-mkdirs is not a single atomic step; Files.createTempDirectory
+ // looks like a one-call alternative - confirm before changing.
+ File target = File.createTempFile(prefix, "", dataDir);
+ if (target.exists() && !target.delete()) {
+ throw new IOException("Target dir {" + target.getAbsolutePath() +
"} exists but failed to clean it up");
+ } else if (!target.mkdirs()) {
+ throw new IOException("Failed to create target directory: " +
target.getAbsolutePath());
+ } else {
+ return target;
+ }
+ }
+
@State(Scope.Thread)
public enum RescaleType {
RESCALE_OUT(1, 2, 0),
diff --git
a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
index 8cc8234..b552ad7 100644
---
a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
+++
b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
@@ -48,8 +48,6 @@ public class RocksdbStateBackendRescalingBenchmarkExecutor
extends RescalingBenc
@Setup(Level.Trial)
public void setUp() throws Exception {
EmbeddedRocksDBStateBackend stateBackend = new
EmbeddedRocksDBStateBackend(true);
- Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
- String stateDataDirPath =
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
benchmark =
new RescalingBenchmarkBuilder<byte[]>()
.setMaxParallelism(128)
@@ -57,7 +55,7 @@ public class RocksdbStateBackendRescalingBenchmarkExecutor
extends RescalingBenc
.setParallelismAfter(rescaleType.getParallelismAfter())
.setManagedMemorySize(512 * 1024 * 1024)
.setCheckpointStorageAccess(
- new FileSystemCheckpointStorage("file://" +
stateDataDirPath)
+ new FileSystemCheckpointStorage("file://" +
prepareDirectory("rescaleDb").getAbsolutePath())
.createCheckpointStorage(new JobID()))
.setStateBackend(stateBackend)
.setStreamRecordGenerator(new
ByteArrayRecordGenerator(numberOfKeys, keyLen))