[FLINK-5146] Improved resource cleanup in RocksDB keyed state backend

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4f802dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4f802dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4f802dd

Branch: refs/heads/master
Commit: e4f802dd502f38b922f668c2813728d5511ca289
Parents: 6e98a93
Author: Stefan Richter <[email protected]>
Authored: Sat Nov 12 21:13:28 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Dec 2 18:10:13 2016 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 312 ++++++++++------
 .../state/RocksDBStateBackendTest.java          | 364 ++++++++++++++++++-
 .../io/async/AbstractAsyncIOCallable.java       |   2 +-
 .../runtime/io/async/AsyncDoneCallback.java     |   4 +-
 .../async/AsyncStoppableTaskWithCallback.java   |   8 +-
 .../memory/MemCheckpointStreamFactory.java      |   2 +-
 .../api/operators/OperatorSnapshotResult.java   |   9 +-
 .../streaming/runtime/tasks/StreamTask.java     |  67 ++--
 .../flink/core/testutils/OneShotLatch.java      |   9 +
 9 files changed, 624 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index e498b34..bc5b17d 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -61,7 +61,7 @@ import org.rocksdb.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -103,14 +103,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * asynchronous checkpoints and when disposing the DB. Otherwise, the 
asynchronous snapshot might try
         * iterating over a disposed DB. After aquriring the lock, always first 
check if (db == null).
         */
-       private final SerializableObject dbDisposeLock = new 
SerializableObject();
+       private final SerializableObject asyncSnapshotLock = new 
SerializableObject();
 
        /**
         * Our RocksDB data base, this is used by the actual subclasses of 
{@link AbstractRocksDBState}
         * to store state. The different k/v states that we have don't each 
have their own RocksDB
         * instance. They all write to this instance but to their own column 
family.
         */
-       @GuardedBy("dbDisposeLock")
        protected RocksDB db;
 
        /**
@@ -136,7 +135,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        ) throws Exception {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange);
-
                this.operatorIdentifier = operatorIdentifier;
                this.jobId = jobId;
                this.columnOptions = columnFamilyOptions;
@@ -206,8 +204,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
 
-               RocksDBRestoreOperation restoreOperation = new 
RocksDBRestoreOperation(this);
-               restoreOperation.doRestore(restoreState);
+               try {
+                       RocksDBRestoreOperation restoreOperation = new 
RocksDBRestoreOperation(this);
+                       restoreOperation.doRestore(restoreState);
+               } catch (Exception ex) {
+                       dispose();
+                       throw ex;
+               }
        }
 
        /**
@@ -217,23 +220,22 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
       public void dispose() {
               super.dispose();
 
-               final RocksDB cleanupRockDBReference;
-
-               // Acquire the log on dbDisposeLock, so that no ongoing 
snapshots access the db during cleanup
-               synchronized (dbDisposeLock) {
+               // Acquire the lock, so that no ongoing snapshots access the db 
during cleanup
+               synchronized (asyncSnapshotLock) {
                        // IMPORTANT: null reference to signal potential async 
checkpoint workers that the db was disposed, as
                        // working on the disposed object results in SEGFAULTS. 
Other code has to check field #db for null
-                       // and access it in a synchronized block that locks on 
#dbDisposeLock.
-                       cleanupRockDBReference = db;
-                       db = null;
-               }
+                       // and access it in a synchronized block that locks on 
#asyncSnapshotLock.
+                       if (db != null) {
 
-               // Dispose decoupled db
-               if (cleanupRockDBReference != null) {
-                       for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> 
column : kvStateInformation.values()) {
-                               column.f0.close();
+                               for (Tuple2<ColumnFamilyHandle, 
StateDescriptor<?, ?>> column : kvStateInformation.values()) {
+                                       column.f0.close();
+                               }
+
+                               kvStateInformation.clear();
+
+                               db.close();
+                               db = null;
                        }
-                       cleanupRockDBReference.close();
                }
 
                try {
@@ -252,10 +254,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * is also stopped when the backend is closed through {@link 
#dispose()}. For each backend, this method must always
         * be called by the same thread.
         *
-        * @param checkpointId The Id of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
+        * @param checkpointId  The Id of the checkpoint.
+        * @param timestamp     The timestamp of the checkpoint.
         * @param streamFactory The factory that we can use for writing our 
state to streams.
-        *
         * @return Future to the state handle of the snapshot data.
         * @throws Exception
         */
@@ -267,14 +268,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                long startTime = System.currentTimeMillis();
 
-               if (kvStateInformation.isEmpty()) {
-                       LOG.info("Asynchronous RocksDB snapshot performed on 
empty keyed state at " + timestamp + " . Returning null.");
-                       return new DoneFuture<>(null);
-               }
-
                final RocksDBSnapshotOperation snapshotOperation = new 
RocksDBSnapshotOperation(this, streamFactory);
                // hold the db lock while operation on the db to guard us 
against async db disposal
-               synchronized (dbDisposeLock) {
+               synchronized (asyncSnapshotLock) {
+
+                       if (kvStateInformation.isEmpty()) {
+                               LOG.info("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
+                                               " . Returning null.");
+
+                               return new DoneFuture<>(null);
+                       }
+
                        if (db != null) {
                                snapshotOperation.takeDBSnapShot(checkpointId, 
timestamp);
                        } else {
@@ -295,18 +299,18 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        @Override
                                        public KeyGroupsStateHandle 
performOperation() throws Exception {
                                                long startTime = 
System.currentTimeMillis();
-                                               try {
-                                                       // hold the db lock 
while operation on the db to guard us against async db disposal
-                                                       synchronized 
(dbDisposeLock) {
-                                                               if (db != null) 
{
-                                                                       
snapshotOperation.writeDBSnapshot();
-                                                               } else {
+                                               synchronized 
(asyncSnapshotLock) {
+                                                       try {
+                                                               // hold the db 
lock while operation on the db to guard us against async db disposal
+                                                               if (db == null) 
{
                                                                        throw 
new IOException("RocksDB closed.");
                                                                }
-                                                       }
 
-                                               } finally {
-                                                       
snapshotOperation.closeCheckpointStream();
+                                                               
snapshotOperation.writeDBSnapshot();
+
+                                                       } finally {
+                                                               
snapshotOperation.closeCheckpointStream();
+                                                       }
                                                }
 
                                                LOG.info("Asynchronous RocksDB 
snapshot (" + streamFactory + ", asynchronous part) in thread " +
@@ -315,15 +319,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                return 
snapshotOperation.getSnapshotResultStateHandle();
                                        }
 
-                                       @Override
-                                       public void done() {
+                                       private void 
releaseSnapshotOperationResources(boolean canceled) {
                                                // hold the db lock while 
operation on the db to guard us against async db disposal
-                                               synchronized (dbDisposeLock) {
-                                                       if (db != null) {
-                                                               
snapshotOperation.releaseDBSnapshot();
-                                                       }
+                                               synchronized 
(asyncSnapshotLock) {
+                                                       
snapshotOperation.releaseSnapshotResources(canceled);
                                                }
                                        }
+
+                                       @Override
+                                       public void done(boolean canceled) {
+                                               
releaseSnapshotOperationResources(canceled);
+                                       }
                                };
 
                LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", 
synchronous part) in thread " +
@@ -348,14 +354,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private long checkpointTimeStamp;
 
                private Snapshot snapshot;
+               private ReadOptions readOptions;
                private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
                private DataOutputView outputView;
                private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
                private KeyGroupsStateHandle snapshotResultStateHandle;
 
-
-
-               public RocksDBSnapshotOperation(
+               RocksDBSnapshotOperation(
                                RocksDBKeyedStateBackend<?> stateBackend,
                                CheckpointStreamFactory 
checkpointStreamFactory) {
 
@@ -397,7 +402,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws IOException
                 */
                public void writeDBSnapshot() throws IOException, 
InterruptedException {
-                       Preconditions.checkNotNull(snapshot, "No ongoing 
snapshot to write.");
+
+                       if (null == snapshot) {
+                               throw new IOException("No snapshot available. 
Might be released due to cancellation.");
+                       }
+
                        Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
                        writeKVStateMetaData();
                        writeKVStateData();
@@ -412,6 +421,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        if (outStream != null) {
                                
stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
                                snapshotResultStateHandle = 
closeSnapshotStreamAndGetHandle();
+                       } else {
+                               snapshotResultStateHandle = null;
                        }
                }
 
@@ -419,13 +430,36 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * 5) Release the snapshot object for RocksDB and clean up.
                 *
                 */
-               public void releaseDBSnapshot() {
-                       Preconditions.checkNotNull(snapshot, "No ongoing 
snapshot to release.");
-                       stateBackend.db.releaseSnapshot(snapshot);
-                       snapshot = null;
-                       outStream = null;
-                       outputView = null;
-                       kvStateIterators = null;
+               public void releaseSnapshotResources(boolean canceled) {
+                       if (null != kvStateIterators) {
+                               for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
+                                       kvStateIterator.f0.close();
+                               }
+                               kvStateIterators = null;
+                       }
+
+                       if (null != snapshot) {
+                               if (null != stateBackend.db) {
+                                       
stateBackend.db.releaseSnapshot(snapshot);
+                               }
+                               snapshot.close();
+                               snapshot = null;
+                       }
+
+                       if (null != readOptions) {
+                               readOptions.close();
+                               readOptions = null;
+                       }
+
+                       if (canceled) {
+                               try {
+                                       if (null != snapshotResultStateHandle) {
+                                               
snapshotResultStateHandle.discardState();
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Exception occurred during 
snapshot state handle cleanup.", e);
+                               }
+                       }
                }
 
                /**
@@ -462,7 +496,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                InstantiationUtil.serializeObject(outStream, 
column.getValue().f1);
 
-                               //retrieve iterator for this k/v states
-                               ReadOptions readOptions = new ReadOptions();
+                               //retrieve iterator for this k/v states; all iterators read the same snapshot and share one ReadOptions, because only the field instance is closed on release
+                               if (null == readOptions) { readOptions = new ReadOptions(); }
                                readOptions.setSnapshot(snapshot);
                                RocksIterator iterator = 
stateBackend.db.newIterator(column.getValue().f0, readOptions);
                                kvStateIterators.add(new Tuple2<>(iterator, 
kvStateId));
@@ -472,59 +506,64 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private void writeKVStateData() throws IOException, 
InterruptedException {
 
-                       RocksDBMergeIterator iterator = new 
RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes);
-
                        byte[] previousKey = null;
                        byte[] previousValue = null;
 
-                       //preamble: setup with first key-group as our lookahead
-                       if (iterator.isValid()) {
-                               //begin first key-group by recording the offset
-                               
keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
-                               //write the k/v-state id as metadata
-                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                               outputView.writeShort(iterator.kvStateId());
-                               previousKey = iterator.key();
-                               previousValue = iterator.value();
-                               iterator.next();
-                       }
+                       List<Tuple2<RocksIterator, Integer>> 
kvStateIteratorsHandover = this.kvStateIterators;
+                       this.kvStateIterators = null;
 
-                       //main loop: write k/v pairs ordered by (key-group, 
kv-state), thereby tracking key-group offsets.
-                       while (iterator.isValid()) {
+                       // Here we transfer ownership of RocksIterators to the 
RocksDBMergeIterator
+                       try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
+                                       kvStateIteratorsHandover, 
stateBackend.keyGroupPrefixBytes)) {
 
-                               assert (!hasMetaDataFollowsFlag(previousKey));
+                               //preamble: setup with first key-group as our 
lookahead
+                               if (mergeIterator.isValid()) {
+                                       //begin first key-group by recording 
the offset
+                                       
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
+                                       //write the k/v-state id as metadata
+                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                       
outputView.writeShort(mergeIterator.kvStateId());
+                                       previousKey = mergeIterator.key();
+                                       previousValue = mergeIterator.value();
+                                       mergeIterator.next();
+                               }
 
-                               //set signal in first key byte that meta data 
will follow in the stream after this k/v pair
-                               if (iterator.isNewKeyGroup() || 
iterator.isNewKeyValueState()) {
+                               //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
+                               while (mergeIterator.isValid()) {
 
-                                       //be cooperative and check for 
interruption from time to time in the hot loop
-                                       checkInterrupted();
+                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
 
-                                       
setMetaDataFollowsFlagInKey(previousKey);
-                               }
+                                       //set signal in first key byte that 
meta data will follow in the stream after this k/v pair
+                                       if (mergeIterator.isNewKeyGroup() || 
mergeIterator.isNewKeyValueState()) {
 
-                               writeKeyValuePair(previousKey, previousValue);
+                                               //be cooperative and check for 
interruption from time to time in the hot loop
+                                               checkInterrupted();
 
-                               //write meta data if we have to
-                               if (iterator.isNewKeyGroup()) {
-                                       //
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
outputView.writeShort(END_OF_KEY_GROUP_MARK);
-                                       //begin new key-group
-                                       
keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
-                                       //write the kev-state
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
outputView.writeShort(iterator.kvStateId());
-                               } else if (iterator.isNewKeyValueState()) {
-                                       //write the k/v-state
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
outputView.writeShort(iterator.kvStateId());
-                               }
+                                               
setMetaDataFollowsFlagInKey(previousKey);
+                                       }
 
-                               //request next k/v pair
-                               previousKey = iterator.key();
-                               previousValue = iterator.value();
-                               iterator.next();
+                                       writeKeyValuePair(previousKey, 
previousValue);
+
+                                       //write meta data if we have to
+                                       if (mergeIterator.isNewKeyGroup()) {
+                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                               
outputView.writeShort(END_OF_KEY_GROUP_MARK);
+                                               //begin new key-group
+                                               
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
+                                               //write the kev-state
+                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                               
outputView.writeShort(mergeIterator.kvStateId());
+                                       } else if 
(mergeIterator.isNewKeyValueState()) {
+                                               //write the k/v-state
+                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                               
outputView.writeShort(mergeIterator.kvStateId());
+                                       }
+
+                                       //request next k/v pair
+                                       previousKey = mergeIterator.key();
+                                       previousValue = mergeIterator.value();
+                                       mergeIterator.next();
+                               }
                        }
 
                        //epilogue: write last key-group
@@ -540,11 +579,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() 
throws IOException {
                        StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
                        outStream = null;
-                       if (stateHandle != null) {
-                               return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
-                       } else {
-                               throw new IOException("Output stream returned 
null on close.");
-                       }
+                       return stateHandle != null ? new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
                }
 
                private void writeKeyValuePair(byte[] key, byte[] value) throws 
IOException {
@@ -566,7 +601,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private static void checkInterrupted() throws 
InterruptedException {
                        if(Thread.currentThread().isInterrupted()) {
-                               throw new InterruptedException("Snapshot 
canceled.");
+                               throw new InterruptedException("RocksDB 
snapshot interrupted.");
                        }
                }
        }
@@ -655,7 +690,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        //restore the empty columns for the k/v states through 
the metadata
                        for (int i = 0; i < numColumns; i++) {
 
-                               StateDescriptor<?, ?> stateDescriptor = 
(StateDescriptor<?, ?>) InstantiationUtil.deserializeObject(
+                               StateDescriptor<?, ?> stateDescriptor = 
InstantiationUtil.deserializeObject(
                                                currentStateHandleInStream,
                                                
rocksDBKeyedStateBackend.userCodeClassLoader);
 
@@ -829,13 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                public int getKvStateId() {
                        return kvStateId;
                }
+
+               public void close() {
+                       this.iterator.close();
+               }
        }
 
        /**
         * Iterator that merges multiple RocksDB iterators to partition all 
states into contiguous key-groups.
         * The resulting iteration sequence is ordered by (key-group, kv-state).
         */
-       static final class RocksDBMergeIterator {
+       static final class RocksDBMergeIterator implements Closeable {
 
                private final PriorityQueue<MergeIterator> heap;
                private final int keyGroupPrefixByteCount;
@@ -845,18 +884,29 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private MergeIterator currentSubIterator;
 
-               RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) throws IOException {
+               private static final List<Comparator<MergeIterator>> 
COMPARATORS;
+
+               static {
+                       int maxBytes = 4;
+                       COMPARATORS = new ArrayList<>(maxBytes);
+                       for (int i = 0; i < maxBytes; ++i) {
+                               final int currentBytes = i;
+                               COMPARATORS.add(new Comparator<MergeIterator>() 
{
+                                       @Override
+                                       public int compare(MergeIterator o1, 
MergeIterator o2) {
+                                               int arrayCmpRes = 
compareKeyGroupsForByteArrays(
+                                                               o1.currentKey, 
o2.currentKey, currentBytes);
+                                               return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+                                       }
+                               });
+                       }
+               }
+
+               RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
                        Preconditions.checkNotNull(kvStateIterators);
                        this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
 
-                       Comparator<MergeIterator> iteratorComparator = new 
Comparator<MergeIterator>() {
-                               @Override
-                               public int compare(MergeIterator o1, 
MergeIterator o2) {
-                                       int arrayCmpRes = 
compareKeyGroupsForByteArrays(
-                                                       o1.currentKey, 
o2.currentKey, keyGroupPrefixByteCount);
-                                       return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-                               }
-                       };
+                       Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount);
 
                        if (kvStateIterators.size() > 0) {
                                this.heap = new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
@@ -866,8 +916,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        rocksIterator.seekToFirst();
                                        if (rocksIterator.isValid()) {
                                                heap.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                                       } else {
+                                               rocksIterator.close();
                                        }
                                }
+
+                               kvStateIterators.clear();
+
                                this.valid = !heap.isEmpty();
                                this.currentSubIterator = heap.poll();
                        } else {
@@ -901,14 +956,18 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        newKVState = 
currentSubIterator.getIterator() != rocksIterator;
                                        detectNewKeyGroup(oldKey);
                                }
-                       } else if (heap.isEmpty()) {
-                               valid = false;
                        } else {
-                               currentSubIterator = heap.poll();
-                               newKVState = true;
-                               detectNewKeyGroup(oldKey);
-                       }
+                               rocksIterator.close();
 
+                               if (heap.isEmpty()) {
+                                       currentSubIterator = null;
+                                       valid = false;
+                               } else {
+                                       currentSubIterator = heap.poll();
+                                       newKVState = true;
+                                       detectNewKeyGroup(oldKey);
+                               }
+                       }
                }
 
                private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
@@ -986,6 +1045,21 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
                        return 0;
                }
+
+               @Override
+               public void close() {
+
+                       if (null != currentSubIterator) {
+                               currentSubIterator.close();
+                               currentSubIterator = null;
+                       }
+
+                       for (MergeIterator iterator : heap) {
+                               iterator.close();
+                       }
+
+                       heap.clear();
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9d25434..314717b 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,26 +18,72 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksObject;
+import org.rocksdb.Snapshot;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
 
 /**
  * Tests for the partitioned state part of {@link RocksDBStateBackend}.
  */
 public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBackend> {
 
+       private OneShotLatch blocker;
+       private OneShotLatch waiter;
+       private BlockerCheckpointStreamFactory testStreamFactory;
+       private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
+       private List<RocksObject> allCreatedCloseables;
+       private ValueState<Integer> testState1;
+       private ValueState<String> testState2;
+
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
        @Before
-       public void checkOperatingSystem() {
+       public void checkOS() throws Exception {
                Assume.assumeTrue("This test can't run successfully on 
Windows.", !OperatingSystem.isWindows());
        }
 
@@ -49,4 +95,320 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                backend.setDbStoragePath(dbPath);
                return backend;
        }
+
+       public void setupRocksKeyedStateBackend() throws Exception {
+
+               blocker = new OneShotLatch();
+               waiter = new OneShotLatch();
+               testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 
1024);
+               testStreamFactory.setBlockerLatch(blocker);
+               testStreamFactory.setWaiterLatch(waiter);
+               testStreamFactory.setAfterNumberInvocations(100);
+
+               RocksDBStateBackend backend = getStateBackend();
+               Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+               keyedStateBackend = (RocksDBKeyedStateBackend<Integer>) 
backend.createKeyedStateBackend(
+                               env,
+                               new JobID(),
+                               "Test",
+                               IntSerializer.INSTANCE,
+                               2,
+                               new KeyGroupRange(0, 1),
+                               mock(TaskKvStateRegistry.class));
+
+               testState1 = keyedStateBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("TestState-1", 
Integer.class, 0));
+
+               testState2 = keyedStateBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("TestState-2", 
String.class, ""));
+
+               allCreatedCloseables = new ArrayList<>();
+
+               keyedStateBackend.db = spy(keyedStateBackend.db);
+
+               doAnswer(new Answer<Object>() {
+
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               RocksIterator rocksIterator = 
spy((RocksIterator) invocationOnMock.callRealMethod());
+                               allCreatedCloseables.add(rocksIterator);
+                               return rocksIterator;
+                       }
+               
}).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), 
any(ReadOptions.class));
+
+               doAnswer(new Answer<Object>() {
+
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               Snapshot snapshot = spy((Snapshot) 
invocationOnMock.callRealMethod());
+                               allCreatedCloseables.add(snapshot);
+                               return snapshot;
+                       }
+               }).when(keyedStateBackend.db).getSnapshot();
+
+               doAnswer(new Answer<Object>() {
+
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               ColumnFamilyHandle snapshot = 
spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
+                               allCreatedCloseables.add(snapshot);
+                               return snapshot;
+                       }
+               
}).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class));
+
+               for (int i = 0; i < 100; ++i) {
+                       keyedStateBackend.setCurrentKey(i);
+                       testState1.update(4200 + i);
+                       testState2.update("S-" + (4200 + i));
+               }
+       }
+
+       @Test
+       public void testRunningSnapshotAfterBackendClosed() throws Exception {
+               setupRocksKeyedStateBackend();
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+
+               RocksDB spyDB = keyedStateBackend.db;
+
+               verify(spyDB, times(1)).getSnapshot();
+               verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
+
+               this.keyedStateBackend.dispose();
+               verify(spyDB, times(1)).close();
+               assertEquals(null, keyedStateBackend.db);
+
+               //Ensure every RocksObjects not closed yet
+               for (RocksObject rocksCloseable : allCreatedCloseables) {
+                       verify(rocksCloseable, times(0)).close();
+               }
+
+               Thread asyncSnapshotThread = new Thread(snapshot);
+               asyncSnapshotThread.start();
+               try {
+                       snapshot.get();
+                       fail();
+               } catch (Exception ignored) {
+
+               }
+
+               asyncSnapshotThread.join();
+
+               //Ensure every RocksObject was closed exactly once
+               for (RocksObject rocksCloseable : allCreatedCloseables) {
+                       verify(rocksCloseable, times(1)).close();
+               }
+
+       }
+
+       @Test
+       public void testReleasingSnapshotAfterBackendClosed() throws Exception {
+               setupRocksKeyedStateBackend();
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+
+               RocksDB spyDB = keyedStateBackend.db;
+
+               verify(spyDB, times(1)).getSnapshot();
+               verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
+
+               this.keyedStateBackend.dispose();
+               verify(spyDB, times(1)).close();
+               assertEquals(null, keyedStateBackend.db);
+
+               //Ensure every RocksObjects not closed yet
+               for (RocksObject rocksCloseable : allCreatedCloseables) {
+                       verify(rocksCloseable, times(0)).close();
+               }
+
+               snapshot.cancel(true);
+
+               //Ensure every RocksObjects was closed exactly once
+               for (RocksObject rocksCloseable : allCreatedCloseables) {
+                       verify(rocksCloseable, times(1)).close();
+               }
+
+       }
+
+       @Test
+       public void testDismissingSnapshot() throws Exception {
+               setupRocksKeyedStateBackend();
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               snapshot.cancel(true);
+               verifyRocksObjectsReleased();
+       }
+
+       @Test
+       public void testDismissingSnapshotNotRunnable() throws Exception {
+               setupRocksKeyedStateBackend();
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               snapshot.cancel(true);
+               Thread asyncSnapshotThread = new Thread(snapshot);
+               asyncSnapshotThread.start();
+               try {
+                       snapshot.get();
+                       fail();
+               } catch (Exception ignored) {
+
+               }
+               asyncSnapshotThread.join();
+               verifyRocksObjectsReleased();
+       }
+
+       @Test
+       public void testCompletingSnapshot() throws Exception {
+               setupRocksKeyedStateBackend();
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               Thread asyncSnapshotThread = new Thread(snapshot);
+               asyncSnapshotThread.start();
+               waiter.await(); // wait for snapshot to run
+               waiter.reset();
+               runStateUpdates();
+               blocker.trigger(); // allow checkpointing to start writing
+               waiter.await(); // wait for snapshot stream writing to run
+               KeyGroupsStateHandle keyGroupsStateHandle = snapshot.get();
+               assertNotNull(keyGroupsStateHandle);
+               assertTrue(keyGroupsStateHandle.getStateSize() > 0);
+               assertEquals(2, keyGroupsStateHandle.getNumberOfKeyGroups());
+               assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+               asyncSnapshotThread.join();
+               verifyRocksObjectsReleased();
+       }
+
+       @Test
+       public void testCancelRunningSnapshot() throws Exception {
+               setupRocksKeyedStateBackend();
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               Thread asyncSnapshotThread = new Thread(snapshot);
+               asyncSnapshotThread.start();
+               waiter.await(); // wait for snapshot to run
+               waiter.reset();
+               runStateUpdates();
+               blocker.trigger(); // allow checkpointing to start writing
+               snapshot.cancel(true);
+               assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+               waiter.await(); // wait for snapshot stream writing to run
+               try {
+                       snapshot.get();
+                       fail();
+               } catch (Exception ignored) { // expected: the snapshot was cancelled
+               }
+               // join before verifying: the snapshot thread may still be releasing resources, and verification disposes the backend
+               asyncSnapshotThread.join();
+               verifyRocksObjectsReleased();
+       }
+
+       private void runStateUpdates() throws Exception{
+               for (int i = 50; i < 150; ++i) {
+                       if (i % 10 == 0) {
+                               Thread.sleep(1);
+                       }
+                       keyedStateBackend.setCurrentKey(i);
+                       testState1.update(4200 + i);
+                       testState2.update("S-" + (4200 + i));
+               }
+       }
+
+       private void verifyRocksObjectsReleased() {
+               // Ensure every RocksObject was closed exactly once
+               for (RocksObject rocksCloseable : allCreatedCloseables) {
+                       verify(rocksCloseable, times(1)).close();
+               }
+
+               assertNotNull(keyedStateBackend.db);
+               RocksDB spyDB = keyedStateBackend.db;
+
+               verify(spyDB, times(1)).getSnapshot();
+               verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
+
+               keyedStateBackend.dispose(); // must close the spied DB and null out the reference (verified below)
+               verify(spyDB, times(1)).close();
+               assertEquals(null, keyedStateBackend.db);
+       }
+
+       static class BlockerCheckpointStreamFactory implements 
CheckpointStreamFactory {
+
+               private final int maxSize;
+               private int afterNumberInvocations;
+               private OneShotLatch blocker;
+               private OneShotLatch waiter;
+
+               MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
lastCreatedStream;
+
+               public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
getLastCreatedStream() {
+                       return lastCreatedStream;
+               }
+
+               public BlockerCheckpointStreamFactory(int maxSize) {
+                       this.maxSize = maxSize;
+               }
+
+               public void setAfterNumberInvocations(int 
afterNumberInvocations) {
+                       this.afterNumberInvocations = afterNumberInvocations;
+               }
+
+               public void setBlockerLatch(OneShotLatch latch) {
+                       this.blocker = latch;
+               }
+
+               public void setWaiterLatch(OneShotLatch latch) {
+                       this.waiter = latch;
+               }
+
+               @Override
+               public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
+                       waiter.trigger();
+                       this.lastCreatedStream = new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+                               private int afterNInvocations = 
afterNumberInvocations;
+                               private final OneShotLatch streamBlocker = 
blocker;
+                               private final OneShotLatch streamWaiter = 
waiter;
+
+                               @Override
+                               public void write(int b) throws IOException {
+
+                                       if (afterNInvocations > 0) {
+                                               --afterNInvocations;
+                                       }
+
+                                       if (0 == afterNInvocations && null != 
streamBlocker) {
+                                               try {
+                                                       streamBlocker.await();
+                                               } catch (InterruptedException 
ignored) {
+                                               }
+                                       }
+                                       try {
+                                               super.write(b);
+                                       } catch (IOException ex) {
+                                               if (null != streamWaiter) {
+                                                       streamWaiter.trigger();
+                                               }
+                                               throw ex;
+                                       }
+
+                                       if (0 == afterNInvocations && null != 
streamWaiter) {
+                                               streamWaiter.trigger();
+                                       }
+                               }
+
+                               @Override
+                               public void close() {
+                                       super.close();
+                                       if (null != streamWaiter) {
+                                               streamWaiter.trigger();
+                                       }
+                               }
+                       };
+
+                       return lastCreatedStream;
+               }
+
+               @Override
+               public void close() throws Exception {
+
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
index 989e868..1968d40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
@@ -131,7 +131,7 @@ public abstract class AbstractAsyncIOCallable<V, D extends 
Closeable> implements
         * it finished or was stopped.
         */
        @Override
-       public void done() {
+       public void done(boolean canceled) {
                //optional callback hook
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
index 13d9057..dcc5525 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
@@ -25,7 +25,9 @@ public interface AsyncDoneCallback {
 
        /**
         * the callback
+        *
+        * @param canceled true if the task was canceled before it could complete normally
         */
-       void done();
+       void done(boolean canceled);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
index 8316e4f..1ca109c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
@@ -36,17 +36,13 @@ public class AsyncStoppableTaskWithCallback<V> extends 
FutureTask<V> {
 
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
-               
-               if (mayInterruptIfRunning) {
-                       stoppableCallbackCallable.stop();
-               }
-
+               stoppableCallbackCallable.stop();
                return super.cancel(mayInterruptIfRunning);
        }
 
        @Override
        protected void done() {
-               stoppableCallbackCallable.done();
+               stoppableCallbackCallable.done(isCancelled());
        }
 
        public static <V> AsyncStoppableTaskWithCallback<V> 
from(StoppableCallbackCallable<V> callable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 30de638..9b2b46f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -144,7 +144,7 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                return bytes;
                        }
                        else {
-                               throw new IllegalStateException("stream has 
already been closed");
+                               throw new IOException("stream has already been 
closed");
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 52c89f8..4265edc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -78,4 +78,25 @@ public class OperatorSnapshotResult {
        public void 
setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
                this.operatorStateRawFuture = operatorStateRawFuture;
        }
-}
\ No newline at end of file
+
+       /**
+        * Cancels (with interruption) all state futures of this snapshot result. Futures that
+        * were never set are skipped, so cancelling a partially populated result cannot fail
+        * with a {@link NullPointerException} and mask an earlier error during cleanup.
+        */
+       public void cancel() {
+               cancelIfNotNull(getKeyedStateManagedFuture());
+               cancelIfNotNull(getOperatorStateManagedFuture());
+               cancelIfNotNull(getKeyedStateRawFuture());
+               cancelIfNotNull(getOperatorStateRawFuture());
+       }
+
+       /**
+        * Cancels the given future with interruption if it is not {@code null}.
+        */
+       private static void cancelIfNotNull(RunnableFuture<?> future) {
+               if (future != null) {
+                       future.cancel(true);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6595901..fac37c2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -942,9 +942,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                @Override
                public void close() {
-                       //TODO Handle other state futures in case we actually 
run them. Currently they are just DoneFutures.
-                       if (futureKeyedBackendStateHandles != null) {
-                               futureKeyedBackendStateHandles.cancel(true);
+                       // cleanup/release ongoing snapshot operations
+                       for (OperatorSnapshotResult snapshotResult : 
snapshotInProgressList) {
+                               snapshotResult.cancel();
                        }
                }
        }
@@ -985,35 +985,55 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                        startSyncPartNano = System.nanoTime();
 
-                       for (StreamOperator<?> op : allOperators) {
+                       boolean failed = true;
+                       try {
 
-                               createStreamFactory(op);
-                               snapshotNonPartitionableState(op);
+                               for (StreamOperator<?> op : allOperators) {
 
-                               OperatorSnapshotResult snapshotInProgress =
-                                               
op.snapshotState(checkpointMetaData.getCheckpointId(), 
checkpointMetaData.getTimestamp(), streamFactory);
+                                       createStreamFactory(op);
+                                       snapshotNonPartitionableState(op);
 
-                               snapshotInProgressList.add(snapshotInProgress);
-                       }
+                                       OperatorSnapshotResult 
snapshotInProgress =
+                                                       
op.snapshotState(checkpointMetaData.getCheckpointId(), 
checkpointMetaData.getTimestamp(), streamFactory);
 
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Finished synchronous checkpoints for 
checkpoint {} on task {}",
-                                               
checkpointMetaData.getCheckpointId(), owner.getName());
-                       }
+                                       
snapshotInProgressList.add(snapshotInProgress);
+                               }
+
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Finished synchronous 
checkpoints for checkpoint {} on task {}",
+                                                       
checkpointMetaData.getCheckpointId(), owner.getName());
+                               }
+
+                               startAsyncPartNano = System.nanoTime();
 
-                       startAsyncPartNano= System.nanoTime();
+                               
checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - 
startSyncPartNano) / 1_000_000);
 
-                       
checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - 
startSyncPartNano) / 1_000_000);
+                               // at this point we are transferring ownership 
over snapshotInProgressList for cleanup to the thread
+                               runAsyncCheckpointingAndAcknowledge();
+                               failed = false;
 
-                       runAsyncCheckpointingAndAcknowledge();
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("{} - finished synchronous part of checkpoint {}. " +
+                                                                       "Alignment duration: {} ms, snapshot duration {} ms",
+                                                       owner.getName(), checkpointMetaData.getCheckpointId(),
+                                                       checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+                                                       checkpointMetaData.getSyncDurationMillis());
+                               }
+                       } finally {
+                               if (failed) {
+                                       // Cleanup to release resources
+                                       for (OperatorSnapshotResult 
operatorSnapshotResult : snapshotInProgressList) {
+                                               operatorSnapshotResult.cancel();
+                                       }
 
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{} - finished synchronous part of 
checkpoint {}." +
-                                                               "Alignment 
duration: {} ms, snapshot duration {} ms",
-                                               owner.getName(), 
checkpointMetaData.getCheckpointId(),
-                                               
checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-                                               
checkpointMetaData.getSyncDurationMillis());
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
+                                                                               "Alignment duration: {} ms", // sync duration is only recorded on success
+                                                               owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetaData.getAlignmentDurationNanos() / 1_000_000);
+                                       }
+                               }
                        }
+
                }
 
                private void createStreamFactory(StreamOperator<?> operator) 
throws IOException {
@@ -1051,6 +1071,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
 
                public void runAsyncCheckpointingAndAcknowledge() throws 
IOException {
+
                        AsyncCheckpointRunnable asyncCheckpointRunnable = new 
AsyncCheckpointRunnable(
                                        owner,
                                        nonPartitionedStates,

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 0418bf5..b3d86e5 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -107,4 +107,13 @@ public final class OneShotLatch {
        public boolean isTriggered() {
                return triggered;
        }
+
+       /**
+        * Resets the latch to the non-triggered state, so {@link #isTriggered()} returns false afterwards.
+        */
+       public void reset() {
+               synchronized (lock) {
+                       triggered = false;
+               }
+       }
 }

Reply via email to