This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 9da39cec93d65b0ef9770080f2823a6d140f1d2c
Author: Zakelly <[email protected]>
AuthorDate: Sat Dec 16 19:19:55 2023 +0800

    [FLINK-30535] Introduce TTL state based benchmarks
---
 .../flink/state/benchmark/StateBenchmarkBase.java  |  20 +--
 .../state/benchmark/StateBenchmarkConstants.java   |  28 ++--
 .../state/benchmark/ttl/TtlListStateBenchmark.java | 150 +++++++++++++++++++++
 .../state/benchmark/ttl/TtlMapStateBenchmark.java  | 120 +++++++++++++++++
 .../state/benchmark/ttl/TtlStateBenchmarkBase.java |  48 +++++++
 .../benchmark/ttl/TtlValueStateBenchmark.java      |  79 +++++++++++
 6 files changed, 421 insertions(+), 24 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index bc8bdb9..6b5e89e 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -53,13 +53,13 @@ import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeys
 /** Base implementation of the state benchmarks. */
 public class StateBenchmarkBase extends BenchmarkBase {
     // TODO: why AtomicInteger?
-    static AtomicInteger keyIndex;
-    final ThreadLocalRandom random = ThreadLocalRandom.current();
+    protected static AtomicInteger keyIndex;
+    protected final ThreadLocalRandom random = ThreadLocalRandom.current();
 
     @Param({"HEAP", "ROCKSDB", "ROCKSDB_CHANGELOG"})
-    private StateBackendBenchmarkUtils.StateBackendType backendType;
+    protected StateBackendBenchmarkUtils.StateBackendType backendType;
 
-    KeyedStateBackend<Long> keyedStateBackend;
+    protected KeyedStateBackend<Long> keyedStateBackend;
 
     protected KeyedStateBackend<Long> createKeyedStateBackend() throws 
Exception {
         Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
@@ -89,12 +89,12 @@ public class StateBenchmarkBase extends BenchmarkBase {
 
     @State(Scope.Thread)
     public static class KeyValue {
-        long newKey;
-        long setUpKey;
-        long mapKey;
-        double mapValue;
-        long value;
-        List<Long> listValue;
+        public long newKey;
+        public long setUpKey;
+        public long mapKey;
+        public double mapValue;
+        public long value;
+        public List<Long> listValue;
 
         @Setup(Level.Invocation)
         public void kvSetup() {
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
index 1bb9eed..c0a141f 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
@@ -26,22 +26,22 @@ import java.util.Random;
  * Constants for state benchmark tests. Also generates random keys/values in 
advance to avoid
  * possible affect of using {@link Random#nextLong()}
  */
-class StateBenchmarkConstants {
+public class StateBenchmarkConstants {
     // TODO: why all of those static fields? Those should be inside a context 
class
-    static final int mapKeyCount = 10;
-    static final int listValueCount = 100;
-    static final int setupKeyCount = 500_000;
-    static final String rootDirName = "benchmark";
-    static final String recoveryDirName = "localRecovery";
-    static final String dbDirName = "dbPath";
+    public static final int mapKeyCount = 10;
+    public static final int listValueCount = 100;
+    public static final int setupKeyCount = 500_000;
+    public static final String rootDirName = "benchmark";
+    public static final String recoveryDirName = "localRecovery";
+    public static final String dbDirName = "dbPath";
 
-    static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount);
-    static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount);
-    static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount);
-    static final int newKeyCount = 500_000;
-    static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount);
-    static final int randomValueCount = 1_000_000;
-    static final ArrayList<Long> randomValues = new 
ArrayList<>(randomValueCount);
+    public static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount);
+    public static final ArrayList<Double> mapValues = new 
ArrayList<>(mapKeyCount);
+    public static final ArrayList<Long> setupKeys = new 
ArrayList<>(setupKeyCount);
+    public static final int newKeyCount = 500_000;
+    public static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount);
+    public static final int randomValueCount = 1_000_000;
+    public static final ArrayList<Long> randomValues = new 
ArrayList<>(randomValueCount);
 
     static {
         for (int i = 0; i < mapKeyCount; i++) {
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
new file mode 100644
index 0000000..4853c87
--- /dev/null
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for list state benchmark testing. */
+public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
+    private final String STATE_NAME = "listState";
+    private final ListStateDescriptor<Long> STATE_DESC =
+            configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+    private ListState<Long> listState;
+    private List<Long> dummyLists;
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + 
TtlListStateBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend();
+        listState = getListState(keyedStateBackend, STATE_DESC);
+        dummyLists = new ArrayList<>(listValueCount);
+        for (int i = 0; i < listValueCount; ++i) {
+            dummyLists.add(random.nextLong());
+        }
+        keyIndex = new AtomicInteger();
+    }
+
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        for (int i = 0; i < setupKeyCount; ++i) {
+            keyedStateBackend.setCurrentKey((long) i);
+            listState.add(random.nextLong());
+        }
+        // make sure only one sst file left, so all get invocation will access 
this single file,
+        // to prevent the spike caused by different key distribution in 
multiple sst files,
+        // the more access to the older sst file, the lower throughput will be.
+        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
+            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
+                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
+            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+        }
+    }
+
+    @TearDown(Level.Iteration)
+    public void tearDownPerIteration() throws Exception {
+        applyToAllKeys(
+                keyedStateBackend,
+                STATE_DESC,
+                (k, state) -> {
+                    keyedStateBackend.setCurrentKey(k);
+                    state.clear();
+                });
+        // make the clearance effective, trigger compaction for RocksDB, and 
GC for heap.
+        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
+            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
+                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
+            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+        } else {
+            System.gc();
+        }
+        // wait a while for the clearance to take effect.
+        Thread.sleep(1000);
+    }
+
+    @Benchmark
+    public void listUpdate(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        listState.update(keyValue.listValue);
+    }
+
+    @Benchmark
+    public void listAdd(StateBenchmarkBase.KeyValue keyValue) throws Exception 
{
+        keyedStateBackend.setCurrentKey(keyValue.newKey);
+        listState.update(keyValue.listValue);
+    }
+
+    @Benchmark
+    public void listAppend(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        listState.add(keyValue.value);
+    }
+
+    @Benchmark
+    public Iterable<Long> listGet(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return listState.get();
+    }
+
+    @Benchmark
+    public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, 
Blackhole bh) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        Iterable<Long> iterable = listState.get();
+        for (Long value : iterable) {
+            bh.consume(value);
+        }
+    }
+
+    @Benchmark
+    public void listAddAll(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        listState.addAll(dummyLists);
+    }
+}
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
new file mode 100644
index 0000000..ec2ccb1
--- /dev/null
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for map state benchmark testing. */
+public class TtlMapStateBenchmark extends TtlStateBenchmarkBase {
+    private MapState<Long, Double> mapState;
+    private Map<Long, Double> dummyMaps;
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + 
TtlMapStateBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend();
+        mapState =
+                getMapState(
+                        keyedStateBackend,
+                        configTtl(new MapStateDescriptor<>("mapState", 
Long.class, Double.class)));
+        dummyMaps = new HashMap<>(mapKeyCount);
+        for (int i = 0; i < mapKeyCount; ++i) {
+            dummyMaps.put(mapKeys.get(i), random.nextDouble());
+        }
+        for (int i = 0; i < setupKeyCount; ++i) {
+            keyedStateBackend.setCurrentKey((long) i);
+            for (int j = 0; j < mapKeyCount; j++) {
+                mapState.put(mapKeys.get(j), random.nextDouble());
+            }
+        }
+        keyIndex = new AtomicInteger();
+    }
+
+    @Benchmark
+    public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        mapState.put(keyValue.mapKey, keyValue.mapValue);
+    }
+
+    @Benchmark
+    public void mapAdd(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.newKey);
+        mapState.put(keyValue.mapKey, keyValue.mapValue);
+    }
+
+    @Benchmark
+    public Double mapGet(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return mapState.get(keyValue.mapKey);
+    }
+
+    @Benchmark
+    public boolean mapIsEmpty(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return mapState.isEmpty();
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(mapKeyCount)
+    public void mapIterator(StateBenchmarkBase.KeyValue keyValue, Blackhole 
bh) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, Double> entry = iterator.next();
+            bh.consume(entry.getKey());
+            bh.consume(entry.getValue());
+        }
+    }
+
+    @Benchmark
+    public void mapPutAll(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        mapState.putAll(dummyMaps);
+    }
+}
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
new file mode 100644
index 0000000..12d62e9
--- /dev/null
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
@@ -0,0 +1,48 @@
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Param;
+
+import java.util.concurrent.TimeUnit;
+
+/** The base class for state tests with ttl. */
+public class TtlStateBenchmarkBase extends StateBenchmarkBase {
+
+    /** The expired time of ttl. */
+    public enum ExpiredTimeOptions {
+
+        /** 5 seconds. */
+        Seconds5(5000),
+
+        /** never expired but enable the ttl. */
+        MaxTime(Long.MAX_VALUE);
+
+        private Time time;
+        ExpiredTimeOptions(long mills) {
+            time = Time.of(mills, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Param({"Seconds5", "MaxTime"})
+    protected ExpiredTimeOptions expiredTime;
+
+    @Param({"OnCreateAndWrite", "OnReadAndWrite"})
+    protected StateTtlConfig.UpdateType updateType;
+
+    @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"})
+    protected StateTtlConfig.StateVisibility stateVisibility;
+
+    /** Configure the state descriptor with ttl. */
+    protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) 
{
+        StateTtlConfig ttlConfig =
+                new StateTtlConfig.Builder(expiredTime.time)
+                        .setUpdateType(updateType)
+                        .setStateVisibility(stateVisibility)
+                        .build();
+        stateDescriptor.enableTimeToLive(ttlConfig);
+        return stateDescriptor;
+    }
+}
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
new file mode 100644
index 0000000..7b3ca27
--- /dev/null
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for listValue state benchmark testing. */
+public class TtlValueStateBenchmark extends TtlStateBenchmarkBase {
+    private ValueState<Long> valueState;
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + 
TtlValueStateBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend();
+        valueState = getValueState(keyedStateBackend, configTtl(new 
ValueStateDescriptor<>("kvState", Long.class)));
+        for (int i = 0; i < setupKeyCount; ++i) {
+            keyedStateBackend.setCurrentKey((long) i);
+            valueState.update(random.nextLong());
+        }
+        keyIndex = new AtomicInteger();
+    }
+
+    @Benchmark
+    public void valueUpdate(KeyValue keyValue) throws IOException {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        valueState.update(keyValue.value);
+    }
+
+    @Benchmark
+    public void valueAdd(KeyValue keyValue) throws IOException {
+        keyedStateBackend.setCurrentKey(keyValue.newKey);
+        valueState.update(keyValue.value);
+    }
+
+    @Benchmark
+    public Long valueGet(KeyValue keyValue) throws IOException {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return valueState.value();
+    }
+}

Reply via email to