This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7725216d80ea1a9c9846b894dcb083ea14e577df Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Oct 26 18:23:48 2023 +0200 [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time --- docs/content/docs/ops/traces.md | 8 ++++++-- .../org/apache/flink/runtime/metrics/MetricNames.java | 1 + .../streaming/state/EmbeddedRocksDBStateBackend.java | 1 + .../state/RocksDBKeyedStateBackendBuilder.java | 6 ++++++ .../contrib/streaming/state/RocksDBStateDownloader.java | 15 ++++++++++++++- .../restore/RocksDBIncrementalRestoreOperation.java | 7 ++++++- .../streaming/state/RocksDBStateDownloaderTest.java | 17 ++++++++++++++--- .../flink/contrib/streaming/state/RocksDBTestUtils.java | 1 + .../state/benchmark/StateBackendBenchmarkUtils.java | 1 + .../org/apache/flink/test/state/BackendSwitchSpecs.java | 1 + 10 files changed, 51 insertions(+), 7 deletions(-) diff --git a/docs/content/docs/ops/traces.md b/docs/content/docs/ops/traces.md index ab11023c794..3fffd7ce430 100644 --- a/docs/content/docs/ops/traces.md +++ b/docs/content/docs/ops/traces.md @@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio </thead> <tbody> <tr> - <th rowspan="14">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th> + <th rowspan="15">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th> <th rowspan="6"><strong>Checkpoint</strong></th> <td>startTs</td> <td>Timestamp when the checkpoint has started.</td> @@ -123,7 +123,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio <td>What was the state of this checkpoint: FAILED or COMPLETED.</td> </tr> <tr> - <th rowspan="8"><strong>JobInitialization</strong></th> + <th rowspan="9"><strong>JobInitialization</strong></th> <td>startTs</td> <td>Timestamp when the job initialization has started.</td> </tr> @@ -155,6 +155,10 @@ Flink reports a single span trace for the whole checkpoint and job initializatio <td>(Max/Sum)GateRestoreDurationMs</td> <td>The aggregated (max and sum) across all subtasks duration of reading unaligned checkpoint's input buffers.</td> </tr> + <tr> + <td>(Max/Sum)DownloadStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td> + <td>The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.</td> + </tr> </tbody> </table> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 7484383497d..b0e5be64592 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -68,6 +68,7 @@ public class MetricNames { public static final String READ_OUTPUT_DATA_DURATION = "ReadOutputDataDurationMs"; public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs"; public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs"; + public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs"; public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index f9801249473..048515b5b44 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -457,6 +457,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke parameters.getTtlTimeProvider(), latencyTrackingStateConfig, parameters.getMetricGroup(), + parameters.getCustomInitializationMetrics(), parameters.getStateHandles(), keyGroupCompressionDecorator, parameters.getCancelStreamRegistry()) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index e7959add211..6d2ca01344d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; @@ -110,6 +111,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken private final File instanceRocksDBPath; private final MetricGroup metricGroup; + private final StateBackend.CustomInitializationMetrics customInitializationMetrics; /** True if incremental checkpointing is enabled. */ private boolean enableIncrementalCheckpointing; @@ -142,6 +144,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, MetricGroup metricGroup, + StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<KeyedStateHandle> stateHandles, StreamCompressionDecorator keyGroupCompressionDecorator, CloseableRegistry cancelStreamRegistry) { @@ -168,6 +171,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.instanceBasePath = instanceBasePath; this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath); this.metricGroup = metricGroup; + this.customInitializationMetrics = customInitializationMetrics; this.enableIncrementalCheckpointing = false; this.nativeMetricOptions = new RocksDBNativeMetricOptions(); this.numberOfTransferingThreads = @@ -212,6 +216,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ttlTimeProvider, latencyTrackingStateConfig, metricGroup, + (key, value) -> {}, stateHandles, keyGroupCompressionDecorator, cancelStreamRegistry); @@ -474,6 +479,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, + customInitializationMetrics, restoreStateHandles, ttlCompactFiltersManager, writeBatchSize, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index 3d0c1830de4..af1694c66e7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -19,11 +19,13 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; @@ -39,10 +41,17 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; + /** Help class for downloading RocksDB state files. */ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { - public RocksDBStateDownloader(int restoringThreadNum) { + + private final CustomInitializationMetrics customInitializationMetrics; + + public RocksDBStateDownloader( + int restoringThreadNum, CustomInitializationMetrics customInitializationMetrics) { super(restoringThreadNum); + this.customInitializationMetrics = customInitializationMetrics; } /** @@ -61,11 +70,15 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { // Make sure we also react to external close signals. closeableRegistry.registerCloseable(internalCloser); try { + long startTimeMs = SystemClock.getInstance().relativeTimeMillis(); List<CompletableFuture<Void>> futures = transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser) .collect(Collectors.toList()); // Wait until either all futures completed successfully or one failed exceptionally. FutureUtils.completeAll(futures).get(); + customInitializationMetrics.addMetric( + DOWNLOAD_STATE_DURATION, + SystemClock.getInstance().relativeTimeMillis() - startTimeMs); } catch (Exception e) { downloadRequests.stream() .map(StateHandleDownloadSpec::getDownloadDestination) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 11d4756ae13..faac6bee9bf 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics; import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; @@ -98,6 +99,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private final int keyGroupPrefixBytes; private final StateSerializerProvider<K> keySerializerProvider; private final ClassLoader userCodeClassLoader; + private final CustomInitializationMetrics customInitializationMetrics; private long lastCompletedCheckpointId; private UUID backendUID; private final long writeBatchSize; @@ -120,6 +122,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, + CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<KeyedStateHandle> restoreStateHandles, @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, @@ -141,6 +144,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.backendUID = UUID.randomUUID(); this.writeBatchSize = writeBatchSize; this.overlapFractionThreshold = overlapFractionThreshold; + this.customInitializationMetrics = customInitializationMetrics; this.restoreStateHandles = restoreStateHandles; this.cancelStreamRegistry = cancelStreamRegistry; this.keyGroupRange = keyGroupRange; @@ -254,7 +258,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private void transferRemoteStateToLocalDirectory( Collection<StateHandleDownloadSpec> downloadRequests) throws Exception { try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader(numberOfTransferringThreads)) { + new RocksDBStateDownloader( + numberOfTransferringThreads, customInitializationMetrics)) { rocksDBStateDownloader.transferAllStateDataToDirectory( downloadRequests, cancelStreamRegistry); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java index dd5026da052..3177f373f6a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java @@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -39,11 +40,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -71,7 +75,8 @@ public class RocksDBStateDownloaderTest extends TestLogger { stateHandles, stateHandle); - try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) { + try (RocksDBStateDownloader rocksDBStateDownloader = + new RocksDBStateDownloader(5, (key, value) -> {})) { rocksDBStateDownloader.transferAllStateDataToDirectory( Collections.singletonList( new StateHandleDownloadSpec( @@ -97,11 +102,16 @@ public class RocksDBStateDownloaderTest extends TestLogger { temporaryFolder.newFolder().toPath(), contents[i], i)); } - try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(4)) { + Set<String> customMetrics = new HashSet<>(); + + try (RocksDBStateDownloader rocksDBStateDownloader = + new RocksDBStateDownloader(4, (key, value) -> customMetrics.add(key))) { rocksDBStateDownloader.transferAllStateDataToDirectory( downloadRequests, new CloseableRegistry()); } + assertThat(customMetrics).containsExactly(MetricNames.DOWNLOAD_STATE_DURATION); + for (int i = 0; i < numRemoteHandles; ++i) { StateHandleDownloadSpec downloadRequest = downloadRequests.get(i); Path dstPath = downloadRequest.getDownloadDestination(); @@ -138,7 +148,8 @@ public class RocksDBStateDownloaderTest extends TestLogger { "error-handle")); CloseableRegistry closeableRegistry = new CloseableRegistry(); - try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) { + try (RocksDBStateDownloader rocksDBStateDownloader = + new RocksDBStateDownloader(5, (key, value) -> {})) { rocksDBStateDownloader.transferAllStateDataToDirectory( downloadRequests, closeableRegistry); fail("Exception is expected"); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java index 18fc39a9ff7..2fc862664ce 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java @@ -76,6 +76,7 @@ public final class RocksDBTestUtils { TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), + (key, value) -> {}, Collections.emptyList(), UncompressedStreamCompressionDecorator.INSTANCE, new CloseableRegistry()); diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java index 0e5fbe8d022..7d766858483 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java @@ -193,6 +193,7 @@ public class StateBackendBenchmarkUtils { ttlTimeProvider, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), + (key, value) -> {}, Collections.emptyList(), AbstractStateBackend.getCompressionDecorator(executionConfig), new CloseableRegistry()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java index 5b28b51e1e3..b271510a799 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java @@ -112,6 +112,7 @@ public final class BackendSwitchSpecs { TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), + (key, value) -> {}, stateHandles, UncompressedStreamCompressionDecorator.INSTANCE, new CloseableRegistry())
