This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 9fb3482dc772cf19c87bd9e6d4a1709ada8ace65 Author: Zakelly <[email protected]> AuthorDate: Wed Dec 27 19:43:22 2023 +0800 [FLINK-30535] Use customized TtlTimeProvider --- .../flink/state/benchmark/StateBenchmarkBase.java | 5 ++ .../state/benchmark/ttl/TtlListStateBenchmark.java | 14 ++-- .../state/benchmark/ttl/TtlMapStateBenchmark.java | 7 ++ .../state/benchmark/ttl/TtlStateBenchmarkBase.java | 80 +++++++++++++++++++--- .../benchmark/ttl/TtlValueStateBenchmark.java | 8 +++ 5 files changed, 98 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java index 6b5e89e..99e9c48 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java @@ -23,6 +23,7 @@ import org.apache.flink.config.StateBenchmarkOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; @@ -62,6 +63,10 @@ public class StateBenchmarkBase extends BenchmarkBase { protected KeyedStateBackend<Long> keyedStateBackend; protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception { + return createKeyedStateBackend(TtlTimeProvider.DEFAULT); + } + + protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider) throws Exception { Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf(); String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR); File dataDir = null; diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java index 4853c87..1977cf6 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java @@ -46,8 +46,7 @@ import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyC /** Implementation for list state benchmark testing. */ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { private final String STATE_NAME = "listState"; - private final ListStateDescriptor<Long> STATE_DESC = - configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); + private ListStateDescriptor<Long> stateDesc; private ListState<Long> listState; private List<Long> dummyLists; @@ -64,7 +63,8 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { @Setup public void setUp() throws Exception { keyedStateBackend = createKeyedStateBackend(); - listState = getListState(keyedStateBackend, STATE_DESC); + stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); + listState = getListState(keyedStateBackend, stateDesc); dummyLists = new ArrayList<>(listValueCount); for (int i = 0; i < listValueCount; ++i) { dummyLists.add(random.nextLong()); @@ -76,6 +76,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { public void setUpPerIteration() throws Exception { for (int i = 0; i < setupKeyCount; ++i) { keyedStateBackend.setCurrentKey((long) i); + setTtlWhenInitialization(); listState.add(random.nextLong()); } // make sure only one sst file left, so all get invocation will access this single file, @@ -84,15 +85,16 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend = (RocksDBKeyedStateBackend<Long>) keyedStateBackend; - compactState(rocksDBKeyedStateBackend, STATE_DESC); + compactState(rocksDBKeyedStateBackend, stateDesc); } + advanceTimePerIteration(); } @TearDown(Level.Iteration) public void tearDownPerIteration() throws Exception { applyToAllKeys( keyedStateBackend, - STATE_DESC, + stateDesc, (k, state) -> { keyedStateBackend.setCurrentKey(k); state.clear(); @@ -101,7 +103,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend = (RocksDBKeyedStateBackend<Long>) keyedStateBackend; - compactState(rocksDBKeyedStateBackend, STATE_DESC); + compactState(rocksDBKeyedStateBackend, stateDesc); } else { System.gc(); } diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java index ec2ccb1..772a103 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.state.benchmark.StateBenchmarkBase; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.infra.Blackhole; @@ -70,12 +71,18 @@ public class TtlMapStateBenchmark extends TtlStateBenchmarkBase { for (int i = 0; i < setupKeyCount; ++i) { keyedStateBackend.setCurrentKey((long) i); for (int j = 0; j < mapKeyCount; j++) { + setTtlWhenInitialization(); mapState.put(mapKeys.get(j), random.nextDouble()); } } keyIndex = new AtomicInteger(); } + @Setup(Level.Iteration) + public void setUpPerIteration() throws Exception { + advanceTimePerIteration(); + } + @Benchmark public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java index 12d62e9..45d28d2 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java @@ -1,8 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.state.benchmark.ttl; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.state.benchmark.StateBenchmarkBase; import org.openjdk.jmh.annotations.Param; @@ -11,38 +31,78 @@ import java.util.concurrent.TimeUnit; /** The base class for state tests with ttl. */ public class TtlStateBenchmarkBase extends StateBenchmarkBase { + private static final long initialTime = 1000000; + /** The expired time of ttl. */ public enum ExpiredTimeOptions { - /** 5 seconds. */ - Seconds5(5000), + /** Expire 3 percent of the initial keys per iteration. */ + Expire3PercentPerIteration(3), /** never expired but enable the ttl. */ - MaxTime(Long.MAX_VALUE); + NeverExpired(0); - private Time time; - ExpiredTimeOptions(long mills) { - time = Time.of(mills, TimeUnit.MILLISECONDS); + public long advanceTimePerIteration; + ExpiredTimeOptions(int expirePercentPerIteration) { + this.advanceTimePerIteration = initialTime * expirePercentPerIteration / 100; } } - @Param({"Seconds5", "MaxTime"}) - protected ExpiredTimeOptions expiredTime; + @Param({"Expire3PercentPerIteration", "NeverExpired"}) + protected ExpiredTimeOptions expiredOption; @Param({"OnCreateAndWrite", "OnReadAndWrite"}) protected StateTtlConfig.UpdateType updateType; - @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"}) + @Param({"NeverReturnExpired", "ReturnExpiredIfNotCleanedUp"}) protected StateTtlConfig.StateVisibility stateVisibility; + protected ControllableTtlTimeProvider timeProvider; + /** Configure the state descriptor with ttl. */ protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) { StateTtlConfig ttlConfig = - new StateTtlConfig.Builder(expiredTime.time) + new StateTtlConfig.Builder(Time.of(initialTime, TimeUnit.MILLISECONDS)) .setUpdateType(updateType) .setStateVisibility(stateVisibility) .build(); stateDescriptor.enableTimeToLive(ttlConfig); return stateDescriptor; } + + @Override + protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception { + timeProvider = new ControllableTtlTimeProvider(); + return createKeyedStateBackend(timeProvider); + } + + protected void setTtlWhenInitialization() { + timeProvider.setCurrentTimestamp(random.nextLong(initialTime + 1)); + } + + protected void finishInitialization() { + timeProvider.setCurrentTimestamp(initialTime); + } + + protected void advanceTimePerIteration() { + timeProvider.advanceTimestamp(expiredOption.advanceTimePerIteration); + } + + static class ControllableTtlTimeProvider implements TtlTimeProvider { + + long current = 0L; + + @Override + public long currentTimestamp() { + return current; + } + + public void setCurrentTimestamp(long value) { + current = value; + } + + public void advanceTimestamp(long value) { + current += value; + } + } } diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java index 7b3ca27..ee34cfb 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java @@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; @@ -53,10 +54,17 @@ public class TtlValueStateBenchmark extends TtlStateBenchmarkBase { keyedStateBackend = createKeyedStateBackend(); valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class))); for (int i = 0; i < setupKeyCount; ++i) { + setTtlWhenInitialization(); keyedStateBackend.setCurrentKey((long) i); valueState.update(random.nextLong()); } keyIndex = new AtomicInteger(); + finishInitialization(); + } + + @Setup(Level.Iteration) + public void setUpPerIteration() throws Exception { + advanceTimePerIteration(); } @Benchmark
