[FLINK-8360][checkpointing] Implement file-based local recovery for FsStateBackend
This reverts commit 8925b7c Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e04321fa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e04321fa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e04321fa Branch: refs/heads/master Commit: e04321fa12aba2ba304549393afe0f27d54db99f Parents: df3e6bb Author: Stefan Richter <[email protected]> Authored: Wed Feb 21 12:03:59 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:14:21 2018 +0100 ---------------------------------------------------------------------- .../state/heap/HeapKeyedStateBackend.java | 419 ++++++++++++------- .../StreamOperatorSnapshotRestoreTest.java | 167 ++++++-- 2 files changed, 408 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e04321fa/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 5d5f716..d9a5ec1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DoneFuture; import 
org.apache.flink.runtime.state.HashMapSerializer; @@ -53,6 +54,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.SnapshotStrategy; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; @@ -64,12 +66,15 @@ import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; +import org.apache.flink.util.function.SupplierWithException; import org.apache.commons.collections.map.HashedMap; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -113,25 +118,34 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; /** - * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying - * {@link StateTable} implementation. + * The configuration for local recovery. */ - private final boolean asynchronousSnapshots; + private final LocalRecoveryConfig localRecoveryConfig; + + /** + * The snapshot strategy for this backend. This determines, e.g., if snapshots are synchronous or asynchronous. 
+ */ + private final HeapSnapshotStrategy snapshotStrategy; public HeapKeyedStateBackend( - TaskKvStateRegistry kvStateRegistry, - TypeSerializer<K> keySerializer, - ClassLoader userCodeClassLoader, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - boolean asynchronousSnapshots, - ExecutionConfig executionConfig, - LocalRecoveryConfig localRecoveryConfig) { + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + boolean asynchronousSnapshots, + ExecutionConfig executionConfig, + LocalRecoveryConfig localRecoveryConfig) { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); - this.asynchronousSnapshots = asynchronousSnapshots; - LOG.info("Initializing heap keyed state backend with stream factory."); + this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); + SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = asynchronousSnapshots ? 
+ new AsyncSnapshotStrategySynchronicityBehavior() : + new SyncSnapshotStrategySynchronicityBehavior(); + + this.snapshotStrategy = new HeapSnapshotStrategy(synchronicityTrait); + LOG.info("Initializing heap keyed state backend with stream factory."); this.restoredKvStateMetaInfos = new HashMap<>(); } @@ -160,7 +174,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName); if (stateTable == null) { - stateTable = newStateTable(newMetaInfo); + stateTable = snapshotStrategy.newStateTable(newMetaInfo); stateTables.put(stateName, stateTable); } else { // TODO with eager registration in place, these checks should be moved to restorePartitionedState() @@ -293,139 +307,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, - CheckpointOptions checkpointOptions) throws Exception { - - if (!hasRegisteredState()) { - return DoneFuture.of(SnapshotResult.empty()); - } - - long syncStartTime = System.currentTimeMillis(); - - Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE, - "Too many KV-States: " + stateTables.size() + - ". 
Currently at most " + Short.MAX_VALUE + " states are supported"); - - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = new ArrayList<>(stateTables.size()); - - final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size()); - - final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size()); - - for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { - kVStateToId.put(kvState.getKey(), kVStateToId.size()); - StateTable<K, ?, ?> stateTable = kvState.getValue(); - if (null != stateTable) { - metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot()); - cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot()); - } - } - - final KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - keySerializer, - metaInfoSnapshots, - !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); - - //--------------------------------------------------- this becomes the end of sync part - - // implementation of the async IO operation, based on FutureTask - final AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = - new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { - - CheckpointStreamFactory.CheckpointStateOutputStream stream = null; - - @Override - protected void acquireResources() throws Exception { - stream = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - cancelStreamRegistry.registerCloseable(stream); - } - - @Override - protected void releaseResources() throws Exception { - - if (cancelStreamRegistry.unregisterCloseable(stream)) { - IOUtils.closeQuietly(stream); - stream = null; - } - - for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) { - tableSnapshot.release(); - } - } - - @Override - protected void stopOperation() throws Exception { - if 
(cancelStreamRegistry.unregisterCloseable(stream)) { - IOUtils.closeQuietly(stream); - stream = null; - } - } - - @Override - public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { - long asyncStartTime = System.currentTimeMillis(); - - CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream; - - DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream); - serializationProxy.write(outView); - - long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; - - for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { - int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); - keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); - outView.writeInt(keyGroupId); - - for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { - OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream); - DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut); - kgCompressionView.writeShort(kVStateToId.get(kvState.getKey())); - cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId); - kgCompressionOut.close(); // this will just close the outer stream - } - } - - if (cancelStreamRegistry.unregisterCloseable(stream)) { - - final StreamStateHandle streamStateHandle = stream.closeAndGetHandle(); - stream = null; - - if (asynchronousSnapshots) { - LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); - } - - if (streamStateHandle != null) { - - KeyGroupRangeOffsets offsets = - new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); - - final KeyGroupsStateHandle keyGroupsStateHandle = - new KeyGroupsStateHandle(offsets, streamStateHandle); - - return SnapshotResult.of(keyGroupsStateHandle); - 
} - } - - return SnapshotResult.empty(); - } - }; - - AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = AsyncStoppableTaskWithCallback.from(ioCallable); + CheckpointOptions checkpointOptions) { - if (!asynchronousSnapshots) { - task.run(); - } - - LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " + - Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms."); - - return task; + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } @SuppressWarnings("deprecation") - @Override public void restore(Collection<KeyedStateHandle> restoredState) throws Exception { if (restoredState == null || restoredState.isEmpty()) { return; @@ -525,7 +412,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { restoredMetaInfo.getNamespaceSerializer(), restoredMetaInfo.getStateSerializer()); - stateTable = newStateTable(registeredKeyedBackendStateMetaInfo); + stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo); stateTables.put(restoredMetaInfo.getName(), stateTable); kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName()); ++numRegisteredKvStates; @@ -614,14 +501,244 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return sum; } - public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) { - return asynchronousSnapshots ? 
- new CopyOnWriteStateTable<>(this, newMetaInfo) : - new NestedMapsStateTable<>(this, newMetaInfo); - } - @Override public boolean supportsAsynchronousSnapshots() { - return asynchronousSnapshots; + return snapshotStrategy.isAsynchronous(); + } + + @VisibleForTesting + public LocalRecoveryConfig getLocalRecoveryConfig() { + return localRecoveryConfig; + } + + private interface SnapshotStrategySynchronicityBehavior<K> { + + default void finalizeSnapshotBeforeReturnHook(Runnable runnable) { + + } + + default void logOperationCompleted(CheckpointStreamFactory streamFactory, long startTime) { + + } + + boolean isAsynchronous(); + + <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo); + } + + private class AsyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> { + + @Override + public void logOperationCompleted(CheckpointStreamFactory streamFactory, long startTime) { + LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + } + + @Override + public boolean isAsynchronous() { + return true; + } + + @Override + public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) { + return new CopyOnWriteStateTable<>(HeapKeyedStateBackend.this, newMetaInfo); + } + } + + private class SyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> { + + @Override + public void finalizeSnapshotBeforeReturnHook(Runnable runnable) { + // this triggers a synchronous execution from the main checkpointing thread. 
+ runnable.run(); + } + + @Override + public boolean isAsynchronous() { + return false; + } + + @Override + public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) { + return new NestedMapsStateTable<>(HeapKeyedStateBackend.this, newMetaInfo); + } + } + + /** + * Base class for the snapshots of the heap backend that outlines the algorithm and offers some hooks to realize + * the concrete strategies. Subclasses must be threadsafe. + */ + private class HeapSnapshotStrategy + implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, SnapshotStrategySynchronicityBehavior<K> { + + private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait; + + public HeapSnapshotStrategy( + SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait) { + this.snapshotStrategySynchronicityTrait = snapshotStrategySynchronicityTrait; + } + + @Override + public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory primaryStreamFactory, + CheckpointOptions checkpointOptions) { + + if (!hasRegisteredState()) { + return DoneFuture.of(SnapshotResult.empty()); + } + + long syncStartTime = System.currentTimeMillis(); + + Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE, + "Too many KV-States: " + stateTables.size() + + ". 
Currently at most " + Short.MAX_VALUE + " states are supported"); + + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = + new ArrayList<>(stateTables.size()); + + final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size()); + + final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = + new HashedMap(stateTables.size()); + + for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { + kVStateToId.put(kvState.getKey(), kVStateToId.size()); + StateTable<K, ?, ?> stateTable = kvState.getValue(); + if (null != stateTable) { + metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot()); + cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot()); + } + } + + final KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>( + keySerializer, + metaInfoSnapshots, + !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); + + final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier = + + LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals( + localRecoveryConfig.getLocalRecoveryMode()) ? 
+ + () -> CheckpointStreamWithResultProvider.createDuplicatingStream( + checkpointId, + CheckpointedStateScope.EXCLUSIVE, + primaryStreamFactory, + localRecoveryConfig.getLocalStateDirectoryProvider()) : + + () -> CheckpointStreamWithResultProvider.createSimpleStream( + CheckpointedStateScope.EXCLUSIVE, + primaryStreamFactory); + + //--------------------------------------------------- this becomes the end of sync part + + // implementation of the async IO operation, based on FutureTask + final AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = + new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { + + CheckpointStreamWithResultProvider streamAndResultExtractor = null; + + @Override + protected void acquireResources() throws Exception { + streamAndResultExtractor = checkpointStreamSupplier.get(); + cancelStreamRegistry.registerCloseable(streamAndResultExtractor); + } + + @Override + protected void releaseResources() { + + unregisterAndCloseStreamAndResultExtractor(); + + for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) { + tableSnapshot.release(); + } + } + + @Override + protected void stopOperation() { + unregisterAndCloseStreamAndResultExtractor(); + } + + private void unregisterAndCloseStreamAndResultExtractor() { + if (cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) { + IOUtils.closeQuietly(streamAndResultExtractor); + streamAndResultExtractor = null; + } + } + + @Nonnull + @Override + protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception { + + long startTime = System.currentTimeMillis(); + + CheckpointStreamFactory.CheckpointStateOutputStream localStream = + this.streamAndResultExtractor.getCheckpointOutputStream(); + + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream); + serializationProxy.write(outView); + + long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; + + for (int 
keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { + int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); + keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); + outView.writeInt(keyGroupId); + + for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { + try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) { + DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut); + kgCompressionView.writeShort(kVStateToId.get(kvState.getKey())); + cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId); + } // this will just close the outer compression stream + } + } + + if (cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) { + KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); + SnapshotResult<StreamStateHandle> result = + streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult(); + streamAndResultExtractor = null; + logOperationCompleted(primaryStreamFactory, startTime); + return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs); + } + + return SnapshotResult.empty(); + } + }; + + AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = + AsyncStoppableTaskWithCallback.from(ioCallable); + + finalizeSnapshotBeforeReturnHook(task); + + LOG.info("Heap backend snapshot (" + primaryStreamFactory + ", synchronous part) in thread " + + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms."); + + return task; + } + + @Override + public void finalizeSnapshotBeforeReturnHook(Runnable runnable) { + snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable); + } + + @Override + public void logOperationCompleted(CheckpointStreamFactory streamFactory, long startTime) { + 
snapshotStrategySynchronicityTrait.logOperationCompleted(streamFactory, startTime); + } + + @Override + public boolean isAsynchronous() { + return snapshotStrategySynchronicityTrait.isAsynchronous(); + } + + @Override + public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) { + return snapshotStrategySynchronicityTrait.newStateTable(newMetaInfo); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e04321fa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 9d0b9e2..b2b568e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -25,65 +27,159 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.util.BitSet; /** * Tests for {@link StreamOperator} snapshot restoration. 
*/ -public class StreamOperatorSnapshotRestoreTest { +public class StreamOperatorSnapshotRestoreTest extends TestLogger { + + private static final int ONLY_JM_RECOVERY = 0; + private static final int TM_AND_JM_RECOVERY = 1; + private static final int TM_REMOVE_JM_RECOVERY = 2; + private static final int JM_REMOVE_TM_RECOVERY = 3; private static final int MAX_PARALLELISM = 10; + protected static TemporaryFolder temporaryFolder; + + @BeforeClass + public static void beforeClass() throws IOException { + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + } + + @AfterClass + public static void afterClass() { + temporaryFolder.delete(); + } + + /** + * Test restoring an operator from a snapshot (local recovery deactivated). + */ @Test public void testOperatorStatesSnapshotRestore() throws Exception { + testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY); + } + + /** + * Test restoring an operator from a snapshot (local recovery activated). + */ + @Test + public void testOperatorStatesSnapshotRestoreWithLocalState() throws Exception { + testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY); + } + + /** + * Test restoring an operator from a snapshot (local recovery activated, JM snapshot deleted). + * + * <p>This case does not really simulate a practical scenario, but we make sure that restore happens from the local + * state here because we discard the JM state. + */ + @Test + public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM() throws Exception { + testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY); + } + + /** + * Test restoring an operator from a snapshot (local recovery activated, local TM snapshot deleted). + * + * <p>This test discards the local state, to simulate corruption and checks that we still recover from the fallback + * JM state. 
+ */ + @Test + public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM() throws Exception { + testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY); + } + + private void testOperatorStatesSnapshotRestoreInternal(final int mode) throws Exception { //-------------------------------------------------------------------------- snapshot + StateBackend stateBackend = createStateBackend(); + TestOneInputStreamOperator op = new TestOneInputStreamOperator(false); + JobID jobID = new JobID(); + JobVertexID jobVertexID = new JobVertexID(); + int subtaskIdx = 0; + + LocalRecoveryDirectoryProvider directoryProvider = + new LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx); + + LocalRecoveryConfig localRecoveryConfig = + mode != ONLY_JM_RECOVERY ? + new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider) : + new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider); + + MockEnvironment mockEnvironment = new MockEnvironment( + jobID, + jobVertexID, + "test", + 1024L * 1024L, + new MockInputSplitProvider(), + 1024 * 1024, + new Configuration(), + new ExecutionConfig(), + new TestTaskStateManager(localRecoveryConfig), + MAX_PARALLELISM, + 1, + subtaskIdx, + getClass().getClassLoader()); + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( - op, - new KeySelector<Integer, Integer>() { - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, - TypeInformation.of(Integer.class), - MAX_PARALLELISM, - 1 /* num subtasks */, - 0 /* subtask index */); + new KeyedOneInputStreamOperatorTestHarness<>( + op, + (KeySelector<Integer, Integer>) value -> value, + TypeInformation.of(Integer.class), + mockEnvironment); + + testHarness.setStateBackend(stateBackend); testHarness.open(); for (int i = 0; i < 10; ++i) { 
testHarness.processElement(new StreamRecord<>(i)); } - OperatorSubtaskState handles = testHarness.snapshot(1L, 1L); + OperatorSnapshotFinalizer snapshotWithLocalState = testHarness.snapshotWithLocalState(1L, 1L); testHarness.close(); @@ -91,17 +187,12 @@ public class StreamOperatorSnapshotRestoreTest { op = new TestOneInputStreamOperator(true); testHarness = new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>( - op, - new KeySelector<Integer, Integer>() { - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, - TypeInformation.of(Integer.class), - MAX_PARALLELISM, - 1 /* num subtasks */, - 0 /* subtask index */) { + op, + (KeySelector<Integer, Integer>) value -> value, + TypeInformation.of(Integer.class), + MAX_PARALLELISM, + 1 /* num subtasks */, + 0 /* subtask index */) { @Override protected StreamTaskStateInitializer createStreamTaskStateManager( @@ -122,7 +213,21 @@ public class StreamOperatorSnapshotRestoreTest { } }; - testHarness.initializeState(handles); + testHarness.setStateBackend(stateBackend); + + OperatorSubtaskState jobManagerOwnedState = snapshotWithLocalState.getJobManagerOwnedState(); + OperatorSubtaskState taskLocalState = snapshotWithLocalState.getTaskLocalState(); + + // We check if local state was created when we enabled local recovery + Assert.assertTrue(mode > ONLY_JM_RECOVERY == (taskLocalState != null && taskLocalState.hasState())); + + if (mode == TM_REMOVE_JM_RECOVERY) { + jobManagerOwnedState.getManagedKeyedState().discardState(); + } else if (mode == JM_REMOVE_TM_RECOVERY) { + taskLocalState.getManagedKeyedState().discardState(); + } + + testHarness.initializeState(jobManagerOwnedState, taskLocalState); testHarness.open(); @@ -133,6 +238,15 @@ public class StreamOperatorSnapshotRestoreTest { testHarness.close(); } + protected StateBackend createStateBackend() throws IOException { + return createStateBackendInternal(); + } + + protected final FsStateBackend 
createStateBackendInternal() throws IOException { + File checkpointDir = temporaryFolder.newFolder(); + return new FsStateBackend(checkpointDir.toURI()); + } + static class TestOneInputStreamOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { @@ -237,5 +351,4 @@ public class StreamOperatorSnapshotRestoreTest { } } } - }
