This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 9fb3482dc772cf19c87bd9e6d4a1709ada8ace65
Author: Zakelly <[email protected]>
AuthorDate: Wed Dec 27 19:43:22 2023 +0800

    [FLINK-30535] Use customized TtlTimeProvider
---
 .../flink/state/benchmark/StateBenchmarkBase.java  |  5 ++
 .../state/benchmark/ttl/TtlListStateBenchmark.java | 14 ++--
 .../state/benchmark/ttl/TtlMapStateBenchmark.java  |  7 ++
 .../state/benchmark/ttl/TtlStateBenchmarkBase.java | 80 +++++++++++++++++++---
 .../benchmark/ttl/TtlValueStateBenchmark.java      |  8 +++
 5 files changed, 98 insertions(+), 16 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index 6b5e89e..99e9c48 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.config.StateBenchmarkOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
@@ -62,6 +63,10 @@ public class StateBenchmarkBase extends BenchmarkBase {
     protected KeyedStateBackend<Long> keyedStateBackend;
 
     protected KeyedStateBackend<Long> createKeyedStateBackend() throws 
Exception {
+        return createKeyedStateBackend(TtlTimeProvider.DEFAULT);
+    }
+
+    protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider 
ttlTimeProvider) throws Exception {
         Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
         String stateDataDirPath = 
benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
         File dataDir = null;
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
index 4853c87..1977cf6 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -46,8 +46,7 @@ import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyC
 /** Implementation for list state benchmark testing. */
 public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
     private final String STATE_NAME = "listState";
-    private final ListStateDescriptor<Long> STATE_DESC =
-            configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+    private ListStateDescriptor<Long> stateDesc;
     private ListState<Long> listState;
     private List<Long> dummyLists;
 
@@ -64,7 +63,8 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
     @Setup
     public void setUp() throws Exception {
         keyedStateBackend = createKeyedStateBackend();
-        listState = getListState(keyedStateBackend, STATE_DESC);
+        stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, 
Long.class));
+        listState = getListState(keyedStateBackend, stateDesc);
         dummyLists = new ArrayList<>(listValueCount);
         for (int i = 0; i < listValueCount; ++i) {
             dummyLists.add(random.nextLong());
@@ -76,6 +76,7 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
     public void setUpPerIteration() throws Exception {
         for (int i = 0; i < setupKeyCount; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
+            setTtlWhenInitialization();
             listState.add(random.nextLong());
         }
         // make sure only one sst file left, so all get invocation will access 
this single file,
@@ -84,15 +85,16 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
         if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
             RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
                     (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+            compactState(rocksDBKeyedStateBackend, stateDesc);
         }
+        advanceTimePerIteration();
     }
 
     @TearDown(Level.Iteration)
     public void tearDownPerIteration() throws Exception {
         applyToAllKeys(
                 keyedStateBackend,
-                STATE_DESC,
+                stateDesc,
                 (k, state) -> {
                     keyedStateBackend.setCurrentKey(k);
                     state.clear();
@@ -101,7 +103,7 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
         if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
             RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
                     (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+            compactState(rocksDBKeyedStateBackend, stateDesc);
         } else {
             System.gc();
         }
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
index ec2ccb1..772a103 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.infra.Blackhole;
@@ -70,12 +71,18 @@ public class TtlMapStateBenchmark extends 
TtlStateBenchmarkBase {
         for (int i = 0; i < setupKeyCount; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
             for (int j = 0; j < mapKeyCount; j++) {
+                setTtlWhenInitialization();
                 mapState.put(mapKeys.get(j), random.nextDouble());
             }
         }
         keyIndex = new AtomicInteger();
     }
 
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        advanceTimePerIteration();
+    }
+
     @Benchmark
     public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws 
Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
index 12d62e9..45d28d2 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.state.benchmark.ttl;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
 import org.openjdk.jmh.annotations.Param;
 
@@ -11,38 +31,78 @@ import java.util.concurrent.TimeUnit;
 /** The base class for state tests with ttl. */
 public class TtlStateBenchmarkBase extends StateBenchmarkBase {
 
+    private static final long initialTime = 1000000;
+
     /** The expired time of ttl. */
     public enum ExpiredTimeOptions {
 
-        /** 5 seconds. */
-        Seconds5(5000),
+        /** Expire 3 percent of the initial keys per iteration. */
+        Expire3PercentPerIteration(3),
 
         /** never expired but enable the ttl. */
-        MaxTime(Long.MAX_VALUE);
+        NeverExpired(0);
 
-        private Time time;
-        ExpiredTimeOptions(long mills) {
-            time = Time.of(mills, TimeUnit.MILLISECONDS);
+        public long advanceTimePerIteration;
+        ExpiredTimeOptions(int expirePercentPerIteration) {
+            this.advanceTimePerIteration = initialTime * 
expirePercentPerIteration / 100;
         }
     }
 
-    @Param({"Seconds5", "MaxTime"})
-    protected ExpiredTimeOptions expiredTime;
+    @Param({"Expire3PercentPerIteration", "NeverExpired"})
+    protected ExpiredTimeOptions expiredOption;
 
     @Param({"OnCreateAndWrite", "OnReadAndWrite"})
     protected StateTtlConfig.UpdateType updateType;
 
-    @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"})
+    @Param({"NeverReturnExpired", "ReturnExpiredIfNotCleanedUp"})
     protected StateTtlConfig.StateVisibility stateVisibility;
 
+    protected ControllableTtlTimeProvider timeProvider;
+
     /** Configure the state descriptor with ttl. */
     protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) 
{
         StateTtlConfig ttlConfig =
-                new StateTtlConfig.Builder(expiredTime.time)
+                new StateTtlConfig.Builder(Time.of(initialTime, 
TimeUnit.MILLISECONDS))
                         .setUpdateType(updateType)
                         .setStateVisibility(stateVisibility)
                         .build();
         stateDescriptor.enableTimeToLive(ttlConfig);
         return stateDescriptor;
     }
+
+    @Override
+    protected KeyedStateBackend<Long> createKeyedStateBackend() throws 
Exception {
+        timeProvider = new ControllableTtlTimeProvider();
+        return createKeyedStateBackend(timeProvider);
+    }
+
+    protected void setTtlWhenInitialization() {
+        timeProvider.setCurrentTimestamp(random.nextLong(initialTime + 1));
+    }
+
+    protected void finishInitialization() {
+        timeProvider.setCurrentTimestamp(initialTime);
+    }
+
+    protected void advanceTimePerIteration() {
+        timeProvider.advanceTimestamp(expiredOption.advanceTimePerIteration);
+    }
+
+    static class ControllableTtlTimeProvider implements TtlTimeProvider {
+
+        long current = 0L;
+
+        @Override
+        public long currentTimestamp() {
+            return current;
+        }
+
+        public void setCurrentTimestamp(long value) {
+            current = value;
+        }
+
+        public void advanceTimestamp(long value) {
+            current += value;
+        }
+    }
 }
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
index 7b3ca27..ee34cfb 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
@@ -53,10 +54,17 @@ public class TtlValueStateBenchmark extends 
TtlStateBenchmarkBase {
         keyedStateBackend = createKeyedStateBackend();
         valueState = getValueState(keyedStateBackend, configTtl(new 
ValueStateDescriptor<>("kvState", Long.class)));
         for (int i = 0; i < setupKeyCount; ++i) {
+            setTtlWhenInitialization();
             keyedStateBackend.setCurrentKey((long) i);
             valueState.update(random.nextLong());
         }
         keyIndex = new AtomicInteger();
+        finishInitialization();
+    }
+
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        advanceTimePerIteration();
     }
 
     @Benchmark

Reply via email to