This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9dc0f6f658be4e8d1845857bb16ac5155ccb0873 Author: Stefan Richter <srich...@confluent.io> AuthorDate: Tue Jan 16 17:22:02 2024 +0100 [FLINK-34134] Collect and report size/location statistics during task restore. --- .../apache/flink/runtime/metrics/MetricNames.java | 1 + .../api/operators/BackendRestorerProcedure.java | 9 +++- .../operators/StreamTaskStateInitializerImpl.java | 62 +++++++++++++++++---- .../operators/BackendRestorerProcedureTest.java | 11 ++-- .../StreamTaskStateInitializerImplTest.java | 63 +++++++++++++++++++--- 5 files changed, 125 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index b0e5be64592..4aa375020c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -69,6 +69,7 @@ public class MetricNames { public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs"; public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs"; public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs"; + public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes"; public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java index 5a0cea94927..83f6cdcc41f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java @@ -91,7 +91,9 @@ public class BackendRestorerProcedure<T extends Closeable & Disposable, S extend * @throws Exception if the backend could not be created or restored. */ @Nonnull - public T createAndRestore(@Nonnull List<? extends Collection<S>> restoreOptions) + public T createAndRestore( + @Nonnull List<? extends Collection<S>> restoreOptions, + @Nonnull StateObject.StateObjectSizeStatsCollector stats) throws Exception { if (restoreOptions.isEmpty()) { @@ -132,7 +134,10 @@ public class BackendRestorerProcedure<T extends Closeable & Disposable, S extend } try { - return attemptCreateAndRestore(restoreState); + T successfullyRestored = attemptCreateAndRestore(restoreState); + // Obtain and report stats for the state objects used in our successful restore + restoreState.forEach(handle -> handle.collectSizeStats(stats)); + return successfullyRestored; } catch (Exception ex) { collectedException = ExceptionUtils.firstOrSuppressed(ex, collectedException); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 737a75f0627..9cd11a09a81 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateManager; @@ -65,7 +67,9 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.OptionalLong; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import static org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromKeyedStateHandles; @@ -166,6 +170,9 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null; InternalTimeServiceManager<?> timeServiceManager; + final StateObject.StateObjectSizeStatsCollector statsCollector = + StateObject.StateObjectSizeStatsCollector.create(); + try { // -------------- Keyed State Backend -------------- @@ -176,28 +183,32 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup, - managedMemoryFraction); + managedMemoryFraction, + statsCollector); // -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( operatorIdentifierText, prioritizedOperatorSubtaskStates, - streamTaskCloseableRegistry); + streamTaskCloseableRegistry, + statsCollector); // -------------- Raw State Streams -------------- rawKeyedStateInputs = rawKeyedStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawKeyedState() - .iterator()); + .iterator(), + statsCollector); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs = rawOperatorStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawOperatorState() - .iterator()); + .iterator(), + statsCollector); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); // -------------- Internal Timer Service Manager -------------- @@ -227,6 +238,24 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize timeServiceManager = null; } + // Add stats for input channel and result partition state + Stream.concat( + prioritizedOperatorSubtaskStates.getPrioritizedInputChannelState() + .stream(), + prioritizedOperatorSubtaskStates.getPrioritizedResultSubpartitionState() + .stream()) + .filter(Objects::nonNull) + .forEach(channelHandle -> channelHandle.collectSizeStats(statsCollector)); + + // Report collected stats to metrics + statsCollector + .getStats() + .forEach( + (location, metricValue) -> + initializationMetrics.addDurationMetric( + MetricNames.RESTORED_STATE_SIZE + "." + location, + metricValue)); + // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( @@ -269,7 +298,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize protected OperatorStateBackend operatorStateBackend( String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, - CloseableRegistry backendCloseableRegistry) + CloseableRegistry backendCloseableRegistry, + StateObject.StateObjectSizeStatsCollector statsCollector) throws Exception { String logDescription = "operator state backend for " + operatorIdentifierText; @@ -295,7 +325,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize try { return backendRestorer.createAndRestore( - prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState()); + prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState(), + statsCollector); } finally { if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) { IOUtils.closeQuietly(cancelStreamRegistryForRestore); @@ -309,7 +340,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry, MetricGroup metricGroup, - double managedMemoryFraction) + double managedMemoryFraction, + StateObject.StateObjectSizeStatsCollector statsCollector) throws Exception { if (keySerializer == null) { @@ -365,7 +397,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize try { return backendRestorer.createAndRestore( - prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState()); + prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState(), + statsCollector); } finally { if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) { IOUtils.closeQuietly(cancelStreamRegistryForRestore); @@ -374,7 +407,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs( - Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) { + @Nonnull Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives, + @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) { if (restoreStateAlternatives.hasNext()) { @@ -386,6 +420,10 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize "Local recovery is currently not implemented for raw operator state, but found state alternative."); if (rawOperatorState != null) { + // Report restore size stats + rawOperatorState.forEach( + stateObject -> stateObject.collectSizeStats(statsCollector)); + return new CloseableIterable<StatePartitionStreamProvider>() { final CloseableRegistry closeableRegistry = new CloseableRegistry(); @@ -411,7 +449,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs( - Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives) { + @Nonnull Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives, + @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) { if (restoreStateAlternatives.hasNext()) { Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next(); @@ -424,6 +463,9 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize if (rawKeyedState != null) { Collection<KeyGroupsStateHandle> keyGroupsStateHandles = transform(rawKeyedState); + // Report restore size stats + keyGroupsStateHandles.forEach( + stateObject -> stateObject.collectSizeStats(statsCollector)); final CloseableRegistry closeableRegistry = new CloseableRegistry(); return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java index 31c49c5815e..1e0fe548212 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.runtime.util.BlockingFSDataInputStream; import org.apache.flink.util.FlinkException; @@ -121,7 +122,8 @@ public class BackendRestorerProcedureTest extends TestLogger { backendSupplier, closeableRegistry, "test op state backend"); OperatorStateBackend restoredBackend = - restorerProcedure.createAndRestore(sortedRestoreOptions); + restorerProcedure.createAndRestore( + sortedRestoreOptions, StateObject.StateObjectSizeStatsCollector.create()); Assert.assertNotNull(restoredBackend); try { @@ -165,7 +167,8 @@ public class BackendRestorerProcedureTest extends TestLogger { backendSupplier, closeableRegistry, "test op state backend"); try { - restorerProcedure.createAndRestore(sortedRestoreOptions); + restorerProcedure.createAndRestore( + sortedRestoreOptions, StateObject.StateObjectSizeStatsCollector.create()); Assert.fail(); } catch (Exception ignore) { } @@ -199,7 +202,9 @@ public class BackendRestorerProcedureTest extends TestLogger { new Thread( () -> { try { - restorerProcedure.createAndRestore(sortedRestoreOptions); + restorerProcedure.createAndRestore( + sortedRestoreOptions, + StateObject.StateObjectSizeStatsCollector.create()); } catch (Exception e) { exceptionReference.set(e); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 905ce40a09d..1d52b26dbdd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -25,11 +25,13 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics; import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; @@ -38,6 +40,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager; import org.apache.flink.runtime.state.TaskLocalStateStore; @@ -59,14 +62,17 @@ import org.junit.Test; import java.io.Closeable; import java.util.Collections; +import java.util.HashMap; import java.util.OptionalLong; import java.util.Random; +import java.util.stream.Stream; import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle; import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.state.OperatorStateHandle.Mode.SPLIT_DISTRIBUTE; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -81,7 +87,12 @@ public class StreamTaskStateInitializerImplTest { // No job manager provided state to restore StreamTaskStateInitializer streamTaskStateManager = - streamTaskStateManager(stateBackend, null, true); + streamTaskStateManager( + stateBackend, + null, + new SubTaskInitializationMetricsBuilder( + SystemClock.getInstance().absoluteTimeMillis()), + true); OperatorID operatorID = new OperatorID(47L, 11L); AbstractStreamOperator<?> streamOperator = mock(AbstractStreamOperator.class); @@ -187,8 +198,13 @@ public class StreamTaskStateInitializerImplTest { JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(42L, taskStateSnapshot); + SubTaskInitializationMetricsBuilder metricsBuilder = + new SubTaskInitializationMetricsBuilder( + SystemClock.getInstance().absoluteTimeMillis()); + StreamTaskStateInitializer streamTaskStateManager = - streamTaskStateManager(mockingBackend, jobManagerTaskRestore, false); + streamTaskStateManager( + mockingBackend, jobManagerTaskRestore, metricsBuilder, false); AbstractStreamOperator<?> streamOperator = mock(AbstractStreamOperator.class); when(streamOperator.getOperatorID()).thenReturn(operatorID); @@ -217,7 +233,7 @@ public class StreamTaskStateInitializerImplTest { CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = stateContext.rawOperatorStateInputs(); - Assert.assertTrue("Expected the context to be restored", stateContext.isRestored()); + assertTrue("Expected the context to be restored", stateContext.isRestored()); Assert.assertEquals(OptionalLong.of(42L), stateContext.getRestoredCheckpointId()); Assert.assertNotNull(operatorStateBackend); @@ -240,6 +256,41 @@ public class StreamTaskStateInitializerImplTest { } Assert.assertEquals(3, count); + long expectedSumLocalMemory = + Stream.of( + operatorSubtaskState.getManagedOperatorState().stream(), + operatorSubtaskState.getManagedKeyedState().stream(), + operatorSubtaskState.getRawKeyedState().stream(), + operatorSubtaskState.getRawOperatorState().stream()) + .flatMap(i -> i) + .mapToLong(StateObject::getStateSize) + .sum(); + + long expectedSumUnknown = + Stream.concat( + operatorSubtaskState.getInputChannelState().stream(), + operatorSubtaskState.getResultSubpartitionState().stream()) + .mapToLong(StateObject::getStateSize) + .sum(); + + SubTaskInitializationMetrics metrics = metricsBuilder.build(); + Assert.assertEquals( + new HashMap<String, Long>() { + { + put( + MetricNames.RESTORED_STATE_SIZE + + "." + + StateObject.StateObjectLocation.LOCAL_MEMORY.name(), + expectedSumLocalMemory); + put( + MetricNames.RESTORED_STATE_SIZE + + "." + + StateObject.StateObjectLocation.UNKNOWN.name(), + expectedSumUnknown); + } + }, + metrics.getDurationMetrics()); + checkCloseablesRegistered( closeableRegistry, operatorStateBackend, @@ -251,13 +302,14 @@ public class StreamTaskStateInitializerImplTest { private static void checkCloseablesRegistered( CloseableRegistry closeableRegistry, Closeable... closeables) { for (Closeable closeable : closeables) { - Assert.assertTrue(closeableRegistry.unregisterCloseable(closeable)); + assertTrue(closeableRegistry.unregisterCloseable(closeable)); } } private StreamTaskStateInitializer streamTaskStateManager( StateBackend stateBackend, JobManagerTaskRestore jobManagerTaskRestore, + SubTaskInitializationMetricsBuilder metricsBuilder, boolean createTimerServiceManager) { JobID jobID = new JobID(42L, 43L); @@ -291,8 +343,7 @@ public class StreamTaskStateInitializerImplTest { return new StreamTaskStateInitializerImpl( dummyEnvironment, stateBackend, - new SubTaskInitializationMetricsBuilder( - SystemClock.getInstance().absoluteTimeMillis()), + metricsBuilder, TtlTimeProvider.DEFAULT, new InternalTimeServiceManager.Provider() { @Override