This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dc0f6f658be4e8d1845857bb16ac5155ccb0873
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Tue Jan 16 17:22:02 2024 +0100

    [FLINK-34134] Collect and report size/location statistics during task restore.
---
 .../apache/flink/runtime/metrics/MetricNames.java  |  1 +
 .../api/operators/BackendRestorerProcedure.java    |  9 +++-
 .../operators/StreamTaskStateInitializerImpl.java  | 62 +++++++++++++++++----
 .../operators/BackendRestorerProcedureTest.java    | 11 ++--
 .../StreamTaskStateInitializerImplTest.java        | 63 +++++++++++++++++++---
 5 files changed, 125 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index b0e5be64592..4aa375020c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -69,6 +69,7 @@ public class MetricNames {
     public static final String INITIALIZE_STATE_DURATION = 
"InitializeStateDurationMs";
     public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs";
     public static final String DOWNLOAD_STATE_DURATION = 
"DownloadStateDurationMs";
+    public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes";
 
     public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" 
+ SUFFIX_RATE;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index 5a0cea94927..83f6cdcc41f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -91,7 +91,9 @@ public class BackendRestorerProcedure<T extends Closeable & 
Disposable, S extend
      * @throws Exception if the backend could not be created or restored.
      */
     @Nonnull
-    public T createAndRestore(@Nonnull List<? extends Collection<S>> 
restoreOptions)
+    public T createAndRestore(
+            @Nonnull List<? extends Collection<S>> restoreOptions,
+            @Nonnull StateObject.StateObjectSizeStatsCollector stats)
             throws Exception {
 
         if (restoreOptions.isEmpty()) {
@@ -132,7 +134,10 @@ public class BackendRestorerProcedure<T extends Closeable 
& Disposable, S extend
             }
 
             try {
-                return attemptCreateAndRestore(restoreState);
+                T successfullyRestored = attemptCreateAndRestore(restoreState);
+                // Obtain and report stats for the state objects used in our 
successful restore
+                restoreState.forEach(handle -> handle.collectSizeStats(stats));
+                return successfullyRestored;
             } catch (Exception ex) {
 
                 collectedException = ExceptionUtils.firstOrSuppressed(ex, 
collectedException);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 737a75f0627..9cd11a09a81 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -65,7 +67,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import static 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromKeyedStateHandles;
@@ -166,6 +170,9 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
         CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs 
= null;
         InternalTimeServiceManager<?> timeServiceManager;
 
+        final StateObject.StateObjectSizeStatsCollector statsCollector =
+                StateObject.StateObjectSizeStatsCollector.create();
+
         try {
 
             // -------------- Keyed State Backend --------------
@@ -176,28 +183,32 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                             prioritizedOperatorSubtaskStates,
                             streamTaskCloseableRegistry,
                             metricGroup,
-                            managedMemoryFraction);
+                            managedMemoryFraction,
+                            statsCollector);
 
             // -------------- Operator State Backend --------------
             operatorStateBackend =
                     operatorStateBackend(
                             operatorIdentifierText,
                             prioritizedOperatorSubtaskStates,
-                            streamTaskCloseableRegistry);
+                            streamTaskCloseableRegistry,
+                            statsCollector);
 
             // -------------- Raw State Streams --------------
             rawKeyedStateInputs =
                     rawKeyedStateInputs(
                             prioritizedOperatorSubtaskStates
                                     .getPrioritizedRawKeyedState()
-                                    .iterator());
+                                    .iterator(),
+                            statsCollector);
             streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
 
             rawOperatorStateInputs =
                     rawOperatorStateInputs(
                             prioritizedOperatorSubtaskStates
                                     .getPrioritizedRawOperatorState()
-                                    .iterator());
+                                    .iterator(),
+                            statsCollector);
             
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
 
             // -------------- Internal Timer Service Manager --------------
@@ -227,6 +238,24 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                 timeServiceManager = null;
             }
 
+            // Add stats for input channel and result partition state
+            Stream.concat(
+                            
prioritizedOperatorSubtaskStates.getPrioritizedInputChannelState()
+                                    .stream(),
+                            
prioritizedOperatorSubtaskStates.getPrioritizedResultSubpartitionState()
+                                    .stream())
+                    .filter(Objects::nonNull)
+                    .forEach(channelHandle -> 
channelHandle.collectSizeStats(statsCollector));
+
+            // Report collected stats to metrics
+            statsCollector
+                    .getStats()
+                    .forEach(
+                            (location, metricValue) ->
+                                    initializationMetrics.addDurationMetric(
+                                            MetricNames.RESTORED_STATE_SIZE + 
"." + location,
+                                            metricValue));
+
             // -------------- Preparing return value --------------
 
             return new StreamOperatorStateContextImpl(
@@ -269,7 +298,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
     protected OperatorStateBackend operatorStateBackend(
             String operatorIdentifierText,
             PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
-            CloseableRegistry backendCloseableRegistry)
+            CloseableRegistry backendCloseableRegistry,
+            StateObject.StateObjectSizeStatsCollector statsCollector)
             throws Exception {
 
         String logDescription = "operator state backend for " + 
operatorIdentifierText;
@@ -295,7 +325,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
         try {
             return backendRestorer.createAndRestore(
-                    
prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
+                    
prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState(),
+                    statsCollector);
         } finally {
             if 
(backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
                 IOUtils.closeQuietly(cancelStreamRegistryForRestore);
@@ -309,7 +340,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
             PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
             CloseableRegistry backendCloseableRegistry,
             MetricGroup metricGroup,
-            double managedMemoryFraction)
+            double managedMemoryFraction,
+            StateObject.StateObjectSizeStatsCollector statsCollector)
             throws Exception {
 
         if (keySerializer == null) {
@@ -365,7 +397,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
         try {
             return backendRestorer.createAndRestore(
-                    
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
+                    
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState(),
+                    statsCollector);
         } finally {
             if 
(backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
                 IOUtils.closeQuietly(cancelStreamRegistryForRestore);
@@ -374,7 +407,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
     }
 
     protected CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs(
-            Iterator<StateObjectCollection<OperatorStateHandle>> 
restoreStateAlternatives) {
+            @Nonnull Iterator<StateObjectCollection<OperatorStateHandle>> 
restoreStateAlternatives,
+            @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) 
{
 
         if (restoreStateAlternatives.hasNext()) {
 
@@ -386,6 +420,10 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                     "Local recovery is currently not implemented for raw 
operator state, but found state alternative.");
 
             if (rawOperatorState != null) {
+                // Report restore size stats
+                rawOperatorState.forEach(
+                        stateObject -> 
stateObject.collectSizeStats(statsCollector));
+
                 return new CloseableIterable<StatePartitionStreamProvider>() {
 
                     final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
@@ -411,7 +449,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
     }
 
     protected CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs(
-            Iterator<StateObjectCollection<KeyedStateHandle>> 
restoreStateAlternatives) {
+            @Nonnull Iterator<StateObjectCollection<KeyedStateHandle>> 
restoreStateAlternatives,
+            @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) 
{
 
         if (restoreStateAlternatives.hasNext()) {
             Collection<KeyedStateHandle> rawKeyedState = 
restoreStateAlternatives.next();
@@ -424,6 +463,9 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
             if (rawKeyedState != null) {
                 Collection<KeyGroupsStateHandle> keyGroupsStateHandles = 
transform(rawKeyedState);
+                // Report restore size stats
+                keyGroupsStateHandles.forEach(
+                        stateObject -> 
stateObject.collectSizeStats(statsCollector));
                 final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
                 return new 
CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
index 31c49c5815e..1e0fe548212 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingFSDataInputStream;
 import org.apache.flink.util.FlinkException;
@@ -121,7 +122,8 @@ public class BackendRestorerProcedureTest extends 
TestLogger {
                         backendSupplier, closeableRegistry, "test op state 
backend");
 
         OperatorStateBackend restoredBackend =
-                restorerProcedure.createAndRestore(sortedRestoreOptions);
+                restorerProcedure.createAndRestore(
+                        sortedRestoreOptions, 
StateObject.StateObjectSizeStatsCollector.create());
         Assert.assertNotNull(restoredBackend);
 
         try {
@@ -165,7 +167,8 @@ public class BackendRestorerProcedureTest extends 
TestLogger {
                         backendSupplier, closeableRegistry, "test op state 
backend");
 
         try {
-            restorerProcedure.createAndRestore(sortedRestoreOptions);
+            restorerProcedure.createAndRestore(
+                    sortedRestoreOptions, 
StateObject.StateObjectSizeStatsCollector.create());
             Assert.fail();
         } catch (Exception ignore) {
         }
@@ -199,7 +202,9 @@ public class BackendRestorerProcedureTest extends 
TestLogger {
                 new Thread(
                         () -> {
                             try {
-                                
restorerProcedure.createAndRestore(sortedRestoreOptions);
+                                restorerProcedure.createAndRestore(
+                                        sortedRestoreOptions,
+                                        
StateObject.StateObjectSizeStatsCollector.create());
                             } catch (Exception e) {
                                 exceptionReference.set(e);
                             }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 905ce40a09d..1d52b26dbdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -25,11 +25,13 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
 import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
@@ -38,6 +40,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
@@ -59,14 +62,17 @@ import org.junit.Test;
 
 import java.io.Closeable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.OptionalLong;
 import java.util.Random;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
 import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static 
org.apache.flink.runtime.state.OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -81,7 +87,12 @@ public class StreamTaskStateInitializerImplTest {
 
         // No job manager provided state to restore
         StreamTaskStateInitializer streamTaskStateManager =
-                streamTaskStateManager(stateBackend, null, true);
+                streamTaskStateManager(
+                        stateBackend,
+                        null,
+                        new SubTaskInitializationMetricsBuilder(
+                                
SystemClock.getInstance().absoluteTimeMillis()),
+                        true);
 
         OperatorID operatorID = new OperatorID(47L, 11L);
         AbstractStreamOperator<?> streamOperator = 
mock(AbstractStreamOperator.class);
@@ -187,8 +198,13 @@ public class StreamTaskStateInitializerImplTest {
         JobManagerTaskRestore jobManagerTaskRestore =
                 new JobManagerTaskRestore(42L, taskStateSnapshot);
 
+        SubTaskInitializationMetricsBuilder metricsBuilder =
+                new SubTaskInitializationMetricsBuilder(
+                        SystemClock.getInstance().absoluteTimeMillis());
+
         StreamTaskStateInitializer streamTaskStateManager =
-                streamTaskStateManager(mockingBackend, jobManagerTaskRestore, 
false);
+                streamTaskStateManager(
+                        mockingBackend, jobManagerTaskRestore, metricsBuilder, 
false);
 
         AbstractStreamOperator<?> streamOperator = 
mock(AbstractStreamOperator.class);
         when(streamOperator.getOperatorID()).thenReturn(operatorID);
@@ -217,7 +233,7 @@ public class StreamTaskStateInitializerImplTest {
         CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
                 stateContext.rawOperatorStateInputs();
 
-        Assert.assertTrue("Expected the context to be restored", 
stateContext.isRestored());
+        assertTrue("Expected the context to be restored", 
stateContext.isRestored());
         Assert.assertEquals(OptionalLong.of(42L), 
stateContext.getRestoredCheckpointId());
 
         Assert.assertNotNull(operatorStateBackend);
@@ -240,6 +256,41 @@ public class StreamTaskStateInitializerImplTest {
         }
         Assert.assertEquals(3, count);
 
+        long expectedSumLocalMemory =
+                Stream.of(
+                                
operatorSubtaskState.getManagedOperatorState().stream(),
+                                
operatorSubtaskState.getManagedKeyedState().stream(),
+                                
operatorSubtaskState.getRawKeyedState().stream(),
+                                
operatorSubtaskState.getRawOperatorState().stream())
+                        .flatMap(i -> i)
+                        .mapToLong(StateObject::getStateSize)
+                        .sum();
+
+        long expectedSumUnknown =
+                Stream.concat(
+                                
operatorSubtaskState.getInputChannelState().stream(),
+                                
operatorSubtaskState.getResultSubpartitionState().stream())
+                        .mapToLong(StateObject::getStateSize)
+                        .sum();
+
+        SubTaskInitializationMetrics metrics = metricsBuilder.build();
+        Assert.assertEquals(
+                new HashMap<String, Long>() {
+                    {
+                        put(
+                                MetricNames.RESTORED_STATE_SIZE
+                                        + "."
+                                        + 
StateObject.StateObjectLocation.LOCAL_MEMORY.name(),
+                                expectedSumLocalMemory);
+                        put(
+                                MetricNames.RESTORED_STATE_SIZE
+                                        + "."
+                                        + 
StateObject.StateObjectLocation.UNKNOWN.name(),
+                                expectedSumUnknown);
+                    }
+                },
+                metrics.getDurationMetrics());
+
         checkCloseablesRegistered(
                 closeableRegistry,
                 operatorStateBackend,
@@ -251,13 +302,14 @@ public class StreamTaskStateInitializerImplTest {
     private static void checkCloseablesRegistered(
             CloseableRegistry closeableRegistry, Closeable... closeables) {
         for (Closeable closeable : closeables) {
-            
Assert.assertTrue(closeableRegistry.unregisterCloseable(closeable));
+            assertTrue(closeableRegistry.unregisterCloseable(closeable));
         }
     }
 
     private StreamTaskStateInitializer streamTaskStateManager(
             StateBackend stateBackend,
             JobManagerTaskRestore jobManagerTaskRestore,
+            SubTaskInitializationMetricsBuilder metricsBuilder,
             boolean createTimerServiceManager) {
 
         JobID jobID = new JobID(42L, 43L);
@@ -291,8 +343,7 @@ public class StreamTaskStateInitializerImplTest {
             return new StreamTaskStateInitializerImpl(
                     dummyEnvironment,
                     stateBackend,
-                    new SubTaskInitializationMetricsBuilder(
-                            SystemClock.getInstance().absoluteTimeMillis()),
+                    metricsBuilder,
                     TtlTimeProvider.DEFAULT,
                     new InternalTimeServiceManager.Provider() {
                         @Override

Reply via email to