This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 9da39cec93d65b0ef9770080f2823a6d140f1d2c Author: Zakelly <[email protected]> AuthorDate: Sat Dec 16 19:19:55 2023 +0800 [FLINK-30535] Introduce TTL state based benchmarks --- .../flink/state/benchmark/StateBenchmarkBase.java | 20 +-- .../state/benchmark/StateBenchmarkConstants.java | 28 ++-- .../state/benchmark/ttl/TtlListStateBenchmark.java | 150 +++++++++++++++++++++ .../state/benchmark/ttl/TtlMapStateBenchmark.java | 120 +++++++++++++++++ .../state/benchmark/ttl/TtlStateBenchmarkBase.java | 48 +++++++ .../benchmark/ttl/TtlValueStateBenchmark.java | 79 +++++++++++ 6 files changed, 421 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java index bc8bdb9..6b5e89e 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java @@ -53,13 +53,13 @@ import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeys /** Base implementation of the state benchmarks. */ public class StateBenchmarkBase extends BenchmarkBase { // TODO: why AtomicInteger? 
- static AtomicInteger keyIndex; - final ThreadLocalRandom random = ThreadLocalRandom.current(); + protected static AtomicInteger keyIndex; + protected final ThreadLocalRandom random = ThreadLocalRandom.current(); @Param({"HEAP", "ROCKSDB", "ROCKSDB_CHANGELOG"}) - private StateBackendBenchmarkUtils.StateBackendType backendType; + protected StateBackendBenchmarkUtils.StateBackendType backendType; - KeyedStateBackend<Long> keyedStateBackend; + protected KeyedStateBackend<Long> keyedStateBackend; protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception { Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf(); @@ -89,12 +89,12 @@ public class StateBenchmarkBase extends BenchmarkBase { @State(Scope.Thread) public static class KeyValue { - long newKey; - long setUpKey; - long mapKey; - double mapValue; - long value; - List<Long> listValue; + public long newKey; + public long setUpKey; + public long mapKey; + public double mapValue; + public long value; + public List<Long> listValue; @Setup(Level.Invocation) public void kvSetup() { diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java index 1bb9eed..c0a141f 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java @@ -26,22 +26,22 @@ import java.util.Random; * Constants for state benchmark tests. Also generates random keys/values in advance to avoid * possible affect of using {@link Random#nextLong()} */ -class StateBenchmarkConstants { +public class StateBenchmarkConstants { // TODO: why all of those static fields? 
Those should be inside a context class - static final int mapKeyCount = 10; - static final int listValueCount = 100; - static final int setupKeyCount = 500_000; - static final String rootDirName = "benchmark"; - static final String recoveryDirName = "localRecovery"; - static final String dbDirName = "dbPath"; + public static final int mapKeyCount = 10; + public static final int listValueCount = 100; + public static final int setupKeyCount = 500_000; + public static final String rootDirName = "benchmark"; + public static final String recoveryDirName = "localRecovery"; + public static final String dbDirName = "dbPath"; - static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount); - static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount); - static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount); - static final int newKeyCount = 500_000; - static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount); - static final int randomValueCount = 1_000_000; - static final ArrayList<Long> randomValues = new ArrayList<>(randomValueCount); + public static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount); + public static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount); + public static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount); + public static final int newKeyCount = 500_000; + public static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount); + public static final int randomValueCount = 1_000_000; + public static final ArrayList<Long> randomValues = new ArrayList<>(randomValueCount); static { for (int i = 0; i < mapKeyCount; i++) { diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java new file mode 100644 index 0000000..4853c87 --- /dev/null +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java @@ -0,0 +1,150 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState; +import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; + +/** Implementation for list state benchmark testing with TTL enabled. */ +public class TtlListStateBenchmark extends TtlStateBenchmarkBase { + private final String STATE_NAME = "listState"; + /** Created in setUp(): configTtl() reads @Param fields, which JMH injects only after construction. */ + private ListStateDescriptor<Long> stateDesc; + private ListState<Long> listState; + private List<Long> dummyLists; + + public static void main(String[] args) throws RunnerException { + Options opt = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + TtlListStateBenchmark.class.getCanonicalName() + ".*") + .build(); + + new Runner(opt).run(); + } + + @Setup + public void setUp() throws Exception { + keyedStateBackend = createKeyedStateBackend(); + stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); + listState = getListState(keyedStateBackend, stateDesc); + dummyLists = new ArrayList<>(listValueCount); + for (int i = 0; i < listValueCount; ++i) { + dummyLists.add(random.nextLong()); + } + keyIndex = new AtomicInteger(); + } + + @Setup(Level.Iteration) + public void setUpPerIteration() throws Exception { + for (int i = 0; i < setupKeyCount; ++i) { + keyedStateBackend.setCurrentKey((long) i); + listState.add(random.nextLong()); + } + // Make sure only one SST file is left, so that every get invocation accesses that single file. + // This prevents spikes caused by keys being distributed differently across multiple SST files: + // the more accesses hit older SST files, the lower the throughput will be. 
+ if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { + RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend = + (RocksDBKeyedStateBackend<Long>) keyedStateBackend; + compactState(rocksDBKeyedStateBackend, stateDesc); + } + } + + @TearDown(Level.Iteration) + public void tearDownPerIteration() throws Exception { + applyToAllKeys( + keyedStateBackend, + stateDesc, + (k, state) -> { + keyedStateBackend.setCurrentKey(k); + state.clear(); + }); + // Make the clearing effective: trigger compaction for RocksDB, and GC for heap. + if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { + RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend = + (RocksDBKeyedStateBackend<Long>) keyedStateBackend; + compactState(rocksDBKeyedStateBackend, stateDesc); + } else { + System.gc(); + } + // Wait a while for the clearing to take effect. + Thread.sleep(1000); + } + + @Benchmark + public void listUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + listState.update(keyValue.listValue); + } + + @Benchmark + public void listAdd(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.newKey); + listState.update(keyValue.listValue); + } + + @Benchmark + public void listAppend(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + listState.add(keyValue.value); + } + + @Benchmark + public Iterable<Long> listGet(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + return listState.get(); + } + + @Benchmark + public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + Iterable<Long> iterable = listState.get(); + for (Long value : iterable) { + bh.consume(value); + } + } + + @Benchmark + public void listAddAll(StateBenchmarkBase.KeyValue keyValue) 
throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + listState.addAll(dummyLists); + } +} diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java new file mode 100644 index 0000000..ec2ccb1 --- /dev/null +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; + +/** Implementation for map state benchmark testing. 
*/ +public class TtlMapStateBenchmark extends TtlStateBenchmarkBase { + private MapState<Long, Double> mapState; + private Map<Long, Double> dummyMaps; + + public static void main(String[] args) throws RunnerException { + Options opt = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + TtlMapStateBenchmark.class.getCanonicalName() + ".*") + .build(); + + new Runner(opt).run(); + } + + @Setup + public void setUp() throws Exception { + keyedStateBackend = createKeyedStateBackend(); + mapState = + getMapState( + keyedStateBackend, + configTtl(new MapStateDescriptor<>("mapState", Long.class, Double.class))); + dummyMaps = new HashMap<>(mapKeyCount); + for (int i = 0; i < mapKeyCount; ++i) { + dummyMaps.put(mapKeys.get(i), random.nextDouble()); + } + for (int i = 0; i < setupKeyCount; ++i) { + keyedStateBackend.setCurrentKey((long) i); + for (int j = 0; j < mapKeyCount; j++) { + mapState.put(mapKeys.get(j), random.nextDouble()); + } + } + keyIndex = new AtomicInteger(); + } + + @Benchmark + public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + mapState.put(keyValue.mapKey, keyValue.mapValue); + } + + @Benchmark + public void mapAdd(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.newKey); + mapState.put(keyValue.mapKey, keyValue.mapValue); + } + + @Benchmark + public Double mapGet(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + return mapState.get(keyValue.mapKey); + } + + @Benchmark + public boolean mapIsEmpty(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + return mapState.isEmpty(); + } + + @Benchmark + @OperationsPerInvocation(mapKeyCount) + public void mapIterator(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); 
+ Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, Double> entry = iterator.next(); + bh.consume(entry.getKey()); + bh.consume(entry.getValue()); + } + } + + @Benchmark + public void mapPutAll(StateBenchmarkBase.KeyValue keyValue) throws Exception { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + mapState.putAll(dummyMaps); + } +} diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java new file mode 100644 index 0000000..12d62e9 --- /dev/null +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java @@ -0,0 +1,48 @@ +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Param; + +import java.util.concurrent.TimeUnit; + +/** The base class for state benchmarks with TTL enabled. */ +public class TtlStateBenchmarkBase extends StateBenchmarkBase { + + /** The expiration time of the TTL. */ + public enum ExpiredTimeOptions { + + /** 5 seconds. */ + Seconds5(5000), + + /** TTL is enabled, but the state effectively never expires. */ + MaxTime(Long.MAX_VALUE); + + private Time time; + ExpiredTimeOptions(long mills) { + time = Time.of(mills, TimeUnit.MILLISECONDS); + } + } + + @Param({"Seconds5", "MaxTime"}) + protected ExpiredTimeOptions expiredTime; + + @Param({"OnCreateAndWrite", "OnReadAndWrite"}) + protected StateTtlConfig.UpdateType updateType; + + @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"}) + protected StateTtlConfig.StateVisibility stateVisibility; + + /** Configure the state descriptor with ttl. 
*/ + protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) { + StateTtlConfig ttlConfig = + new StateTtlConfig.Builder(expiredTime.time) + .setUpdateType(updateType) + .setStateVisibility(stateVisibility) + .build(); + stateDescriptor.enableTimeToLive(ttlConfig); + return stateDescriptor; + } +} diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java new file mode 100644 index 0000000..7b3ca27 --- /dev/null +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; + +/** Implementation for value state benchmark testing. */ +public class TtlValueStateBenchmark extends TtlStateBenchmarkBase { + private ValueState<Long> valueState; + + public static void main(String[] args) throws RunnerException { + Options opt = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + TtlValueStateBenchmark.class.getCanonicalName() + ".*") + .build(); + + new Runner(opt).run(); + } + + @Setup + public void setUp() throws Exception { + keyedStateBackend = createKeyedStateBackend(); + valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class))); + for (int i = 0; i < setupKeyCount; ++i) { + keyedStateBackend.setCurrentKey((long) i); + valueState.update(random.nextLong()); + } + keyIndex = new AtomicInteger(); + } + + @Benchmark + public void valueUpdate(KeyValue keyValue) throws IOException { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + valueState.update(keyValue.value); + } + + @Benchmark + public void valueAdd(KeyValue keyValue) throws IOException { + keyedStateBackend.setCurrentKey(keyValue.newKey); + valueState.update(keyValue.value); + } + + @Benchmark + public Long valueGet(KeyValue keyValue) throws 
IOException { + keyedStateBackend.setCurrentKey(keyValue.setUpKey); + return valueState.value(); + } +}
