Repository: flink
Updated Branches:
  refs/heads/master abc3e1c88 -> 7d026aa72


[FLINK-7757] [checkpointing] Introduce resource guard for 
RocksDBKeyedStateBackend to reduce locking and avoid blocking behavior.

This closes #4764.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d026aa7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d026aa7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d026aa7

Branch: refs/heads/master
Commit: 7d026aa72852f32c0ca29924c9e2b565de41b710
Parents: abc3e1c
Author: Stefan Richter <[email protected]>
Authored: Tue Oct 3 15:37:58 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Wed Oct 18 17:12:20 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java |   2 +-
 .../kafka/FlinkKafkaProducerBase.java           |   2 +-
 .../kafka/testutils/IntegerSource.java          |   2 +-
 .../state/RocksDBKeyedStateBackend.java         | 181 ++++++++-----------
 .../state/RocksDBStateBackendTest.java          |  54 +-----
 .../org/apache/flink/util/ResourceGuard.java    | 152 ++++++++++++++++
 .../apache/flink/util/SerializableObject.java   |  28 +++
 .../apache/flink/util/ResourceGuardTest.java    | 133 ++++++++++++++
 .../state/DefaultOperatorStateBackend.java      |  16 +-
 .../flink/runtime/util/SerializableObject.java  |  28 ---
 .../checkpoint/CheckpointStateRestoreTest.java  |   2 +-
 .../runtime/state/StateBackendTestBase.java     |  71 +++++++-
 .../api/functions/sink/SocketClientSink.java    |   2 +-
 13 files changed, 478 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 67e237d..8b1a62d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.util.SerializableObject;
 import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
@@ -42,6 +41,7 @@ import 
org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializableObject;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index befc1a1..cf07a23 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -33,6 +32,7 @@ import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegat
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializableObject;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
index ef50766..25a3cea 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.SerializableObject;
 
 import java.util.Collections;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 8236e5b..f67daab 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -72,11 +72,11 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.Checkpoint;
@@ -158,11 +158,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final File instanceRocksDBPath;
 
        /**
-        * Lock for protecting cleanup of the RocksDB against the checkpointing 
thread. We acquire this when doing
-        * asynchronous checkpoints and when disposing the DB. Otherwise, the 
asynchronous snapshot might try
-        * iterating over a disposed DB. After aquriring the lock, always first 
check if (db == null).
+        * Protects access to RocksDB in other threads, like the checkpointing 
thread, from parallel calls that dispose the
+        * RocksDB object.
         */
-       private final SerializableObject asyncSnapshotLock = new 
SerializableObject();
+       private final ResourceGuard rocksDBResourceGuard;
 
        /**
         * Our RocksDB data base, this is used by the actual subclasses of 
{@link AbstractRocksDBState}
@@ -225,6 +224,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
 
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
+               this.rocksDBResourceGuard = new ResourceGuard();
 
                // ensure that we use the right merge operator, because other 
code relies on this
                this.columnOptions = 
Preconditions.checkNotNull(columnFamilyOptions)
@@ -281,30 +281,29 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        public void dispose() {
                super.dispose();
 
-               // Acquire the lock, so that no ongoing snapshots access the db 
during cleanup
-               synchronized (asyncSnapshotLock) {
-                       // IMPORTANT: null reference to signal potential async 
checkpoint workers that the db was disposed, as
-                       // working on the disposed object results in SEGFAULTS. 
Other code has to check field #db for null
-                       // and access it in a synchronized block that locks on 
#dbDisposeLock.
-                       if (db != null) {
-
-                               // RocksDB's native memory management requires 
that *all* CFs (including default) are closed before the
-                               // DB is closed. So we start with the ones 
created by Flink...
-                               for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
-                                       kvStateInformation.values()) {
-                                       IOUtils.closeQuietly(columnMetaData.f0);
-                               }
+               // This call will block until all clients that have acquired 
access to the RocksDB instance have released it,
+               // so that we cannot release the native resources while clients 
are still working with it in parallel.
+               rocksDBResourceGuard.close();
+
+               // IMPORTANT: null reference to signal potential async 
checkpoint workers that the db was disposed, as
+               // working on the disposed object results in SEGFAULTS.
+               if (db != null) {
 
-                               // ... close the default CF ...
-                               IOUtils.closeQuietly(defaultColumnFamily);
+                       // RocksDB's native memory management requires that 
*all* CFs (including default) are closed before the
+                       // DB is closed. So we start with the ones created by 
Flink...
+                       for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
+                               kvStateInformation.values()) {
+                               IOUtils.closeQuietly(columnMetaData.f0);
+                       }
 
-                               // ... and finally close the DB instance ...
-                               IOUtils.closeQuietly(db);
+                       // ... close the default CF ...
+                       IOUtils.closeQuietly(defaultColumnFamily);
 
-                               // invalidate the reference before releasing 
the lock so that other accesses will not cause crashes
-                               db = null;
+                       // ... and finally close the DB instance ...
+                       IOUtils.closeQuietly(db);
 
-                       }
+                       // invalidate the reference before releasing the lock 
so that other accesses will not cause crashes
+                       db = null;
                }
 
                kvStateInformation.clear();
@@ -356,6 +355,18 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                final long checkpointTimestamp,
                final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
 
+               if (db == null) {
+                       throw new IOException("RocksDB closed.");
+               }
+
+               if (kvStateInformation.isEmpty()) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
+                                       checkpointTimestamp + " . Returning 
null.");
+                       }
+                       return DoneFuture.nullValue();
+               }
+
                final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
                        new RocksDBIncrementalSnapshotOperation<>(
                                this,
@@ -363,21 +374,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                checkpointId,
                                checkpointTimestamp);
 
-               synchronized (asyncSnapshotLock) {
-                       if (db == null) {
-                               throw new IOException("RocksDB closed.");
-                       }
-
-                       if (kvStateInformation.isEmpty()) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at " +
-                                               checkpointTimestamp + " . 
Returning null.");
-                               }
-                               return DoneFuture.nullValue();
-                       }
-
-                       snapshotOperation.takeSnapshot();
-               }
+               snapshotOperation.takeSnapshot();
 
                return new FutureTask<KeyedStateHandle>(
                        new Callable<KeyedStateHandle>() {
@@ -410,28 +407,18 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                final RocksDBFullSnapshotOperation<K> snapshotOperation;
 
-               // hold the db lock while operation on the db to guard us 
against async db disposal
-               synchronized (asyncSnapshotLock) {
-
-                       if (db != null) {
-
-                               if (kvStateInformation.isEmpty()) {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at " + timestamp +
-                                                       " . Returning null.");
-                                       }
-                                       return DoneFuture.nullValue();
-                               }
-
-                               snapshotOperation =
-                                       new 
RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
-
-                               snapshotOperation.takeDBSnapShot(checkpointId, 
timestamp);
-                       } else {
-                               throw new IOException("RocksDB closed.");
+               if (kvStateInformation.isEmpty()) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
+                                       " . Returning null.");
                        }
+
+                       return DoneFuture.nullValue();
                }
 
+               snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
+               snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
+
                // implementation of the async IO operation, based on FutureTask
                AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable 
=
                        new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
@@ -450,9 +437,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                private void 
releaseSnapshotOperationResources() {
                                        // hold the db lock while operation on 
the db to guard us against async db disposal
-                                       synchronized (asyncSnapshotLock) {
-                                               
snapshotOperation.releaseSnapshotResources();
-                                       }
+                                       
snapshotOperation.releaseSnapshotResources();
                                }
 
                                @Override
@@ -474,16 +459,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                public KeyGroupsStateHandle performOperation() 
throws Exception {
                                        long startTime = 
System.currentTimeMillis();
 
-                                       synchronized (asyncSnapshotLock) {
-                                               // hold the db lock while 
operation on the db to guard us against async db disposal
-                                               if (db == null) {
-                                                       throw new 
IOException("RocksDB closed.");
-                                               }
-
-                                               
snapshotOperation.writeDBSnapshot();
-                                               
snapshotOperation.createSnapshotResultStateHandleFromOutputStream();
+                                       if (isStopped()) {
+                                               throw new IOException("RocksDB 
closed.");
                                        }
 
+                                       snapshotOperation.writeDBSnapshot();
+
                                        LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
                                                streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
@@ -509,6 +490,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private final KeyGroupRangeOffsets keyGroupRangeOffsets;
                private final CheckpointStreamFactory checkpointStreamFactory;
                private final CloseableRegistry snapshotCloseableRegistry;
+               private final ResourceGuard.Lease dbLease;
 
                private long checkpointId;
                private long checkpointTimeStamp;
@@ -519,17 +501,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
                private DataOutputView outputView;
-               private KeyGroupsStateHandle snapshotResultStateHandle;
 
                RocksDBFullSnapshotOperation(
                        RocksDBKeyedStateBackend<K> stateBackend,
                        CheckpointStreamFactory checkpointStreamFactory,
-                       CloseableRegistry registry) {
+                       CloseableRegistry registry) throws IOException {
 
                        this.stateBackend = stateBackend;
                        this.checkpointStreamFactory = checkpointStreamFactory;
                        this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
                        this.snapshotCloseableRegistry = registry;
+                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
                }
 
                /**
@@ -575,11 +557,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                /**
-                * 4) Close the CheckpointStateOutputStream after writing and 
receive a state handle.
+                * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
                 *
-                * @throws IOException
+                * @return state handle to the completed snapshot
                 */
-               public void createSnapshotResultStateHandleFromOutputStream() 
throws IOException {
+               public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
 
                        if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
 
@@ -587,9 +569,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                outStream = null;
 
                                if (stateHandle != null) {
-                                       this.snapshotResultStateHandle = new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+                                       return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
                                }
                        }
+                       return null;
                }
 
                /**
@@ -618,38 +601,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                IOUtils.closeQuietly(readOptions);
                                readOptions = null;
                        }
-               }
-
-               /**
-                * Drop the created snapshot if we have ben cancelled.
-                */
-               public void dropSnapshotResult() {
-                       if (null != snapshotResultStateHandle) {
-                               try {
-                                       
snapshotResultStateHandle.discardState();
-                               } catch (Exception e) {
-                                       LOG.warn("Exception occurred during 
snapshot state handle cleanup.", e);
-                               }
-                       }
-               }
-
-               /**
-                * Returns the current CheckpointStateOutputStream (when it was 
opened and not yet closed) into which we write
-                * the state snapshot.
-                *
-                * @return the current CheckpointStateOutputStream
-                */
-               public CheckpointStreamFactory.CheckpointStateOutputStream 
getOutStream() {
-                       return outStream;
-               }
 
-               /**
-                * Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
-                *
-                * @return state handle to the completed snapshot
-                */
-               public KeyGroupsStateHandle getSnapshotResultStateHandle() {
-                       return snapshotResultStateHandle;
+                       this.dbLease.close();
                }
 
                private void writeKVStateMetaData() throws IOException {
@@ -827,18 +780,22 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                // handles to the misc files in the current snapshot
                private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
 
+               // This lease protects from concurrent disposal of the native 
rocksdb instance.
+               private final ResourceGuard.Lease dbLease;
+
                private StreamStateHandle metaStateHandle = null;
 
                private RocksDBIncrementalSnapshotOperation(
                        RocksDBKeyedStateBackend<K> stateBackend,
                        CheckpointStreamFactory checkpointStreamFactory,
                        long checkpointId,
-                       long checkpointTimestamp) {
+                       long checkpointTimestamp) throws IOException {
 
                        this.stateBackend = stateBackend;
                        this.checkpointStreamFactory = checkpointStreamFactory;
                        this.checkpointId = checkpointId;
                        this.checkpointTimestamp = checkpointTimestamp;
+                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
                }
 
                private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
@@ -919,7 +876,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                void takeSnapshot() throws Exception {
-                       assert 
(Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
                        final long lastCompletedCheckpoint;
 
@@ -940,6 +896,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        // save state data
                        backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+
+                       LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", backupPath);
+
                        backupFileSystem = backupPath.getFileSystem();
                        if (backupFileSystem.exists(backupPath)) {
                                throw new IllegalStateException("Unexpected 
existence of the backup directory.");
@@ -1013,6 +972,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                void releaseResources(boolean canceled) {
 
+                       dbLease.close();
+
                        if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
                                try {
                                        closeableRegistry.close();
@@ -1024,6 +985,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        if (backupPath != null) {
                                try {
                                        if 
(backupFileSystem.exists(backupPath)) {
+
+                                               LOG.trace("Deleting local 
RocksDB backup path {}.", backupPath);
                                                
backupFileSystem.delete(backupPath, true);
                                        }
                                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 08d661c..a2ec052 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -215,46 +215,6 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        }
 
        @Test
-       public void testRunningSnapshotAfterBackendClosed() throws Exception {
-               setupRocksKeyedStateBackend();
-               RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
-                       CheckpointOptions.forFullCheckpoint());
-
-               RocksDB spyDB = keyedStateBackend.db;
-
-               if (!enableIncrementalCheckpointing) {
-                       verify(spyDB, times(1)).getSnapshot();
-                       verify(spyDB, 
times(0)).releaseSnapshot(any(Snapshot.class));
-               }
-
-               this.keyedStateBackend.dispose();
-               verify(spyDB, times(1)).close();
-               assertEquals(null, keyedStateBackend.db);
-
-               //Ensure every RocksObjects not closed yet
-               for (RocksObject rocksCloseable : allCreatedCloseables) {
-                       verify(rocksCloseable, times(0)).close();
-               }
-
-               Thread asyncSnapshotThread = new Thread(snapshot);
-               asyncSnapshotThread.start();
-               try {
-                       snapshot.get();
-                       fail();
-               } catch (Exception ignored) {
-
-               }
-
-               asyncSnapshotThread.join();
-
-               //Ensure every RocksObject was closed exactly once
-               for (RocksObject rocksCloseable : allCreatedCloseables) {
-                       verify(rocksCloseable, times(1)).close();
-               }
-
-       }
-
-       @Test
        public void testCorrectMergeOperatorSet() throws IOException {
 
                final ColumnFamilyOptions columnFamilyOptions = spy(new 
ColumnFamilyOptions());
@@ -289,8 +249,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                setupRocksKeyedStateBackend();
 
                try {
-                       RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
-                               CheckpointOptions.forFullCheckpoint());
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forFullCheckpoint());
 
                        RocksDB spyDB = keyedStateBackend.db;
 
@@ -299,10 +259,6 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                verify(spyDB, 
times(0)).releaseSnapshot(any(Snapshot.class));
                        }
 
-                       this.keyedStateBackend.dispose();
-                       verify(spyDB, times(1)).close();
-                       assertEquals(null, keyedStateBackend.db);
-
                        //Ensure every RocksObjects not closed yet
                        for (RocksObject rocksCloseable : allCreatedCloseables) 
{
                                verify(rocksCloseable, times(0)).close();
@@ -310,10 +266,16 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
 
                        snapshot.cancel(true);
 
+                       this.keyedStateBackend.dispose();
+
+                       verify(spyDB, times(1)).close();
+                       assertEquals(null, keyedStateBackend.db);
+
                        //Ensure every RocksObjects was closed exactly once
                        for (RocksObject rocksCloseable : allCreatedCloseables) 
{
                                verify(rocksCloseable, times(1)).close();
                        }
+
                } finally {
                        keyedStateBackend.dispose();
                        keyedStateBackend = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java 
b/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java
new file mode 100644
index 0000000..83660a6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is a guard for shared resources with the following invariants. 
The resource can be acquired by multiple
+ * clients in parallel through the {@link #acquireResource()} call. As a 
result of the call, each client gets a
+ * {@link Lease}. The {@link #close()} method of the lease releases the 
resources and reduces the client count in
+ * the {@link ResourceGuard} object.
+ * The protected resource should only be disposed once the corresponding 
resource guard is successfully closed, but
+ * the guard can only be closed once all clients that acquired a lease for the 
guarded resource have released it.
+ * Before this has happened, the call to {@link #close()} will block until the 
zero-open-leases condition is triggered.
+ * After the resource guard is closed, calls to {@link #acquireResource()} 
will fail with an exception. Note that
+ * clients are responsible for releasing the resource after usage. All 
clients are considered equal, i.e. only a
+ * global count is maintained of how many times the resource was acquired, 
but not by whom.
+ */
+public class ResourceGuard implements AutoCloseable, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The object that serves as lock for count and the closed-flag.
+        */
+       private final SerializableObject lock;
+
+       /**
+        * Number of clients that have ongoing access to the resource.
+        */
+       private volatile int leaseCount;
+
+       /**
+        * This flag indicates whether it is still possible to acquire access to the 
resource.
+        */
+       private volatile boolean closed;
+
+       public ResourceGuard() {
+               this.lock = new SerializableObject();
+               this.leaseCount = 0;
+               this.closed = false;
+       }
+
+       /**
+        * Acquires access for one new client to the guarded resource.
+        *
+        * @throws IOException when the resource guard is already closed.
+        */
+       public Lease acquireResource() throws IOException {
+
+               synchronized (lock) {
+
+                       if (closed) {
+                               throw new IOException("Resource guard was 
already closed.");
+                       }
+
+                       ++leaseCount;
+               }
+
+               return new Lease();
+       }
+
+       /**
+        * Releases access for one client of the guarded resource. This method 
must only be called after a matching call to
+        * {@link #acquireResource()}.
+        */
+       private void releaseResource() {
+
+               synchronized (lock) {
+
+                       --leaseCount;
+
+                       if (closed && leaseCount == 0) {
+                               lock.notifyAll();
+                       }
+               }
+       }
+
+       /**
+        * Closes the resource guard. This method will block until all calls to 
{@link #acquireResource()} have seen their
+        * matching call to {@link #releaseResource()}.
+        */
+       @Override
+       public void close() {
+
+               synchronized (lock) {
+
+                       closed = true;
+
+                       while (leaseCount > 0) {
+
+                               try {
+                                       lock.wait();
+                               } catch (InterruptedException ignore) {
+                                       // Even on interruption, we cannot 
terminate the loop until all open leases are closed.
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Returns true if the resource guard is closed, i.e. after {@link 
#close()} was called.
+        */
+       public boolean isClosed() {
+               return closed;
+       }
+
+       /**
+        * Returns the current count of open leases.
+        */
+       public int getLeaseCount() {
+               return leaseCount;
+       }
+
+       /**
+        * A lease is issued by the {@link ResourceGuard} as a result of calls to 
{@link #acquireResource()}. The owner of the
+        * lease can release it via the {@link #close()} call.
+        */
+       public class Lease implements AutoCloseable {
+
+               private final AtomicBoolean closed;
+
+               private Lease() {
+                       this.closed = new AtomicBoolean(false);
+               }
+
+               @Override
+               public void close() {
+                       if (closed.compareAndSet(false, true)) {
+                               releaseResource();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java 
b/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java
new file mode 100644
index 0000000..ec01731
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * A simple object that only implements {@link java.io.Serializable}, so it 
can be used
+ * in serializable classes.
+ */
+public class SerializableObject implements java.io.Serializable {
+       
+       private static final long serialVersionUID = -7322636177391854669L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java 
b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
new file mode 100644
index 0000000..a030a81
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ResourceGuardTest {
+
+       @Test
+       public void testClose() {
+               ResourceGuard resourceGuard = new ResourceGuard();
+               Assert.assertFalse(resourceGuard.isClosed());
+               resourceGuard.close();
+               Assert.assertTrue(resourceGuard.isClosed());
+               try {
+                       resourceGuard.acquireResource();
+                       Assert.fail();
+               } catch (IOException ignore) {
+               }
+       }
+
+       @Test
+       public void testAcquireReleaseClose() throws IOException {
+               ResourceGuard resourceGuard = new ResourceGuard();
+               ResourceGuard.Lease lease = resourceGuard.acquireResource();
+               Assert.assertEquals(1, resourceGuard.getLeaseCount());
+               lease.close();
+               Assert.assertEquals(0, resourceGuard.getLeaseCount());
+               resourceGuard.close();
+               Assert.assertTrue(resourceGuard.isClosed());
+       }
+
+       @Test
+       public void testCloseBlockIfAcquired() throws Exception {
+               ResourceGuard resourceGuard = new ResourceGuard();
+               ResourceGuard.Lease lease_1 = resourceGuard.acquireResource();
+               AtomicBoolean checker = new AtomicBoolean(true);
+
+               Thread closerThread = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       // this line should block until all 
acquires are matched by releases.
+                                       resourceGuard.close();
+                                       checker.set(false);
+                               } catch (Exception ignore) {
+                                       checker.set(false);
+                               }
+                       }
+               };
+
+               closerThread.start();
+
+               ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
+               lease_2.close();
+               Assert.assertTrue(checker.get());
+
+               // this matches the first acquire and will unblock the close.
+               lease_1.close();
+               closerThread.join(60_000);
+               Assert.assertFalse(checker.get());
+       }
+
+       @Test
+       public void testInterruptHandledCorrectly() throws Exception {
+               ResourceGuard resourceGuard = new ResourceGuard();
+               ResourceGuard.Lease lease = resourceGuard.acquireResource();
+               AtomicBoolean checker = new AtomicBoolean(true);
+
+               Thread closerThread = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       // this line should block until all 
acquires are matched by releases.
+                                       resourceGuard.close();
+                                       checker.set(false);
+                               } catch (Exception ignore) {
+                                       checker.set(false);
+                               }
+                       }
+               };
+
+               closerThread.start();
+               closerThread.interrupt();
+
+               Assert.assertTrue(checker.get());
+
+               lease.close();
+               closerThread.join(60_000);
+               Assert.assertFalse(checker.get());
+       }
+
+       @Test
+       public void testLeaseCloseIsIdempotent() throws Exception {
+               ResourceGuard resourceGuard = new ResourceGuard();
+               ResourceGuard.Lease lease_1 = resourceGuard.acquireResource();
+               ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
+               Assert.assertEquals(2, resourceGuard.getLeaseCount());
+               lease_1.close();
+               Assert.assertEquals(1, resourceGuard.getLeaseCount());
+               lease_1.close();
+               Assert.assertEquals(1, resourceGuard.getLeaseCount());
+               lease_2.close();
+               Assert.assertEquals(0, resourceGuard.getLeaseCount());
+               ResourceGuard.Lease lease_3 = resourceGuard.acquireResource();
+               Assert.assertEquals(1, resourceGuard.getLeaseCount());
+               lease_2.close();
+               Assert.assertEquals(1, resourceGuard.getLeaseCount());
+               lease_3.close();
+               Assert.assertEquals(0, resourceGuard.getLeaseCount());
+               resourceGuard.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 1fb03d7..9edf8fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -233,19 +233,25 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                                @Override
                                protected void acquireResources() throws 
Exception {
-                                       out = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-                                       
closeStreamOnCancelRegistry.registerCloseable(out);
+                                       openOutStream();
                                }
 
                                @Override
                                protected void releaseResources() throws 
Exception {
-                                       if 
(closeStreamOnCancelRegistry.unregisterCloseable(out)) {
-                                               IOUtils.closeQuietly(out);
-                                       }
+                                       closeOutStream();
                                }
 
                                @Override
                                protected void stopOperation() throws Exception 
{
+                                       closeOutStream();
+                               }
+
+                               private void openOutStream() throws Exception {
+                                       out = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+                                       
closeStreamOnCancelRegistry.registerCloseable(out);
+                               }
+
+                               private void closeOutStream() {
                                        if 
(closeStreamOnCancelRegistry.unregisterCloseable(out)) {
                                                IOUtils.closeQuietly(out);
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
deleted file mode 100644
index af6fa16..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-/**
- * A simple object that only implements {@link java.io.Serializable}, so it 
can be used
- * in serializable classes.
- */
-public class SerializableObject implements java.io.Serializable {
-       
-       private static final long serialVersionUID = -7322636177391854669L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 1788434..47daa01 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.util.SerializableObject;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index dbf131f..50b5e26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -66,14 +66,13 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
-
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -2914,6 +2913,74 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
        }
 
+       /**
+        * The purpose of this test is to check that parallel snapshots are 
possible, and work even if a previous snapshot
+        * is still running and blocking.
+        */
+       @Test
+       public void testParallelAsyncSnapshots() throws Exception {
+               OneShotLatch blocker = new OneShotLatch();
+               OneShotLatch waiter = new OneShotLatch();
+               BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
+               streamFactory.setWaiterLatch(waiter);
+               streamFactory.setBlockerLatch(blocker);
+               streamFactory.setAfterNumberInvocations(10);
+
+               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+               try {
+
+                       if (!backend.supportsAsynchronousSnapshots()) {
+                               return;
+                       }
+
+                       // insert some data to the backend.
+                       InternalValueState<VoidNamespace, Integer> valueState = 
backend.createValueState(
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       for (int i = 0; i < 10; ++i) {
+                               backend.setCurrentKey(i);
+                               valueState.update(i);
+                       }
+
+                       RunnableFuture<KeyedStateHandle> snapshot1 =
+                               backend.snapshot(0L, 0L, streamFactory, 
CheckpointOptions.forFullCheckpoint());
+
+                       Thread runner1 = new Thread(snapshot1, 
"snapshot-1-runner");
+                       runner1.start();
+                       // after this call returns, we have a running 
snapshot-1 that is blocked in IO.
+                       waiter.await();
+
+                       // do some updates in between the snapshots.
+                       for (int i = 5; i < 15; ++i) {
+                               backend.setCurrentKey(i);
+                               valueState.update(i + 1);
+                       }
+
+                       // we don't want to block the second snapshot.
+                       streamFactory.setWaiterLatch(null);
+                       streamFactory.setBlockerLatch(null);
+
+                       RunnableFuture<KeyedStateHandle> snapshot2 =
+                               backend.snapshot(1L, 1L, streamFactory, 
CheckpointOptions.forFullCheckpoint());
+
+                       Thread runner2 = new 
Thread(snapshot2,"snapshot-2-runner");
+                       runner2.start();
+                       // snapshot-2 should run and succeed, while snapshot-1 
is still running and blocked in IO.
+                       snapshot2.get();
+
+                       // we release the blocking IO so that snapshot-1 can 
also finish and succeed.
+                       blocker.trigger();
+                       snapshot1.get();
+
+               } finally {
+                       backend.dispose();
+               }
+       }
+
        @Test
        public void testAsyncSnapshot() throws Exception {
                OneShotLatch waiter = new OneShotLatch();

http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index c43345b..214d5c2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.SerializableObject;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Reply via email to