This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7725216d80ea1a9c9846b894dcb083ea14e577df
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Oct 26 18:23:48 2023 +0200

    [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time
---
 docs/content/docs/ops/traces.md                         |  8 ++++++--
 .../org/apache/flink/runtime/metrics/MetricNames.java   |  1 +
 .../streaming/state/EmbeddedRocksDBStateBackend.java    |  1 +
 .../state/RocksDBKeyedStateBackendBuilder.java          |  6 ++++++
 .../contrib/streaming/state/RocksDBStateDownloader.java | 15 ++++++++++++++-
 .../restore/RocksDBIncrementalRestoreOperation.java     |  7 ++++++-
 .../streaming/state/RocksDBStateDownloaderTest.java     | 17 ++++++++++++++---
 .../flink/contrib/streaming/state/RocksDBTestUtils.java |  1 +
 .../state/benchmark/StateBackendBenchmarkUtils.java     |  1 +
 .../org/apache/flink/test/state/BackendSwitchSpecs.java |  1 +
 10 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/docs/content/docs/ops/traces.md b/docs/content/docs/ops/traces.md
index ab11023c794..3fffd7ce430 100644
--- a/docs/content/docs/ops/traces.md
+++ b/docs/content/docs/ops/traces.md
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
   </thead>
   <tbody>
     <tr>
-      <th rowspan="14">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
+      <th rowspan="15">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
       <th rowspan="6"><strong>Checkpoint</strong></th>
       <td>startTs</td>
       <td>Timestamp when the checkpoint has started.</td>
@@ -123,7 +123,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
       <td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
     </tr>
     <tr>
-      <th rowspan="8"><strong>JobInitialization</strong></th>
+      <th rowspan="9"><strong>JobInitialization</strong></th>
       <td>startTs</td>
       <td>Timestamp when the job initialization has started.</td>
     </tr>
@@ -155,6 +155,10 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
       <td>(Max/Sum)GateRestoreDurationMs</td>
       <td>The aggregated (max and sum) across all subtasks duration of reading unaligned checkpoint's input buffers.</td>
     </tr>
+    <tr>
+      <td>(Max/Sum)DownloadStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
+      <td>The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.</td>
+    </tr>
   </tbody>
 </table>
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 7484383497d..b0e5be64592 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -68,6 +68,7 @@ public class MetricNames {
     public static final String READ_OUTPUT_DATA_DURATION = "ReadOutputDataDurationMs";
     public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs";
     public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs";
+    public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs";
 
     public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE;
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index f9801249473..048515b5b44 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -457,6 +457,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                                 parameters.getTtlTimeProvider(),
                                 latencyTrackingStateConfig,
                                 parameters.getMetricGroup(),
+                                parameters.getCustomInitializationMetrics(),
                                 parameters.getStateHandles(),
                                 keyGroupCompressionDecorator,
                                 parameters.getCancelStreamRegistry())
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index e7959add211..6d2ca01344d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
@@ -110,6 +111,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
     private final File instanceRocksDBPath;
 
     private final MetricGroup metricGroup;
+    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
 
     /** True if incremental checkpointing is enabled. */
     private boolean enableIncrementalCheckpointing;
@@ -142,6 +144,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             TtlTimeProvider ttlTimeProvider,
             LatencyTrackingStateConfig latencyTrackingStateConfig,
             MetricGroup metricGroup,
+            StateBackend.CustomInitializationMetrics customInitializationMetrics,
             @Nonnull Collection<KeyedStateHandle> stateHandles,
             StreamCompressionDecorator keyGroupCompressionDecorator,
             CloseableRegistry cancelStreamRegistry) {
@@ -168,6 +171,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         this.instanceBasePath = instanceBasePath;
         this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath);
         this.metricGroup = metricGroup;
+        this.customInitializationMetrics = customInitializationMetrics;
         this.enableIncrementalCheckpointing = false;
         this.nativeMetricOptions = new RocksDBNativeMetricOptions();
         this.numberOfTransferingThreads =
@@ -212,6 +216,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 ttlTimeProvider,
                 latencyTrackingStateConfig,
                 metricGroup,
+                (key, value) -> {},
                 stateHandles,
                 keyGroupCompressionDecorator,
                 cancelStreamRegistry);
@@ -474,6 +479,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
                     metricGroup,
+                    customInitializationMetrics,
                     restoreStateHandles,
                     ttlCompactFiltersManager,
                     writeBatchSize,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index 3d0c1830de4..af1694c66e7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -19,11 +19,13 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -39,10 +41,17 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;
+
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
-    public RocksDBStateDownloader(int restoringThreadNum) {
+
+    private final CustomInitializationMetrics customInitializationMetrics;
+
+    public RocksDBStateDownloader(
+            int restoringThreadNum, CustomInitializationMetrics customInitializationMetrics) {
         super(restoringThreadNum);
+        this.customInitializationMetrics = customInitializationMetrics;
     }
 
     /**
@@ -61,11 +70,15 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
         // Make sure we also react to external close signals.
         closeableRegistry.registerCloseable(internalCloser);
         try {
+            long startTimeMs = SystemClock.getInstance().relativeTimeMillis();
             List<CompletableFuture<Void>> futures =
                     transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                             .collect(Collectors.toList());
             // Wait until either all futures completed successfully or one failed exceptionally.
             FutureUtils.completeAll(futures).get();
+            customInitializationMetrics.addMetric(
+                    DOWNLOAD_STATE_DURATION,
+                    SystemClock.getInstance().relativeTimeMillis() - startTimeMs);
         } catch (Exception e) {
             downloadRequests.stream()
                     .map(StateHandleDownloadSpec::getDownloadDestination)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 11d4756ae13..faac6bee9bf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
@@ -98,6 +99,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
     private final int keyGroupPrefixBytes;
     private final StateSerializerProvider<K> keySerializerProvider;
     private final ClassLoader userCodeClassLoader;
+    private final CustomInitializationMetrics customInitializationMetrics;
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
@@ -120,6 +122,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             RocksDBNativeMetricOptions nativeMetricOptions,
             MetricGroup metricGroup,
+            CustomInitializationMetrics customInitializationMetrics,
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
@@ -141,6 +144,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         this.backendUID = UUID.randomUUID();
         this.writeBatchSize = writeBatchSize;
         this.overlapFractionThreshold = overlapFractionThreshold;
+        this.customInitializationMetrics = customInitializationMetrics;
         this.restoreStateHandles = restoreStateHandles;
         this.cancelStreamRegistry = cancelStreamRegistry;
         this.keyGroupRange = keyGroupRange;
@@ -254,7 +258,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
     private void transferRemoteStateToLocalDirectory(
             Collection<StateHandleDownloadSpec> downloadRequests) throws Exception {
         try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(numberOfTransferringThreads)) {
+                new RocksDBStateDownloader(
+                        numberOfTransferringThreads, customInitializationMetrics)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, cancelStreamRegistry);
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index dd5026da052..3177f373f6a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -39,11 +40,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -71,7 +75,8 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                         stateHandles,
                         stateHandle);
 
-        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
+        try (RocksDBStateDownloader rocksDBStateDownloader =
+                new RocksDBStateDownloader(5, (key, value) -> {})) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     Collections.singletonList(
                             new StateHandleDownloadSpec(
@@ -97,11 +102,16 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                             temporaryFolder.newFolder().toPath(), contents[i], i));
         }
 
-        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(4)) {
+        Set<String> customMetrics = new HashSet<>();
+
+        try (RocksDBStateDownloader rocksDBStateDownloader =
+                new RocksDBStateDownloader(4, (key, value) -> customMetrics.add(key))) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, new CloseableRegistry());
         }
 
+        assertThat(customMetrics).containsExactly(MetricNames.DOWNLOAD_STATE_DURATION);
+
         for (int i = 0; i < numRemoteHandles; ++i) {
             StateHandleDownloadSpec downloadRequest = downloadRequests.get(i);
             Path dstPath = downloadRequest.getDownloadDestination();
@@ -138,7 +148,8 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                                 "error-handle"));
 
         CloseableRegistry closeableRegistry = new CloseableRegistry();
-        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
+        try (RocksDBStateDownloader rocksDBStateDownloader =
+                new RocksDBStateDownloader(5, (key, value) -> {})) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, closeableRegistry);
             fail("Exception is expected");
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index 18fc39a9ff7..2fc862664ce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -76,6 +76,7 @@ public final class RocksDBTestUtils {
                 TtlTimeProvider.DEFAULT,
                 LatencyTrackingStateConfig.disabled(),
                 new UnregisteredMetricsGroup(),
+                (key, value) -> {},
                 Collections.emptyList(),
                 UncompressedStreamCompressionDecorator.INSTANCE,
                 new CloseableRegistry());
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
index 0e5fbe8d022..7d766858483 100644
--- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
@@ -193,6 +193,7 @@ public class StateBackendBenchmarkUtils {
                         ttlTimeProvider,
                         LatencyTrackingStateConfig.disabled(),
                         new UnregisteredMetricsGroup(),
+                        (key, value) -> {},
                         Collections.emptyList(),
                         
AbstractStateBackend.getCompressionDecorator(executionConfig),
                         new CloseableRegistry());
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
index 5b28b51e1e3..b271510a799 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
@@ -112,6 +112,7 @@ public final class BackendSwitchSpecs {
                             TtlTimeProvider.DEFAULT,
                             LatencyTrackingStateConfig.disabled(),
                             new UnregisteredMetricsGroup(),
+                            (key, value) -> {},
                             stateHandles,
                             UncompressedStreamCompressionDecorator.INSTANCE,
                             new CloseableRegistry())

Reply via email to