This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new ddaafee  [FLINK-33122][benchmark] Support null checkpoint data path 
for rescale benchmarks
ddaafee is described below

commit ddaafee5469511f9f9b7a8ad5d77cb0f8a3b4b9c
Author: Zakelly <[email protected]>
AuthorDate: Wed Sep 20 14:58:35 2023 +0800

    [FLINK-33122][benchmark] Support null checkpoint data path for rescale 
benchmarks
---
 ...hMapStateBackendRescalingBenchmarkExecutor.java |  4 +---
 .../state/benchmark/RescalingBenchmarkBase.java    | 26 ++++++++++++++++++++++
 ...ksdbStateBackendRescalingBenchmarkExecutor.java |  4 +---
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
 
b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
index 2c5ff84..1ec6032 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
@@ -50,15 +50,13 @@ public class HashMapStateBackendRescalingBenchmarkExecutor 
extends RescalingBenc
     public void setUp() throws Exception {
         // FsStateBackend is deprecated in favor of HashMapStateBackend with 
setting checkpointStorage.
         HashMapStateBackend stateBackend = new HashMapStateBackend();
-        Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
-        String stateDataDirPath = 
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
         benchmark =
                 new RescalingBenchmarkBuilder<byte[]>()
                         .setMaxParallelism(128)
                         
.setParallelismBefore(rescaleType.getParallelismBefore())
                         .setParallelismAfter(rescaleType.getParallelismAfter())
                         .setCheckpointStorageAccess(
-                                new FileSystemCheckpointStorage(new 
URI("file://" + stateDataDirPath), 0)
+                                new FileSystemCheckpointStorage(new 
URI("file://" + prepareDirectory("rescaleDb").getAbsolutePath()), 0)
                                         .createCheckpointStorage(new JobID()))
                         .setStateBackend(stateBackend)
                         .setStreamRecordGenerator(new 
ByteArrayRecordGenerator(numberOfKeys, keyLen))
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
index 5d05d8b..ef03389 100644
--- a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.benchmark.BenchmarkBase;
+import org.apache.flink.config.ConfigUtil;
+import org.apache.flink.config.StateBenchmarkOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.state.benchmark.RescalingBenchmark;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -36,6 +38,10 @@ import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Random;
@@ -57,6 +63,26 @@ public class RescalingBenchmarkBase extends BenchmarkBase {
         new Runner(options).run();
     }
 
+    protected static File prepareDirectory(String prefix) throws IOException {
+        Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
+        String stateDataDirPath = 
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
+        File dataDir = null;
+        if (stateDataDirPath != null) {
+            dataDir = new File(stateDataDirPath);
+            if (!dataDir.exists()) {
+                Files.createDirectories(Paths.get(stateDataDirPath));
+            }
+        }
+        File target = File.createTempFile(prefix, "", dataDir);
+        if (target.exists() && !target.delete()) {
+            throw new IOException("Target dir {" + target.getAbsolutePath() + 
"} exists but failed to clean it up");
+        } else if (!target.mkdirs()) {
+            throw new IOException("Failed to create target directory: " + 
target.getAbsolutePath());
+        } else {
+            return target;
+        }
+    }
+
     @State(Scope.Thread)
     public enum RescaleType {
         RESCALE_OUT(1, 2, 0),
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
 
b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
index 8cc8234..b552ad7 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
@@ -48,8 +48,6 @@ public class RocksdbStateBackendRescalingBenchmarkExecutor 
extends RescalingBenc
     @Setup(Level.Trial)
     public void setUp() throws Exception {
         EmbeddedRocksDBStateBackend stateBackend = new 
EmbeddedRocksDBStateBackend(true);
-        Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
-        String stateDataDirPath = 
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
         benchmark =
                 new RescalingBenchmarkBuilder<byte[]>()
                         .setMaxParallelism(128)
@@ -57,7 +55,7 @@ public class RocksdbStateBackendRescalingBenchmarkExecutor 
extends RescalingBenc
                         .setParallelismAfter(rescaleType.getParallelismAfter())
                         .setManagedMemorySize(512 * 1024 * 1024)
                         .setCheckpointStorageAccess(
-                                new FileSystemCheckpointStorage("file://" + 
stateDataDirPath)
+                                new FileSystemCheckpointStorage("file://" + 
prepareDirectory("rescaleDb").getAbsolutePath())
                                         .createCheckpointStorage(new JobID()))
                         .setStateBackend(stateBackend)
                         .setStreamRecordGenerator(new 
ByteArrayRecordGenerator(numberOfKeys, keyLen))

Reply via email to