Zakelly commented on code in PR #83:
URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1435779742


##########
src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java:
##########
@@ -3,46 +3,102 @@
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
 import org.openjdk.jmh.annotations.Param;
 
+import java.lang.reflect.Field;
 import java.util.concurrent.TimeUnit;
 
 /** The base class for state tests with ttl. */
 public class TtlStateBenchmarkBase extends StateBenchmarkBase {
 
+    private static final long initialTime = 1000000;
+
     /** The expired time of ttl. */
     public enum ExpiredTimeOptions {
 
-        /** 5 seconds. */
-        Seconds5(5000),
+        /** Expire 3 percent of the initial keys per iteration. */
+        ExpirePercent3PerIteration(3),
 
         /** never expired but enable the ttl. */
-        MaxTime(Long.MAX_VALUE);
+        NeverExpired(0);
 
-        private Time time;
-        ExpiredTimeOptions(long mills) {
-            time = Time.of(mills, TimeUnit.MILLISECONDS);
+        public long advanceTimePerIteration;
+        ExpiredTimeOptions(int expirePercentPerIteration) {
+            this.advanceTimePerIteration = initialTime * 
expirePercentPerIteration / 100;
         }
     }
 
-    @Param({"Seconds5", "MaxTime"})
-    protected ExpiredTimeOptions expiredTime;
+    @Param({"ExpirePercent3PerIteration", "NeverExpired"})
+    protected ExpiredTimeOptions expiredOption;
 
     @Param({"OnCreateAndWrite", "OnReadAndWrite"})
     protected StateTtlConfig.UpdateType updateType;
 
-    @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"})
+    @Param({"NeverReturnExpired"})
     protected StateTtlConfig.StateVisibility stateVisibility;
 
+    protected ControllableTtlTimeProvider timeProvider;
+
     /** Configure the state descriptor with ttl. */
     protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) 
{
         StateTtlConfig ttlConfig =
-                new StateTtlConfig.Builder(expiredTime.time)
+                new StateTtlConfig.Builder(Time.of(initialTime, 
TimeUnit.MILLISECONDS))
                         .setUpdateType(updateType)
                         .setStateVisibility(stateVisibility)
                         .build();
         stateDescriptor.enableTimeToLive(ttlConfig);
         return stateDescriptor;
     }
+
+    protected KeyedStateBackend<Long> createKeyedStateBackend() throws 
Exception {
+        KeyedStateBackend<Long> originalBackend = 
super.createKeyedStateBackend();
+        timeProvider = new ControllableTtlTimeProvider();
+        if (originalBackend instanceof AbstractKeyedStateBackend) {
+            // TODO: Config TtlTimeProvider in proper way.

Review Comment:
   Sure, I make a PR for this https://github.com/apache/flink/pull/23985



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to