This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8c2f9f8f6824137d14715be9ff3c714d565da86a Author: Zakelly <[email protected]> AuthorDate: Sun Dec 24 15:13:05 2023 +0800 [FLINK-30535] Customize TtlTimeProvider in state benchmarks --- .../benchmark/StateBackendBenchmarkUtils.java | 37 ++++++++++++++-------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java index 09c15a7eb14..63199d9b4ca 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java @@ -83,33 +83,42 @@ public class StateBackendBenchmarkUtils { private static File rootDir; public static KeyedStateBackend<Long> createKeyedStateBackend( - StateBackendType backendType, File baseDir) throws IOException { + StateBackendType backendType, File baseDir, TtlTimeProvider ttlTimeProvider) + throws IOException { switch (backendType) { case HEAP: rootDir = prepareDirectory(rootDirName, baseDir); - return createHeapKeyedStateBackend(rootDir); + return createHeapKeyedStateBackend(rootDir, ttlTimeProvider); case ROCKSDB: rootDir = prepareDirectory(rootDirName, baseDir); - return createRocksDBKeyedStateBackend(rootDir); + return createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider); case HEAP_CHANGELOG: rootDir = prepareDirectory(rootDirName, baseDir); - return createChangelogKeyedStateBackend(createHeapKeyedStateBackend(rootDir)); + return createChangelogKeyedStateBackend( + createHeapKeyedStateBackend(rootDir, ttlTimeProvider)); case ROCKSDB_CHANGELOG: rootDir = prepareDirectory(rootDirName, baseDir); - return createChangelogKeyedStateBackend(createRocksDBKeyedStateBackend(rootDir)); + return createChangelogKeyedStateBackend( + createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider)); case BATCH_EXECUTION: - return createBatchExecutionStateBackend(); + return createBatchExecutionStateBackend(ttlTimeProvider); default: throw new IllegalArgumentException("Unknown backend type: " + backendType); } } + public static KeyedStateBackend<Long> createKeyedStateBackend( + StateBackendType backendType, File baseDir) throws IOException { + return createKeyedStateBackend(backendType, baseDir, TtlTimeProvider.DEFAULT); + } + public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType backendType) throws IOException { return createKeyedStateBackend(backendType, null); } - private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend() { + private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend( + TtlTimeProvider ttlTimeProvider) { return new BatchExecutionStateBackend() .createKeyedStateBackend( MockEnvironment.builder().build(), @@ -119,7 +128,7 @@ public class StateBackendBenchmarkUtils { 2, new KeyGroupRange(0, 1), null, - TtlTimeProvider.DEFAULT, + ttlTimeProvider, new UnregisteredMetricsGroup(), Collections.emptyList(), null); @@ -154,8 +163,8 @@ public class StateBackendBenchmarkUtils { new Path(cpPathFile.getPath()), null, new JobID(), 1024, 4096)); } - private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(File rootDir) - throws IOException { + private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend( + File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException { File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir); File dbPathFile = prepareDirectory(dbDirName, rootDir); ExecutionConfig executionConfig = new ExecutionConfig(); @@ -175,7 +184,7 @@ public class StateBackendBenchmarkUtils { new LocalRecoveryConfig(null), RocksDBPriorityQueueConfig.buildWithPriorityQueueType( EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB), - TtlTimeProvider.DEFAULT, + ttlTimeProvider, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), Collections.emptyList(), @@ -189,8 +198,8 @@ public class StateBackendBenchmarkUtils { } } - private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(File rootDir) - throws IOException { + private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend( + File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException { File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir); KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups(); @@ -205,7 +214,7 @@ public class StateBackendBenchmarkUtils { numberOfKeyGroups, keyGroupRange, executionConfig, - TtlTimeProvider.DEFAULT, + ttlTimeProvider, LatencyTrackingStateConfig.disabled(), Collections.emptyList(), AbstractStateBackend.getCompressionDecorator(executionConfig),
