[FLINK-8360][checkpointing] Implement file-based local recovery for FsStateBackend

This reverts commit 8925b7c


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e04321fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e04321fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e04321fa

Branch: refs/heads/master
Commit: e04321fa12aba2ba304549393afe0f27d54db99f
Parents: df3e6bb
Author: Stefan Richter <[email protected]>
Authored: Wed Feb 21 12:03:59 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:14:21 2018 +0100

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       | 419 ++++++++++++-------
 .../StreamOperatorSnapshotRestoreTest.java      | 167 ++++++--
 2 files changed, 408 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e04321fa/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 5d5f716..d9a5ec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.HashMapSerializer;
@@ -53,6 +54,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
@@ -64,12 +66,15 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -113,25 +118,34 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        private final Map<String, 
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
 
        /**
-        * Determines whether or not we run snapshots asynchronously. This 
impacts the choice of the underlying
-        * {@link StateTable} implementation.
+        * The configuration for local recovery.
         */
-       private final boolean asynchronousSnapshots;
+       private final LocalRecoveryConfig localRecoveryConfig;
+
+       /**
+        * The snapshot strategy for this backend. This determines, e.g., if 
snapshots are synchronous or asynchronous.
+        */
+       private final HeapSnapshotStrategy snapshotStrategy;
 
        public HeapKeyedStateBackend(
-               TaskKvStateRegistry kvStateRegistry,
-               TypeSerializer<K> keySerializer,
-               ClassLoader userCodeClassLoader,
-               int numberOfKeyGroups,
-               KeyGroupRange keyGroupRange,
-               boolean asynchronousSnapshots,
-               ExecutionConfig executionConfig,
-               LocalRecoveryConfig localRecoveryConfig) {
+                       TaskKvStateRegistry kvStateRegistry,
+                       TypeSerializer<K> keySerializer,
+                       ClassLoader userCodeClassLoader,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange,
+                       boolean asynchronousSnapshots,
+                       ExecutionConfig executionConfig,
+                       LocalRecoveryConfig localRecoveryConfig) {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
-               this.asynchronousSnapshots = asynchronousSnapshots;
-               LOG.info("Initializing heap keyed state backend with stream 
factory.");
+               this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
 
+               SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = 
asynchronousSnapshots ?
+                       new AsyncSnapshotStrategySynchronicityBehavior() :
+                       new SyncSnapshotStrategySynchronicityBehavior();
+
+               this.snapshotStrategy = new 
HeapSnapshotStrategy(synchronicityTrait);
+               LOG.info("Initializing heap keyed state backend with stream 
factory.");
                this.restoredKvStateMetaInfos = new HashMap<>();
        }
 
@@ -160,7 +174,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                StateTable<K, N, V> stateTable = (StateTable<K, N, V>) 
stateTables.get(stateName);
 
                if (stateTable == null) {
-                       stateTable = newStateTable(newMetaInfo);
+                       stateTable = 
snapshotStrategy.newStateTable(newMetaInfo);
                        stateTables.put(stateName, stateTable);
                } else {
                        // TODO with eager registration in place, these checks 
should be moved to restorePartitionedState()
@@ -293,139 +307,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        final long checkpointId,
                        final long timestamp,
                        final CheckpointStreamFactory streamFactory,
-                       CheckpointOptions checkpointOptions) throws Exception {
-
-               if (!hasRegisteredState()) {
-                       return DoneFuture.of(SnapshotResult.empty());
-               }
-
-               long syncStartTime = System.currentTimeMillis();
-
-               Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
-                               "Too many KV-States: " + stateTables.size() +
-                                               ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
-
-               List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
metaInfoSnapshots = new ArrayList<>(stateTables.size());
-
-               final Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
-
-               final Map<StateTable<K, ?, ?>, StateTableSnapshot> 
cowStateStableSnapshots = new HashedMap(stateTables.size());
-
-               for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
-                       kVStateToId.put(kvState.getKey(), kVStateToId.size());
-                       StateTable<K, ?, ?> stateTable = kvState.getValue();
-                       if (null != stateTable) {
-                               
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
-                               cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
-                       }
-               }
-
-               final KeyedBackendSerializationProxy<K> serializationProxy =
-                       new KeyedBackendSerializationProxy<>(
-                               keySerializer,
-                               metaInfoSnapshots,
-                               
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
-
-               //--------------------------------------------------- this 
becomes the end of sync part
-
-               // implementation of the async IO operation, based on FutureTask
-               final 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
-                       new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
-
-                               
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
-
-                               @Override
-                               protected void acquireResources() throws 
Exception {
-                                       stream = 
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-                                       
cancelStreamRegistry.registerCloseable(stream);
-                               }
-
-                               @Override
-                               protected void releaseResources() throws 
Exception {
-
-                                       if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
-                                               IOUtils.closeQuietly(stream);
-                                               stream = null;
-                                       }
-
-                                       for (StateTableSnapshot tableSnapshot : 
cowStateStableSnapshots.values()) {
-                                               tableSnapshot.release();
-                                       }
-                               }
-
-                               @Override
-                               protected void stopOperation() throws Exception 
{
-                                       if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
-                                               IOUtils.closeQuietly(stream);
-                                               stream = null;
-                                       }
-                               }
-
-                               @Override
-                               public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
-                                       long asyncStartTime = 
System.currentTimeMillis();
-
-                                       
CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
-
-                                       DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(localStream);
-                                       serializationProxy.write(outView);
-
-                                       long[] keyGroupRangeOffsets = new 
long[keyGroupRange.getNumberOfKeyGroups()];
-
-                                       for (int keyGroupPos = 0; keyGroupPos < 
keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
-                                               int keyGroupId = 
keyGroupRange.getKeyGroupId(keyGroupPos);
-                                               
keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
-                                               outView.writeInt(keyGroupId);
-
-                                               for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-                                                       OutputStream 
kgCompressionOut = 
keyGroupCompressionDecorator.decorateWithCompression(localStream);
-                                                       
DataOutputViewStreamWrapper kgCompressionView = new 
DataOutputViewStreamWrapper(kgCompressionOut);
-                                                       
kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
-                                                       
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
 keyGroupId);
-                                                       
kgCompressionOut.close(); // this will just close the outer stream
-                                               }
-                                       }
-
-                                       if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
-
-                                               final StreamStateHandle 
streamStateHandle = stream.closeAndGetHandle();
-                                               stream = null;
-
-                                               if (asynchronousSnapshots) {
-                                                       LOG.info("Heap backend 
snapshot ({}, asynchronous part) in thread {} took {} ms.",
-                                                               streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
-                                               }
-
-                                               if (streamStateHandle != null) {
-
-                                                       KeyGroupRangeOffsets 
offsets =
-                                                               new 
KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
-
-                                                       final 
KeyGroupsStateHandle keyGroupsStateHandle =
-                                                               new 
KeyGroupsStateHandle(offsets, streamStateHandle);
-
-                                                       return 
SnapshotResult.of(keyGroupsStateHandle);
-                                               }
-                                       }
-
-                                       return SnapshotResult.empty();
-                               }
-                       };
-
-               
AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = 
AsyncStoppableTaskWithCallback.from(ioCallable);
+                       CheckpointOptions checkpointOptions) {
 
-               if (!asynchronousSnapshots) {
-                       task.run();
-               }
-
-               LOG.info("Heap backend snapshot (" + streamFactory + ", 
synchronous part) in thread " +
-                               Thread.currentThread() + " took " + 
(System.currentTimeMillis() - syncStartTime) + " ms.");
-
-               return task;
+               return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
        }
 
        @SuppressWarnings("deprecation")
-       @Override
        public void restore(Collection<KeyedStateHandle> restoredState) throws 
Exception {
                if (restoredState == null || restoredState.isEmpty()) {
                        return;
@@ -525,7 +412,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                                                        
restoredMetaInfo.getNamespaceSerializer(),
                                                                        
restoredMetaInfo.getStateSerializer());
 
-                                               stateTable = 
newStateTable(registeredKeyedBackendStateMetaInfo);
+                                               stateTable = 
snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
                                                
stateTables.put(restoredMetaInfo.getName(), stateTable);
                                                
kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
                                                ++numRegisteredKvStates;
@@ -614,14 +501,244 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                return sum;
        }
 
-       public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
-               return asynchronousSnapshots ?
-                               new CopyOnWriteStateTable<>(this, newMetaInfo) :
-                               new NestedMapsStateTable<>(this, newMetaInfo);
-       }
-
        @Override
        public boolean supportsAsynchronousSnapshots() {
-               return asynchronousSnapshots;
+               return snapshotStrategy.isAsynchronous();
+       }
+
+       @VisibleForTesting
+       public LocalRecoveryConfig getLocalRecoveryConfig() {
+               return localRecoveryConfig;
+       }
+
+       private interface SnapshotStrategySynchronicityBehavior<K> {
+
+               default void finalizeSnapshotBeforeReturnHook(Runnable 
runnable) {
+
+               }
+
+               default void logOperationCompleted(CheckpointStreamFactory 
streamFactory, long startTime) {
+
+               }
+
+               boolean isAsynchronous();
+
+               <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo);
+       }
+
+       private class AsyncSnapshotStrategySynchronicityBehavior implements 
SnapshotStrategySynchronicityBehavior<K> {
+
+               @Override
+               public void logOperationCompleted(CheckpointStreamFactory 
streamFactory, long startTime) {
+                       LOG.info("Heap backend snapshot ({}, asynchronous part) 
in thread {} took {} ms.",
+                               streamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
+               }
+
+               @Override
+               public boolean isAsynchronous() {
+                       return true;
+               }
+
+               @Override
+               public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+                       return new 
CopyOnWriteStateTable<>(HeapKeyedStateBackend.this, newMetaInfo);
+               }
+       }
+
+       private class SyncSnapshotStrategySynchronicityBehavior implements 
SnapshotStrategySynchronicityBehavior<K> {
+
+               @Override
+               public void finalizeSnapshotBeforeReturnHook(Runnable runnable) 
{
+                       // this triggers a synchronous execution from the main 
checkpointing thread.
+                       runnable.run();
+               }
+
+               @Override
+               public boolean isAsynchronous() {
+                       return false;
+               }
+
+               @Override
+               public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+                       return new 
NestedMapsStateTable<>(HeapKeyedStateBackend.this, newMetaInfo);
+               }
+       }
+
+       /**
+        * Base class for the snapshots of the heap backend that outlines the 
algorithm and offers some hooks to realize
+        * the concrete strategies. Subclasses must be threadsafe.
+        */
+       private class HeapSnapshotStrategy
+               implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, 
SnapshotStrategySynchronicityBehavior<K> {
+
+               private final SnapshotStrategySynchronicityBehavior<K> 
snapshotStrategySynchronicityTrait;
+
+               public HeapSnapshotStrategy(
+                       SnapshotStrategySynchronicityBehavior<K> 
snapshotStrategySynchronicityTrait) {
+                       this.snapshotStrategySynchronicityTrait = 
snapshotStrategySynchronicityTrait;
+               }
+
+               @Override
+               public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
performSnapshot(
+                       long checkpointId,
+                       long timestamp,
+                       CheckpointStreamFactory primaryStreamFactory,
+                       CheckpointOptions checkpointOptions) {
+
+                       if (!hasRegisteredState()) {
+                               return DoneFuture.of(SnapshotResult.empty());
+                       }
+
+                       long syncStartTime = System.currentTimeMillis();
+
+                       Preconditions.checkState(stateTables.size() <= 
Short.MAX_VALUE,
+                               "Too many KV-States: " + stateTables.size() +
+                                       ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
+
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
+                               new ArrayList<>(stateTables.size());
+
+                       final Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
+
+                       final Map<StateTable<K, ?, ?>, StateTableSnapshot> 
cowStateStableSnapshots =
+                               new HashedMap(stateTables.size());
+
+                       for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
+                               kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
+                               StateTable<K, ?, ?> stateTable = 
kvState.getValue();
+                               if (null != stateTable) {
+                                       
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
+                                       cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
+                               }
+                       }
+
+                       final KeyedBackendSerializationProxy<K> 
serializationProxy =
+                               new KeyedBackendSerializationProxy<>(
+                                       keySerializer,
+                                       metaInfoSnapshots,
+                                       
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
+
+                       final 
SupplierWithException<CheckpointStreamWithResultProvider, Exception> 
checkpointStreamSupplier =
+
+                               
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(
+                                       
localRecoveryConfig.getLocalRecoveryMode()) ?
+
+                                       () -> 
CheckpointStreamWithResultProvider.createDuplicatingStream(
+                                               checkpointId,
+                                               
CheckpointedStateScope.EXCLUSIVE,
+                                               primaryStreamFactory,
+                                               
localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+                                       () -> 
CheckpointStreamWithResultProvider.createSimpleStream(
+                                               
CheckpointedStateScope.EXCLUSIVE,
+                                               primaryStreamFactory);
+
+                       //--------------------------------------------------- 
this becomes the end of sync part
+
+                       // implementation of the async IO operation, based on 
FutureTask
+                       final 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
+                               new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
+
+                                       CheckpointStreamWithResultProvider 
streamAndResultExtractor = null;
+
+                                       @Override
+                                       protected void acquireResources() 
throws Exception {
+                                               streamAndResultExtractor = 
checkpointStreamSupplier.get();
+                                               
cancelStreamRegistry.registerCloseable(streamAndResultExtractor);
+                                       }
+
+                                       @Override
+                                       protected void releaseResources() {
+
+                                               
unregisterAndCloseStreamAndResultExtractor();
+
+                                               for (StateTableSnapshot 
tableSnapshot : cowStateStableSnapshots.values()) {
+                                                       tableSnapshot.release();
+                                               }
+                                       }
+
+                                       @Override
+                                       protected void stopOperation() {
+                                               
unregisterAndCloseStreamAndResultExtractor();
+                                       }
+
+                                       private void 
unregisterAndCloseStreamAndResultExtractor() {
+                                               if 
(cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) {
+                                                       
IOUtils.closeQuietly(streamAndResultExtractor);
+                                                       
streamAndResultExtractor = null;
+                                               }
+                                       }
+
+                                       @Nonnull
+                                       @Override
+                                       protected 
SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
+
+                                               long startTime = 
System.currentTimeMillis();
+
+                                               
CheckpointStreamFactory.CheckpointStateOutputStream localStream =
+                                                       
this.streamAndResultExtractor.getCheckpointOutputStream();
+
+                                               DataOutputViewStreamWrapper 
outView = new DataOutputViewStreamWrapper(localStream);
+                                               
serializationProxy.write(outView);
+
+                                               long[] keyGroupRangeOffsets = 
new long[keyGroupRange.getNumberOfKeyGroups()];
+
+                                               for (int keyGroupPos = 0; 
keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+                                                       int keyGroupId = 
keyGroupRange.getKeyGroupId(keyGroupPos);
+                                                       
keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
+                                                       
outView.writeInt(keyGroupId);
+
+                                                       for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+                                                               try 
(OutputStream kgCompressionOut = 
keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
+                                                                       
DataOutputViewStreamWrapper kgCompressionView = new 
DataOutputViewStreamWrapper(kgCompressionOut);
+                                                                       
kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
+                                                                       
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
 keyGroupId);
+                                                               } // this will 
just close the outer compression stream
+                                                       }
+                                               }
+
+                                               if 
(cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) {
+                                                       KeyGroupRangeOffsets 
kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+                                                       
SnapshotResult<StreamStateHandle> result =
+                                                               
streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
+                                                       
streamAndResultExtractor = null;
+                                                       
logOperationCompleted(primaryStreamFactory, startTime);
+                                                       return 
CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, 
kgOffs);
+                                               }
+
+                                               return SnapshotResult.empty();
+                                       }
+                               };
+
+                       
AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task =
+                               AsyncStoppableTaskWithCallback.from(ioCallable);
+
+                       finalizeSnapshotBeforeReturnHook(task);
+
+                       LOG.info("Heap backend snapshot (" + 
primaryStreamFactory + ", synchronous part) in thread " +
+                               Thread.currentThread() + " took " + 
(System.currentTimeMillis() - syncStartTime) + " ms.");
+
+                       return task;
+               }
+
+               @Override
+               public void finalizeSnapshotBeforeReturnHook(Runnable runnable) 
{
+                       
snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
+               }
+
+               @Override
+               public void logOperationCompleted(CheckpointStreamFactory 
streamFactory, long startTime) {
+                       
snapshotStrategySynchronicityTrait.logOperationCompleted(streamFactory, 
startTime);
+               }
+
+               @Override
+               public boolean isAsynchronous() {
+                       return 
snapshotStrategySynchronicityTrait.isAsynchronous();
+               }
+
+               @Override
+               public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+                       return 
snapshotStrategySynchronicityTrait.newStateTable(newMetaInfo);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e04321fa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 9d0b9e2..b2b568e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -25,65 +27,159 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.BitSet;
 
 /**
  * Tests for {@link StreamOperator} snapshot restoration.
  */
-public class StreamOperatorSnapshotRestoreTest {
+public class StreamOperatorSnapshotRestoreTest extends TestLogger {
+
+       private static final int ONLY_JM_RECOVERY = 0;
+       private static final int TM_AND_JM_RECOVERY = 1;
+       private static final int TM_REMOVE_JM_RECOVERY = 2;
+       private static final int JM_REMOVE_TM_RECOVERY = 3;
 
        private static final int MAX_PARALLELISM = 10;
 
+       protected static TemporaryFolder temporaryFolder;
+
+       /**
+        * Creates the temporary folder shared by all tests of this class. The tests allocate both the
+        * local-recovery root directory and the {@link FsStateBackend} checkpoint directory from it.
+        *
+        * <p>NOTE(review): a static {@code @ClassRule TemporaryFolder} would manage creation and
+        * deletion automatically and make this method and {@link #afterClass()} unnecessary.
+        */
+       @BeforeClass
+       public static void beforeClass() throws IOException {
+               temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+       }
+
+       /**
+        * Deletes the shared temporary folder created in {@link #beforeClass()}, together with the
+        * sub-folders the individual tests created inside it.
+        */
+       @AfterClass
+       public static void afterClass() {
+               temporaryFolder.delete();
+       }
+
+       /**
+        * Tests restoring an operator from a snapshot with local recovery disabled, so that only
+        * the job-manager-owned state is produced and used for the restore.
+        */
        @Test
        public void testOperatorStatesSnapshotRestore() throws Exception {
+               testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY);
+       }
+
+       /**
+        * Tests restoring an operator from a snapshot with file-based local recovery enabled. Both
+        * the job-manager-owned state and the task-local state are handed to the restore.
+        */
+       @Test
+       public void testOperatorStatesSnapshotRestoreWithLocalState() throws 
Exception {
+               testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY);
+       }
+
+       /**
+        * Tests restoring an operator from a snapshot with local recovery enabled after the managed
+        * keyed state of the job-manager-owned snapshot has been discarded.
+        *
+        * <p>This does not model a realistic failure scenario. Discarding the job-manager copy makes
+        * sure that the keyed state can only be restored from the task-local state.
+        */
+       @Test
+       public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM() 
throws Exception {
+               
testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY);
+       }
+
+       /**
+        * Tests restoring an operator from a snapshot with local recovery enabled after the managed
+        * keyed state of the task-local snapshot has been discarded.
+        *
+        * <p>This test discards the local state to simulate its corruption and checks that the
+        * operator still recovers from the fallback job-manager-owned state.
+        */
+       @Test
+       public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM() 
throws Exception {
+               
testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY);
+       }
+
+       private void testOperatorStatesSnapshotRestoreInternal(final int mode) 
throws Exception {
 
                
//-------------------------------------------------------------------------- 
snapshot
 
+               StateBackend stateBackend = createStateBackend();
+
                TestOneInputStreamOperator op = new 
TestOneInputStreamOperator(false);
 
+               JobID jobID = new JobID();
+               JobVertexID jobVertexID = new JobVertexID();
+               int subtaskIdx = 0;
+
+               LocalRecoveryDirectoryProvider directoryProvider =
+                       new 
LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, 
jobVertexID, subtaskIdx);
+
+               LocalRecoveryConfig localRecoveryConfig =
+                       mode != ONLY_JM_RECOVERY ?
+                               new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, 
directoryProvider) :
+                               new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, 
directoryProvider);
+
+               MockEnvironment mockEnvironment = new MockEnvironment(
+                       jobID,
+                       jobVertexID,
+                       "test",
+                       1024L * 1024L,
+                       new MockInputSplitProvider(),
+                       1024 * 1024,
+                       new Configuration(),
+                       new ExecutionConfig(),
+                       new TestTaskStateManager(localRecoveryConfig),
+                       MAX_PARALLELISM,
+                       1,
+                       subtaskIdx,
+                       getClass().getClassLoader());
+
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Integer> testHarness =
-                               new KeyedOneInputStreamOperatorTestHarness<>(
-                                               op,
-                                               new KeySelector<Integer, 
Integer>() {
-                                                       @Override
-                                                       public Integer 
getKey(Integer value) throws Exception {
-                                                               return value;
-                                                       }
-                                               },
-                                               
TypeInformation.of(Integer.class),
-                                               MAX_PARALLELISM,
-                                               1 /* num subtasks */,
-                                               0 /* subtask index */);
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               op,
+                               (KeySelector<Integer, Integer>) value -> value,
+                               TypeInformation.of(Integer.class),
+                               mockEnvironment);
+
+               testHarness.setStateBackend(stateBackend);
                testHarness.open();
 
                for (int i = 0; i < 10; ++i) {
                        testHarness.processElement(new StreamRecord<>(i));
                }
 
-               OperatorSubtaskState handles = testHarness.snapshot(1L, 1L);
+               OperatorSnapshotFinalizer snapshotWithLocalState = 
testHarness.snapshotWithLocalState(1L, 1L);
 
                testHarness.close();
 
@@ -91,17 +187,12 @@ public class StreamOperatorSnapshotRestoreTest {
 
                op = new TestOneInputStreamOperator(true);
                testHarness = new 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
-                               op,
-                               new KeySelector<Integer, Integer>() {
-                                       @Override
-                                       public Integer getKey(Integer value) 
throws Exception {
-                                               return value;
-                                       }
-                               },
-                               TypeInformation.of(Integer.class),
-                               MAX_PARALLELISM,
-                               1 /* num subtasks */,
-                               0 /* subtask index */) {
+                       op,
+                       (KeySelector<Integer, Integer>) value -> value,
+                       TypeInformation.of(Integer.class),
+                       MAX_PARALLELISM,
+                       1 /* num subtasks */,
+                       0 /* subtask index */) {
 
                        @Override
                        protected StreamTaskStateInitializer 
createStreamTaskStateManager(
@@ -122,7 +213,21 @@ public class StreamOperatorSnapshotRestoreTest {
                        }
                };
 
-               testHarness.initializeState(handles);
+               testHarness.setStateBackend(stateBackend);
+
+               OperatorSubtaskState jobManagerOwnedState = 
snapshotWithLocalState.getJobManagerOwnedState();
+               OperatorSubtaskState taskLocalState = 
snapshotWithLocalState.getTaskLocalState();
+
+               // We check if local state was created when we enabled local 
recovery
+               Assert.assertTrue(mode > ONLY_JM_RECOVERY == (taskLocalState != 
null && taskLocalState.hasState()));
+
+               if (mode == TM_REMOVE_JM_RECOVERY) {
+                       
jobManagerOwnedState.getManagedKeyedState().discardState();
+               } else if (mode == JM_REMOVE_TM_RECOVERY) {
+                       taskLocalState.getManagedKeyedState().discardState();
+               }
+
+               testHarness.initializeState(jobManagerOwnedState, 
taskLocalState);
 
                testHarness.open();
 
@@ -133,6 +238,15 @@ public class StreamOperatorSnapshotRestoreTest {
                testHarness.close();
        }
 
+       /**
+        * Returns the state backend under test, by default the one built by
+        * {@link #createStateBackendInternal()}. The method is protected and non-final so that a
+        * subclass can run the same snapshot/restore scenarios against a different backend.
+        *
+        * @throws IOException if the backend's directories cannot be created
+        */
+       protected StateBackend createStateBackend() throws IOException {
+               return createStateBackendInternal();
+       }
+
+       /**
+        * Creates an {@link FsStateBackend} that checkpoints into a fresh folder inside the shared
+        * temporary folder. It is final so that subclasses overriding {@link #createStateBackend()}
+        * can still reuse it unchanged.
+        *
+        * @throws IOException if the checkpoint folder cannot be created
+        */
+       protected final FsStateBackend createStateBackendInternal() throws 
IOException {
+               File checkpointDir = temporaryFolder.newFolder();
+               return new FsStateBackend(checkpointDir.toURI());
+       }
+
        static class TestOneInputStreamOperator
                        extends AbstractStreamOperator<Integer>
                        implements OneInputStreamOperator<Integer, Integer> {
@@ -237,5 +351,4 @@ public class StreamOperatorSnapshotRestoreTest {
                        }
                }
        }
-
 }

Reply via email to