This is an automated email from the ASF dual-hosted git repository. panyuepeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit c05ca114382ba48a3b4d27d268bc753ed6844f8c Author: panyuepeng <[email protected]> AuthorDate: Wed Dec 17 13:13:18 2025 +0800 [FLINK-38801][benchmark] Adjust code lines format and corresponding namings to follow Google AOSP style. --- .../benchmark/BlockingPartitionBenchmark.java | 7 +- .../BlockingPartitionRemoteChannelBenchmark.java | 1 - .../benchmark/CheckpointEnvironmentContext.java | 6 +- .../ContinuousFileReaderOperatorBenchmark.java | 28 ++-- .../flink/benchmark/FlinkEnvironmentContext.java | 5 +- .../HighAvailabilityServiceBenchmark.java | 177 +++++++++++---------- .../apache/flink/benchmark/KeyByBenchmarks.java | 7 +- .../flink/benchmark/MultipleInputBenchmark.java | 3 +- .../flink/benchmark/ProcessingTimerBenchmark.java | 12 +- .../flink/benchmark/RemoteBenchmarkBase.java | 4 +- .../SerializationFrameworkMiniBenchmarks.java | 15 +- .../flink/benchmark/StateBackendBenchmarkBase.java | 7 +- .../benchmark/WatermarkAggregationBenchmark.java | 13 +- .../benchmark/full/PojoSerializationBenchmark.java | 4 +- .../full/SerializationFrameworkAllBenchmarks.java | 6 +- .../full/StringSerializationBenchmark.java | 18 ++- .../benchmark/functions/IntegerLongSource.java | 1 + .../flink/benchmark/functions/LongSourceType.java | 2 +- ...FailureAndRestartAllTasksBenchmarkExecutor.java | 3 +- ...DownstreamTasksInBatchJobBenchmarkExecutor.java | 7 +- ...hMapStateBackendRescalingBenchmarkExecutor.java | 26 ++- .../flink/state/benchmark/ListStateBenchmark.java | 24 +-- .../flink/state/benchmark/MapStateBenchmark.java | 26 +-- .../state/benchmark/RescalingBenchmarkBase.java | 12 +- ...ksdbStateBackendRescalingBenchmarkExecutor.java | 21 ++- .../flink/state/benchmark/StateBenchmarkBase.java | 39 ++--- .../state/benchmark/StateBenchmarkConstants.java | 56 +++---- .../flink/state/benchmark/ValueStateBenchmark.java | 4 +- .../state/benchmark/ttl/TtlListStateBenchmark.java | 18 ++- .../state/benchmark/ttl/TtlMapStateBenchmark.java | 21 +-- .../state/benchmark/ttl/TtlStateBenchmarkBase.java | 2 + .../benchmark/ttl/TtlValueStateBenchmark.java | 10 +- 32 files changed, 316 insertions(+), 269 deletions(-) diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java index 0159de1..db3962d 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java @@ -85,14 +85,13 @@ public class BlockingPartitionBenchmark extends BenchmarkBase { env.setBufferTimeout(-1); } - protected Configuration createConfiguration( - boolean compressionEnabled) { + protected Configuration createConfiguration(boolean compressionEnabled) { Configuration configuration = super.createConfiguration(); configuration.set( NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC, - compressionEnabled ? - NettyShuffleEnvironmentOptions.CompressionCodec.LZ4 + compressionEnabled + ? NettyShuffleEnvironmentOptions.CompressionCodec.LZ4 : NettyShuffleEnvironmentOptions.CompressionCodec.NONE); configuration.set( CoreOptions.TMP_DIRS, diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java index da411bb..9df44e3 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java @@ -20,7 +20,6 @@ package org.apache.flink.benchmark; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; import org.apache.flink.util.FileUtils; diff --git a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java index 0295cdf..d7e5a29 100644 --- a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java +++ b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java @@ -98,8 +98,7 @@ public abstract class CheckpointEnvironmentContext extends FlinkEnvironmentConte TaskManagerOptions.MEMORY_SEGMENT_SIZE, CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE); config.set( - CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, - Duration.ofMillis(0)); + CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(0)); config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false); return config; }), @@ -110,8 +109,7 @@ public abstract class CheckpointEnvironmentContext extends FlinkEnvironmentConte TaskManagerOptions.MEMORY_SEGMENT_SIZE, CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE); config.set( - CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, - Duration.ofMillis(1)); + CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(1)); config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false); return config; }), diff --git a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java index be13691..c6d0ea1 100644 --- a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java @@ -27,9 +27,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import joptsimple.internal.Strings; -import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.runner.Runner; @@ -43,19 +43,19 @@ import java.util.concurrent.TimeoutException; @OperationsPerInvocation(value = ContinuousFileReaderOperatorBenchmark.RECORDS_PER_INVOCATION) public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase { - private static final int SPLITS_PER_INVOCATION = 100; - private static final int LINES_PER_SPLIT = 175_000; - public static final int RECORDS_PER_INVOCATION = SPLITS_PER_INVOCATION * LINES_PER_SPLIT; + private static final int splitsPerInvocation = 100; + private static final int linesPerSplit = 175_000; + public static final int RECORDS_PER_INVOCATION = splitsPerInvocation * linesPerSplit; - private static final TimestampedFileInputSplit SPLIT = + private static final TimestampedFileInputSplit split = new TimestampedFileInputSplit(0, 0, new Path("."), 0, 0, new String[] {}); - private static final String LINE = Strings.repeat('0', 10); + private static final String line = Strings.repeat('0', 10); // Source should wait until all elements reach sink. Otherwise, END_OF_INPUT is sent once all // splits are emitted. // Thus, all subsequent reads in ContinuousFileReaderOperator would be made in CLOSING state in // a simple while-true loop (MailboxExecutor.isIdle is always true). - private static OneShotLatch TARGET_COUNT_REACHED_LATCH = new OneShotLatch(); + private static OneShotLatch targetCountReachedLatch = new OneShotLatch(); public static void main(String[] args) throws RunnerException { Options options = @@ -73,7 +73,7 @@ public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase { @Benchmark public void readFileSplit(FlinkEnvironmentContext context) throws Exception { - TARGET_COUNT_REACHED_LATCH.reset(); + targetCountReachedLatch.reset(); StreamExecutionEnvironment env = context.env; env.enableCheckpointing(100) .setParallelism(1) @@ -93,15 +93,15 @@ public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase { @Override public void run(SourceContext<TimestampedFileInputSplit> ctx) { - while (isRunning && count < SPLITS_PER_INVOCATION) { + while (isRunning && count < splitsPerInvocation) { count++; synchronized (ctx.getCheckpointLock()) { - ctx.collect(SPLIT); + ctx.collect(split); } } while (isRunning) { try { - TARGET_COUNT_REACHED_LATCH.await(100, TimeUnit.MILLISECONDS); + targetCountReachedLatch.await(100, TimeUnit.MILLISECONDS); return; } catch (InterruptedException e) { if (!isRunning) { @@ -124,13 +124,13 @@ public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase { @Override public boolean reachedEnd() { - return count >= ContinuousFileReaderOperatorBenchmark.LINES_PER_SPLIT; + return count >= ContinuousFileReaderOperatorBenchmark.linesPerSplit; } @Override public String nextRecord(String s) { count++; - return LINE; + return line; } @Override @@ -151,7 +151,7 @@ public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase { @Override public void invoke(String value, Context context) { if (++count == RECORDS_PER_INVOCATION) { - TARGET_COUNT_REACHED_LATCH.trigger(); + targetCountReachedLatch.trigger(); } } } diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java index b51de81..c3fa19c 100644 --- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java +++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java @@ -103,11 +103,12 @@ public class FlinkEnvironmentContext { final Configuration configuration = new Configuration(); configuration.set(RestOptions.BIND_PORT, "0"); // no equivalent config available. - //configuration.setInteger( + // configuration.setInteger( // NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS); configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME); configuration.set(DeploymentOptions.ATTACHED, true); - // It doesn't make sense to wait for the final checkpoint in benchmarks since it only prolongs + // It doesn't make sense to wait for the final checkpoint in benchmarks since it only + // prolongs // the test but doesn't give any advantages. configuration.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); // TODO: remove this line after FLINK-28243 will be done diff --git a/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java b/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java index e49c61e..c91f693 100644 --- a/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java @@ -18,7 +18,6 @@ package org.apache.flink.benchmark; -import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -27,6 +26,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.FileUtils; + +import org.apache.curator.test.TestingServer; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; @@ -54,88 +55,94 @@ import static org.openjdk.jmh.annotations.Scope.Thread; */ @OutputTimeUnit(SECONDS) public class HighAvailabilityServiceBenchmark extends BenchmarkBase { - public static void main(String[] args) throws RunnerException { - Options options = - new OptionsBuilder() - .verbosity(VerboseMode.NORMAL) - .include(".*" + HighAvailabilityServiceBenchmark.class.getCanonicalName() + ".*") - .build(); - - new Runner(options).run(); - } - - @Benchmark - public void submitJobThroughput(HighAvailabilityContext context) throws Exception { - context.miniCluster.executeJobBlocking(buildNoOpJob()); - } - - private static JobGraph buildNoOpJob() { - JobGraph jobGraph = new JobGraph(JobID.generate(), UUID.randomUUID().toString()); - jobGraph.addVertex(createNoOpVertex()); - return jobGraph; - } - - private static JobVertex createNoOpVertex() { - JobVertex vertex = new JobVertex("v"); - vertex.setInvokableClass(NoOpInvokable.class); - vertex.setParallelism(1); - vertex.setMaxParallelism(1); - return vertex; - } - - @State(Thread) - public static class HighAvailabilityContext extends FlinkEnvironmentContext { - private TestingServer testingServer; - public final File haDir; - - @Param({"ZOOKEEPER", "NONE"}) - public HighAvailabilityMode highAvailabilityMode; - - public HighAvailabilityContext() { - try { - haDir = Files.createTempDirectory("bench-ha-").toFile(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void setUp() throws Exception { - if (isZookeeperHighAvailability()) { - testingServer = new TestingServer(); - testingServer.start(); - } - - // The method `super.setUp()` will call `createConfiguration()` to get Configuration and - // create a `MiniCluster`. We need to start TestingServer before `createConfiguration()`, - // then we can add zookeeper quorum in the configuration. So we can only start - // `TestingServer` before `super.setUp()`. - super.setUp(); - } - - private boolean isZookeeperHighAvailability() { - return highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER; - } - - @Override - protected Configuration createConfiguration() { - Configuration configuration = super.createConfiguration(); - configuration.set(HighAvailabilityOptions.HA_MODE, highAvailabilityMode.name()); - configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); - if (isZookeeperHighAvailability()) { - configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - } - return configuration; - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - if (isZookeeperHighAvailability()) { - testingServer.stop(); - testingServer.close(); - } - FileUtils.deleteDirectory(haDir); - } - } + public static void main(String[] args) throws RunnerException { + Options options = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include( + ".*" + + HighAvailabilityServiceBenchmark.class.getCanonicalName() + + ".*") + .build(); + + new Runner(options).run(); + } + + @Benchmark + public void submitJobThroughput(HighAvailabilityContext context) throws Exception { + context.miniCluster.executeJobBlocking(buildNoOpJob()); + } + + private static JobGraph buildNoOpJob() { + JobGraph jobGraph = new JobGraph(JobID.generate(), UUID.randomUUID().toString()); + jobGraph.addVertex(createNoOpVertex()); + return jobGraph; + } + + private static JobVertex createNoOpVertex() { + JobVertex vertex = new JobVertex("v"); + vertex.setInvokableClass(NoOpInvokable.class); + vertex.setParallelism(1); + vertex.setMaxParallelism(1); + return vertex; + } + + @State(Thread) + public static class HighAvailabilityContext extends FlinkEnvironmentContext { + private TestingServer testingServer; + public final File haDir; + + @Param({"ZOOKEEPER", "NONE"}) + public HighAvailabilityMode highAvailabilityMode; + + public HighAvailabilityContext() { + try { + haDir = Files.createTempDirectory("bench-ha-").toFile(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void setUp() throws Exception { + if (isZookeeperHighAvailability()) { + testingServer = new TestingServer(); + testingServer.start(); + } + + // The method `super.setUp()` will call `createConfiguration()` to get Configuration and + // create a `MiniCluster`. We need to start TestingServer before + // `createConfiguration()`, + // then we can add zookeeper quorum in the configuration. So we can only start + // `TestingServer` before `super.setUp()`. + super.setUp(); + } + + private boolean isZookeeperHighAvailability() { + return highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER; + } + + @Override + protected Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + configuration.set(HighAvailabilityOptions.HA_MODE, highAvailabilityMode.name()); + configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); + if (isZookeeperHighAvailability()) { + configuration.set( + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, + testingServer.getConnectString()); + } + return configuration; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + if (isZookeeperHighAvailability()) { + testingServer.stop(); + testingServer.close(); + } + FileUtils.deleteDirectory(haDir); + } + } } diff --git a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java index c4cb907..149cacd 100644 --- a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java @@ -23,8 +23,8 @@ import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; - import org.apache.flink.streaming.util.keys.KeySelectorUtil; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.runner.Runner; @@ -68,8 +68,9 @@ public class KeyByBenchmarks extends BenchmarkBase { StreamExecutionEnvironment env = context.env; env.setParallelism(4); - DataStreamSource<int[]> source = env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10)); - source.keyBy(KeySelectorUtil.getSelectorForArray(new int[]{0}, source.getType())) + DataStreamSource<int[]> source = + env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10)); + source.keyBy(KeySelectorUtil.getSelectorForArray(new int[] {0}, source.getType())) .addSink(new DiscardingSink<>()); env.execute(); diff --git a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java index 72ad6c8..95eca61 100644 --- a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java @@ -167,7 +167,8 @@ public class MultipleInputBenchmark extends BenchmarkBase { @Override public SourceReader<Integer, MockSourceSplit> createReader( SourceReaderContext readerContext) { - return new MockSourceReader(MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED, true) { + return new MockSourceReader( + MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED, true) { @Override public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) { if (canFinish.isDone() && !canFinish.isCompletedExceptionally()) { diff --git a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java index 83939b8..21db382 100644 --- a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink; import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; import org.apache.flink.util.Collector; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.runner.Runner; @@ -41,7 +42,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase { private static final int PARALLELISM = 1; - private static OneShotLatch LATCH = new OneShotLatch(); + private static OneShotLatch latch = new OneShotLatch(); public static void main(String[] args) throws RunnerException { Options options = @@ -55,7 +56,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase { @Benchmark public void fireProcessingTimers(FlinkEnvironmentContext context) throws Exception { - LATCH.reset(); + latch.reset(); StreamExecutionEnvironment env = context.env; env.setParallelism(PARALLELISM); @@ -84,7 +85,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase { sourceContext.collect(String.valueOf(random.nextLong())); } - LATCH.await(); + latch.await(); } @Override @@ -111,7 +112,8 @@ public class ProcessingTimerBenchmark extends BenchmarkBase { throws Exception { final long currTimestamp = System.currentTimeMillis(); for (int i = 0; i < timersPerRecord; i++) { - context.timerService().registerProcessingTimeTimer(currTimestamp - timersPerRecord + i); + context.timerService() + .registerProcessingTimeTimer(currTimestamp - timersPerRecord + i); } } @@ -119,7 +121,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase { public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { if (++firedTimesCount == timersPerRecord) { - LATCH.trigger(); + latch.trigger(); } } } diff --git a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java index e6fef61..70715e0 100644 --- a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java +++ b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java @@ -36,7 +36,9 @@ public abstract class RemoteBenchmarkBase extends BenchmarkBase { return 1; } - /** @return the number of vertices the respective job graph contains. */ + /** + * @return the number of vertices the respective job graph contains. + */ abstract int getNumberOfVertices(); } } diff --git a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java index dd4a242..61402c0 100644 --- a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java @@ -68,7 +68,8 @@ public class SerializationFrameworkMiniBenchmarks extends BenchmarkBase { StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + SerializerConfigImpl serializerConfig = + (SerializerConfigImpl) executionConfig.getSerializerConfig(); serializerConfig.registerPojoType(MyPojo.class); serializerConfig.registerPojoType(MyOperation.class); @@ -85,7 +86,8 @@ public class SerializationFrameworkMiniBenchmarks extends BenchmarkBase { StreamExecutionEnvironment env = context.env; env.setParallelism(1); ExecutionConfig executionConfig = env.getConfig(); - SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + SerializerConfigImpl serializerConfig = + (SerializerConfigImpl) executionConfig.getSerializerConfig(); serializerConfig.registerPojoType(MyPojo.class); serializerConfig.registerPojoType(MyOperation.class); @@ -115,7 +117,8 @@ public class SerializationFrameworkMiniBenchmarks extends BenchmarkBase { StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + SerializerConfigImpl serializerConfig = + (SerializerConfigImpl) executionConfig.getSerializerConfig(); serializerConfig.setForceKryo(true); serializerConfig.registerKryoType(MyPojo.class); serializerConfig.registerKryoType(MyOperation.class); @@ -167,9 +170,9 @@ public class SerializationFrameworkMiniBenchmarks extends BenchmarkBase { super.init(); templates = new String[] { - makeString(StringSerializationBenchmark.asciiChars, 1024), - makeString(StringSerializationBenchmark.russianChars, 1024), - makeString(StringSerializationBenchmark.chineseChars, 1024) + makeString(StringSerializationBenchmark.ASCII_CHARS, 1024), + makeString(StringSerializationBenchmark.RUSSIAN_CHARS, 1024), + makeString(StringSerializationBenchmark.CHINESE_CHARS, 1024) }; } diff --git a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java index 00d665e..629ee99 100644 --- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java +++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java @@ -21,11 +21,7 @@ package org.apache.flink.benchmark; import org.apache.flink.benchmark.functions.IntegerLongSource; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.StateBackendOptions; -import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.util.FileUtils; @@ -65,7 +61,6 @@ public class StateBackendBenchmarkBase extends BenchmarkBase { e.printStackTrace(); } - Configuration configuration = Configuration.fromMap(env.getConfiguration().toMap()); String checkpointDataUri = "file://" + checkpointDir.getAbsolutePath(); switch (stateBackend) { @@ -93,7 +88,7 @@ public class StateBackendBenchmarkBase extends BenchmarkBase { } // default character - //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation)); } diff --git a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java index 07d6063..9bdac47 100644 --- a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java @@ -16,15 +16,12 @@ * limitations under the License. */ - package org.apache.flink.benchmark; import org.apache.flink.runtime.source.coordinator.SourceCoordinatorAlignmentBenchmark; import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.TearDown; @@ -34,7 +31,9 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import org.openjdk.jmh.runner.options.VerboseMode; -/** The watermark aggregation benchmark for source coordinator when enabling the watermark alignment. */ +/** + * The watermark aggregation benchmark for source coordinator when enabling the watermark alignment. + */ public class WatermarkAggregationBenchmark extends BenchmarkBase { private static final int NUM_SUBTASKS = 5000; @@ -47,7 +46,10 @@ public class WatermarkAggregationBenchmark extends BenchmarkBase { Options options = new OptionsBuilder() .verbosity(VerboseMode.NORMAL) - .include(".*" + WatermarkAggregationBenchmark.class.getCanonicalName() + ".*") + .include( + ".*" + + WatermarkAggregationBenchmark.class.getCanonicalName() + + ".*") .build(); new Runner(options).run(); @@ -71,5 +73,4 @@ public class WatermarkAggregationBenchmark extends BenchmarkBase { public void teardown() throws Exception { benchmark.teardown(); } - } diff --git a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java index f6a961b..98a34b9 100644 --- a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java @@ -61,7 +61,9 @@ public class PojoSerializationBenchmark extends BenchmarkBase { TypeInformation.of(SerializationFrameworkMiniBenchmarks.MyPojo.class) .createSerializer(config.getSerializerConfig()); TypeSerializer<SerializationFrameworkMiniBenchmarks.MyPojo> kryoSerializer = - new KryoSerializer<>(SerializationFrameworkMiniBenchmarks.MyPojo.class, config.getSerializerConfig()); + new KryoSerializer<>( + SerializationFrameworkMiniBenchmarks.MyPojo.class, + config.getSerializerConfig()); TypeSerializer<org.apache.flink.benchmark.avro.MyPojo> avroSerializer = new AvroSerializer<>(org.apache.flink.benchmark.avro.MyPojo.class); diff --git a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java index dafb4ae..94f7152 100644 --- a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java @@ -127,7 +127,8 @@ public class SerializationFrameworkAllBenchmarks extends SerializationFrameworkM StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + SerializerConfigImpl serializerConfig = + (SerializerConfigImpl) executionConfig.getSerializerConfig(); serializerConfig.setForceKryo(true); serializerConfig.addDefaultKryoSerializer( org.apache.flink.benchmark.thrift.MyPojo.class, TBaseSerializer.class); @@ -147,7 +148,8 @@ public class SerializationFrameworkAllBenchmarks extends SerializationFrameworkM StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + SerializerConfigImpl serializerConfig = + (SerializerConfigImpl) executionConfig.getSerializerConfig(); serializerConfig.setForceKryo(true); serializerConfig.registerTypeWithKryoSerializer( org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class, diff --git a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java index 6cc032e..dd7c3b2 100644 --- a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java @@ -52,20 +52,24 @@ import java.util.concurrent.TimeUnit; @OutputTimeUnit(TimeUnit.MILLISECONDS) public class StringSerializationBenchmark extends BenchmarkBase { - public static final char[] asciiChars = + public static final char[] ASCII_CHARS = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890".toCharArray(); - public static final char[] russianChars = + public static final char[] RUSSIAN_CHARS = "йцукенгшщзхъфывапролджэячсмитьбюЙЦУКЕНГШЩЗХЪФЫВАПРОЛДЖЭЯЧСМИТЬБЮ".toCharArray(); - public static final char[] chineseChars = + public static final char[] CHINESE_CHARS = "的是不了人我在有他这为之大来以个中上们到国说和地也子要时道出而于就下得可你年生".toCharArray(); + @Param({"ascii", "russian", "chinese"}) public String type; + @Param({"4", "128", "16384"}) public String lengthStr; + int length; String input; ExecutionConfig config = new ExecutionConfig(); - TypeSerializer<String> serializer = TypeInformation.of(String.class).createSerializer(config.getSerializerConfig()); + TypeSerializer<String> serializer = + TypeInformation.of(String.class).createSerializer(config.getSerializerConfig()); ByteArrayInputStream serializedBuffer; DataInputView serializedStream; @@ -85,13 +89,13 @@ public class StringSerializationBenchmark extends BenchmarkBase { length = Integer.parseInt(lengthStr); switch (type) { case "ascii": - input = generate(asciiChars, length); + input = generate(ASCII_CHARS, length); break; case "russian": - input = generate(russianChars, length); + input = generate(RUSSIAN_CHARS, length); break; case "chinese": - input = generate(chineseChars, length); + input = generate(CHINESE_CHARS, length); break; default: throw new IllegalArgumentException(type + "charset is not supported"); diff --git a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java index 22ad2f9..6afba8d 100644 --- a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java +++ b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java @@ -24,6 +24,7 @@ public class IntegerLongSource extends RichParallelSourceFunction<IntegerLongSou private volatile boolean running = true; private int numberOfKeys; private long numberOfElements; + public IntegerLongSource(int numberOfKeys, long numberOfElements) { this.numberOfKeys = numberOfKeys; this.numberOfElements = numberOfElements; diff --git a/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java b/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java index d44eafc..f4c45f8 100644 --- a/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java +++ b/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java @@ -54,4 +54,4 @@ public enum LongSourceType { public DataStreamSource<Long> source(StreamExecutionEnvironment environment, long maxValue) { return factory.apply(environment, maxValue); } -}; +} diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java index 105ad5e..2b89b55 100644 --- a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java +++ b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java @@ -32,7 +32,8 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.runner.RunnerException; /** The benchmark of handle global failure and restarting tasks in a STREAMING/BATCH job. */ -public class HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor extends SchedulerBenchmarkExecutorBase { +public class HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor + extends SchedulerBenchmarkExecutorBase { @Param({"BATCH", "STREAMING", "BATCH_EVENLY", "STREAMING_EVENLY"}) private JobConfiguration jobConfiguration; diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java index 182776b..b718747 100644 --- a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java +++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java @@ -35,7 +35,12 @@ import org.openjdk.jmh.runner.RunnerException; public class SchedulingDownstreamTasksInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase { - @Param({"BATCH", "BATCH_HYBRID_DEFAULT", "BATCH_HYBRID_PARTIAL_FINISHED", "BATCH_HYBRID_ALL_FINISHED"}) + @Param({ + "BATCH", + "BATCH_HYBRID_DEFAULT", + "BATCH_HYBRID_PARTIAL_FINISHED", + "BATCH_HYBRID_ALL_FINISHED" + }) private JobConfiguration jobConfiguration; private SchedulingDownstreamTasksInBatchJobBenchmark benchmark; diff --git a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java index 1ec6032..f630cc8 100644 --- a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java +++ b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java @@ -19,13 +19,16 @@ package org.apache.flink.state.benchmark; import org.apache.flink.api.common.JobID; -import org.apache.flink.config.ConfigUtil; -import org.apache.flink.config.StateBenchmarkOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; -import org.openjdk.jmh.annotations.*; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.RunnerException; import java.io.IOException; @@ -48,7 +51,8 @@ public class HashMapStateBackendRescalingBenchmarkExecutor extends RescalingBenc @Setup(Level.Trial) public void setUp() throws Exception { - // FsStateBackend is deprecated in favor of HashMapStateBackend with setting checkpointStorage. + // FsStateBackend is deprecated in favor of HashMapStateBackend with setting + // checkpointStorage. HashMapStateBackend stateBackend = new HashMapStateBackend(); benchmark = new RescalingBenchmarkBuilder<byte[]>() @@ -56,10 +60,16 @@ public class HashMapStateBackendRescalingBenchmarkExecutor extends RescalingBenc .setParallelismBefore(rescaleType.getParallelismBefore()) .setParallelismAfter(rescaleType.getParallelismAfter()) .setCheckpointStorageAccess( - new FileSystemCheckpointStorage(new URI("file://" + prepareDirectory("rescaleDb").getAbsolutePath()), 0) + new FileSystemCheckpointStorage( + new URI( + "file://" + + prepareDirectory("rescaleDb") + .getAbsolutePath()), + 0) .createCheckpointStorage(new JobID())) .setStateBackend(stateBackend) - .setStreamRecordGenerator(new ByteArrayRecordGenerator(numberOfKeys, keyLen)) + .setStreamRecordGenerator( + new ByteArrayRecordGenerator(numberOfKeys, keyLen)) .setStateProcessFunctionSupplier(TestKeyedFunction::new) .build(); benchmark.setUp(); diff --git a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java index c8c6f2d..e350ace 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java @@ -39,14 +39,14 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.LIST_VALUE_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Implementation for list state benchmark testing. */ public class ListStateBenchmark extends StateBenchmarkBase { - private final String STATE_NAME = "listState"; - private final ListStateDescriptor<Long> STATE_DESC = - new ListStateDescriptor<>(STATE_NAME, Long.class); + private final String stateName = "listState"; + private final ListStateDescriptor<Long> stateDesc = + new ListStateDescriptor<>(stateName, Long.class); private ListState<Long> listState; private List<Long> dummyLists; @@ -63,9 +63,9 @@ public class ListStateBenchmark extends StateBenchmarkBase { @Setup public void setUp() throws Exception { keyedStateBackend = createKeyedStateBackend(); - listState = getListState(keyedStateBackend, STATE_DESC); - dummyLists = new ArrayList<>(listValueCount); - for (int i = 0; i < listValueCount; ++i) { + listState = getListState(keyedStateBackend, stateDesc); + dummyLists = new ArrayList<>(LIST_VALUE_COUNT); + for (int i = 0; i < LIST_VALUE_COUNT; ++i) { dummyLists.add(random.nextLong()); } keyIndex = new AtomicInteger(); @@ -73,27 +73,27 @@ public class ListStateBenchmark extends StateBenchmarkBase { @Setup(Level.Iteration) public void setUpPerIteration() throws Exception { - for (int i = 0; i < setupKeyCount; ++i) { + for (int i = 0; i < SETUP_KEY_COUNT; ++i) { keyedStateBackend.setCurrentKey((long) i); listState.add(random.nextLong()); } // make sure only one sst file left, so all get invocation will access this single file, // to prevent the spike caused by different key distribution in multiple sst files, // the more access to the older sst file, the lower throughput will be. - compactState(keyedStateBackend, STATE_DESC); + compactState(keyedStateBackend, stateDesc); } @TearDown(Level.Iteration) public void tearDownPerIteration() throws Exception { applyToAllKeys( keyedStateBackend, - STATE_DESC, + stateDesc, (k, state) -> { keyedStateBackend.setCurrentKey(k); state.clear(); }); // make the clearance effective, trigger compaction for RocksDB, and GC for heap. - if (!compactState(keyedStateBackend, STATE_DESC)) { + if (!compactState(keyedStateBackend, stateDesc)) { System.gc(); } // wait a while for the clearance to take effect. diff --git a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java index 32dce60..c840b5b 100644 --- a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java @@ -37,9 +37,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Implementation for map state benchmark testing. */ public class MapStateBenchmark extends StateBenchmarkBase { @@ -63,14 +63,14 @@ public class MapStateBenchmark extends StateBenchmarkBase { getMapState( keyedStateBackend, new MapStateDescriptor<>("mapState", Long.class, Double.class)); - dummyMaps = new HashMap<>(mapKeyCount); - for (int i = 0; i < mapKeyCount; ++i) { - dummyMaps.put(mapKeys.get(i), random.nextDouble()); + dummyMaps = new HashMap<>(MAP_KEY_COUNT); + for (int i = 0; i < MAP_KEY_COUNT; ++i) { + dummyMaps.put(MAP_KEYS.get(i), random.nextDouble()); } - for (int i = 0; i < setupKeyCount; ++i) { + for (int i = 0; i < SETUP_KEY_COUNT; ++i) { keyedStateBackend.setCurrentKey((long) i); - for (int j = 0; j < mapKeyCount; j++) { - mapState.put(mapKeys.get(j), random.nextDouble()); + for (int j = 0; j < MAP_KEY_COUNT; j++) { + mapState.put(MAP_KEYS.get(j), random.nextDouble()); } } keyIndex = new AtomicInteger(); @@ -107,7 +107,7 @@ public class MapStateBenchmark extends StateBenchmarkBase { } @Benchmark - @OperationsPerInvocation(mapKeyCount) + @OperationsPerInvocation(MAP_KEY_COUNT) public void mapKeys(KeyValue keyValue, Blackhole bh) throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); for (Long key : mapState.keys()) { @@ -116,7 +116,7 @@ public class MapStateBenchmark extends StateBenchmarkBase { } @Benchmark - @OperationsPerInvocation(mapKeyCount) + @OperationsPerInvocation(MAP_KEY_COUNT) public void mapValues(KeyValue keyValue, Blackhole bh) throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); for (Double value : mapState.values()) { @@ -125,7 +125,7 @@ public class MapStateBenchmark extends StateBenchmarkBase { } @Benchmark - @OperationsPerInvocation(mapKeyCount) + @OperationsPerInvocation(MAP_KEY_COUNT) public void mapEntries(KeyValue keyValue, Blackhole bh) throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); Iterable<Map.Entry<Long, Double>> iterable = mapState.entries(); @@ -138,7 +138,7 @@ public class MapStateBenchmark extends StateBenchmarkBase { } @Benchmark - @OperationsPerInvocation(mapKeyCount) + @OperationsPerInvocation(MAP_KEY_COUNT) public void mapIterator(KeyValue keyValue, Blackhole bh) throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator(); diff --git a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java index f5b749d..4b6d68c 100644 --- a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.state.benchmark; import org.apache.flink.api.common.functions.OpenContext; @@ -23,13 +24,10 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.benchmark.BenchmarkBase; -import org.apache.flink.config.ConfigUtil; -import org.apache.flink.config.StateBenchmarkOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.state.benchmark.RescalingBenchmark; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Collector; + import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; @@ -41,8 +39,6 @@ import org.openjdk.jmh.runner.options.VerboseMode; import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.Arrays; import java.util.Iterator; import java.util.Random; @@ -105,9 +101,7 @@ public class RescalingBenchmarkBase extends BenchmarkBase { private final byte[] fatArray; private int count = 0; - - protected ByteArrayRecordGenerator(final int numberOfKeys, - final int keyLen) { + protected ByteArrayRecordGenerator(final int numberOfKeys, final int keyLen) { this.numberOfKeys = numberOfKeys; fatArray = new byte[keyLen]; } diff --git a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java index b552ad7..205fc8e 100644 --- a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java +++ b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java @@ -15,17 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.state.benchmark; import org.apache.flink.api.common.JobID; -import org.apache.flink.config.ConfigUtil; -import org.apache.flink.config.StateBenchmarkOptions; -import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; -import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; -import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.RunnerException; import java.io.IOException; @@ -55,10 +58,14 @@ public class RocksdbStateBackendRescalingBenchmarkExecutor extends RescalingBenc .setParallelismAfter(rescaleType.getParallelismAfter()) .setManagedMemorySize(512 * 1024 * 1024) .setCheckpointStorageAccess( - new FileSystemCheckpointStorage("file://" + prepareDirectory("rescaleDb").getAbsolutePath()) + new FileSystemCheckpointStorage( + "file://" + + prepareDirectory("rescaleDb") + .getAbsolutePath()) .createCheckpointStorage(new JobID())) .setStateBackend(stateBackend) - .setStreamRecordGenerator(new ByteArrayRecordGenerator(numberOfKeys, keyLen)) + .setStreamRecordGenerator( + new ByteArrayRecordGenerator(numberOfKeys, keyLen)) .setStateProcessFunctionSupplier(TestKeyedFunction::new) .build(); benchmark.setUp(); diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java index 8c1f970..c61c82e 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.state.benchmark; import org.apache.flink.benchmark.BenchmarkBase; @@ -22,8 +23,8 @@ import org.apache.flink.config.ConfigUtil; import org.apache.flink.config.StateBenchmarkOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.KeyedStateBackend; - import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; @@ -41,15 +42,15 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.cleanUp; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapValues; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.newKeyCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.newKeys; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.randomValueCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.randomValues; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeys; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_VALUES; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.NEW_KEYS; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.NEW_KEY_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.RANDOM_VALUES; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.RANDOM_VALUE_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEYS; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Base implementation of the state benchmarks. */ public class StateBenchmarkBase extends BenchmarkBase { @@ -66,8 +67,10 @@ public class StateBenchmarkBase extends BenchmarkBase { return createKeyedStateBackend(TtlTimeProvider.DEFAULT); } - protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider) throws Exception { - return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, createStateDataDir()); + protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider) + throws Exception { + return StateBackendBenchmarkUtils.createKeyedStateBackend( + backendType, createStateDataDir()); } public static File createStateDataDir() throws IOException { @@ -108,16 +111,16 @@ public class StateBenchmarkBase extends BenchmarkBase { @Setup(Level.Invocation) public void kvSetup() { int currentIndex = getCurrentIndex(); - setUpKey = setupKeys.get(currentIndex % setupKeyCount); - newKey = newKeys.get(currentIndex % newKeyCount); - mapKey = mapKeys.get(currentIndex % mapKeyCount); - mapValue = mapValues.get(currentIndex % mapKeyCount); - value = randomValues.get(currentIndex % randomValueCount); + setUpKey = SETUP_KEYS.get(currentIndex % SETUP_KEY_COUNT); + newKey = NEW_KEYS.get(currentIndex % NEW_KEY_COUNT); + mapKey = MAP_KEYS.get(currentIndex % MAP_KEY_COUNT); + mapValue = MAP_VALUES.get(currentIndex % MAP_KEY_COUNT); + value = RANDOM_VALUES.get(currentIndex % RANDOM_VALUE_COUNT); // TODO: singletonList is taking 25% of time in mapAdd benchmark... This shouldn't be // initiated if benchmark is not using it and for the benchmarks that are using it, // this should also be probably somehow avoided. listValue = - Collections.singletonList(randomValues.get(currentIndex % randomValueCount)); + Collections.singletonList(RANDOM_VALUES.get(currentIndex % RANDOM_VALUE_COUNT)); } @TearDown(Level.Invocation) diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java index c0a141f..d445a20 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java @@ -28,54 +28,54 @@ import java.util.Random; */ public class StateBenchmarkConstants { // TODO: why all of those static fields? Those should be inside a context class - public static final int mapKeyCount = 10; - public static final int listValueCount = 100; - public static final int setupKeyCount = 500_000; - public static final String rootDirName = "benchmark"; - public static final String recoveryDirName = "localRecovery"; - public static final String dbDirName = "dbPath"; + public static final int MAP_KEY_COUNT = 10; + public static final int LIST_VALUE_COUNT = 100; + public static final int SETUP_KEY_COUNT = 500_000; + public static final String ROOT_DIR_NAME = "benchmark"; + public static final String RECOVERY_DIR_NAME = "localRecovery"; + public static final String DB_PATH = "dbPath"; - public static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount); - public static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount); - public static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount); - public static final int newKeyCount = 500_000; - public static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount); - public static final int randomValueCount = 1_000_000; - public static final ArrayList<Long> randomValues = new ArrayList<>(randomValueCount); + public static final ArrayList<Long> MAP_KEYS = new ArrayList<>(MAP_KEY_COUNT); + public static final ArrayList<Double> MAP_VALUES = new ArrayList<>(MAP_KEY_COUNT); + public static final ArrayList<Long> SETUP_KEYS = new ArrayList<>(SETUP_KEY_COUNT); + public static final int NEW_KEY_COUNT = 500_000; + public static final ArrayList<Long> NEW_KEYS = new ArrayList<>(NEW_KEY_COUNT); + public static final int RANDOM_VALUE_COUNT = 1_000_000; + public static final ArrayList<Long> RANDOM_VALUES = new ArrayList<>(RANDOM_VALUE_COUNT); static { - for (int i = 0; i < mapKeyCount; i++) { - mapKeys.add((long) i); + for (int i = 0; i < MAP_KEY_COUNT; i++) { + MAP_KEYS.add((long) i); } - Collections.shuffle(mapKeys); + Collections.shuffle(MAP_KEYS); } static { Random random = new Random(); - for (int i = 0; i < mapKeyCount; i++) { - mapValues.add(random.nextDouble()); + for (int i = 0; i < MAP_KEY_COUNT; i++) { + MAP_VALUES.add(random.nextDouble()); } - Collections.shuffle(mapValues); + Collections.shuffle(MAP_VALUES); } static { - for (long i = 0; i < setupKeyCount; i++) { - setupKeys.add(i); + for (long i = 0; i < SETUP_KEY_COUNT; i++) { + SETUP_KEYS.add(i); } - Collections.shuffle(setupKeys); + Collections.shuffle(SETUP_KEYS); } static { - for (long i = 0; i < newKeyCount; i++) { - newKeys.add(i + setupKeyCount); + for (long i = 0; i < NEW_KEY_COUNT; i++) { + NEW_KEYS.add(i + SETUP_KEY_COUNT); } - Collections.shuffle(newKeys); + Collections.shuffle(NEW_KEYS); } static { - for (long i = 0; i < randomValueCount; i++) { - randomValues.add(i); + for (long i = 0; i < RANDOM_VALUE_COUNT; i++) { + RANDOM_VALUES.add(i); } - Collections.shuffle(randomValues); + Collections.shuffle(RANDOM_VALUES); } } diff --git a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java index 0be9e14..7c35d50 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java @@ -33,7 +33,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Implementation for listValue state benchmark testing. */ public class ValueStateBenchmark extends StateBenchmarkBase { @@ -54,7 +54,7 @@ public class ValueStateBenchmark extends StateBenchmarkBase { keyedStateBackend = createKeyedStateBackend(); valueState = getValueState(keyedStateBackend, new ValueStateDescriptor<>("kvState", Long.class)); - for (int i = 0; i < setupKeyCount; ++i) { + for (int i = 0; i < SETUP_KEY_COUNT; ++i) { keyedStateBackend.setCurrentKey((long) i); valueState.update(random.nextLong()); } diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java index 829b440..33fdac7 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java @@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.state.benchmark.StateBenchmarkBase; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Setup; @@ -39,12 +40,12 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.LIST_VALUE_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Implementation for list state benchmark testing. */ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { - private final String STATE_NAME = "listState"; + private final String stateName = "listState"; private ListStateDescriptor<Long> stateDesc; private ListState<Long> listState; private List<Long> dummyLists; @@ -62,10 +63,10 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { @Setup public void setUp() throws Exception { keyedStateBackend = createKeyedStateBackend(); - stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); + stateDesc = configTtl(new ListStateDescriptor<>(stateName, Long.class)); listState = getListState(keyedStateBackend, stateDesc); - dummyLists = new ArrayList<>(listValueCount); - for (int i = 0; i < listValueCount; ++i) { + dummyLists = new ArrayList<>(LIST_VALUE_COUNT); + for (int i = 0; i < LIST_VALUE_COUNT; ++i) { dummyLists.add(random.nextLong()); } keyIndex = new AtomicInteger(); @@ -73,7 +74,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { @Setup(Level.Iteration) public void setUpPerIteration() throws Exception { - for (int i = 0; i < setupKeyCount; ++i) { + for (int i = 0; i < SETUP_KEY_COUNT; ++i) { keyedStateBackend.setCurrentKey((long) i); setTtlWhenInitialization(); listState.add(random.nextLong()); @@ -127,7 +128,8 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase { } @Benchmark - public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception { + public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) + throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); Iterable<Long> iterable = listState.get(); for (Long value : iterable) { diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java index 772a103..8e61ffe 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java @@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.state.benchmark.StateBenchmarkBase; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.OperationsPerInvocation; @@ -38,9 +39,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Implementation for map state benchmark testing. */ public class TtlMapStateBenchmark extends TtlStateBenchmarkBase { @@ -64,15 +65,15 @@ public class TtlMapStateBenchmark extends TtlStateBenchmarkBase { getMapState( keyedStateBackend, configTtl(new MapStateDescriptor<>("mapState", Long.class, Double.class))); - dummyMaps = new HashMap<>(mapKeyCount); - for (int i = 0; i < mapKeyCount; ++i) { - dummyMaps.put(mapKeys.get(i), random.nextDouble()); + dummyMaps = new HashMap<>(MAP_KEY_COUNT); + for (int i = 0; i < MAP_KEY_COUNT; ++i) { + dummyMaps.put(MAP_KEYS.get(i), random.nextDouble()); } - for (int i = 0; i < setupKeyCount; ++i) { + for (int i = 0; i < SETUP_KEY_COUNT; ++i) { keyedStateBackend.setCurrentKey((long) i); - for (int j = 0; j < mapKeyCount; j++) { + for (int j = 0; j < MAP_KEY_COUNT; j++) { setTtlWhenInitialization(); - mapState.put(mapKeys.get(j), random.nextDouble()); + mapState.put(MAP_KEYS.get(j), random.nextDouble()); } } keyIndex = new AtomicInteger(); @@ -108,7 +109,7 @@ public class TtlMapStateBenchmark extends TtlStateBenchmarkBase { } @Benchmark - @OperationsPerInvocation(mapKeyCount) + @OperationsPerInvocation(MAP_KEY_COUNT) public void mapIterator(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception { keyedStateBackend.setCurrentKey(keyValue.setUpKey); Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator(); diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java index bfe0017..a5d4fa1 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.state.benchmark.StateBenchmarkBase; + import org.openjdk.jmh.annotations.Param; import java.time.Duration; @@ -42,6 +43,7 @@ public class TtlStateBenchmarkBase extends StateBenchmarkBase { NeverExpired(0); public long advanceTimePerIteration; + ExpiredTimeOptions(int expirePercentPerIteration) { this.advanceTimePerIteration = initialTime * expirePercentPerIteration / 100; } diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java index ee34cfb..12ed5c2 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java @@ -20,6 +20,7 @@ package org.apache.flink.state.benchmark.ttl; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Setup; @@ -33,7 +34,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState; -import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT; /** Implementation for listValue state benchmark testing. */ public class TtlValueStateBenchmark extends TtlStateBenchmarkBase { @@ -52,8 +53,11 @@ public class TtlValueStateBenchmark extends TtlStateBenchmarkBase { @Setup public void setUp() throws Exception { keyedStateBackend = createKeyedStateBackend(); - valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class))); - for (int i = 0; i < setupKeyCount; ++i) { + valueState = + getValueState( + keyedStateBackend, + configTtl(new ValueStateDescriptor<>("kvState", Long.class))); + for (int i = 0; i < SETUP_KEY_COUNT; ++i) { setTtlWhenInitialization(); keyedStateBackend.setCurrentKey((long) i); valueState.update(random.nextLong());
