[FLINK-5218] [state backends] Eagerly close pending 
FsCheckpointStateOutputStream on task cancellation

This fix contains two modifications:
  1. State backends implement 'Closeable' and register themselves with the 
task's set of cancelables
  2. The FsStateBackend tracks all its unclosed FsCheckpointStateOutputStreams and 
closes them on 'close()'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f61bf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f61bf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f61bf6

Branch: refs/heads/release-1.1
Commit: 59f61bf6cb8351cec9369e2de39c6eeffbda10ea
Parents: e475eb2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 22:38:23 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:09:43 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  86 ++++--
 .../runtime/state/AbstractStateBackend.java     |  58 +++-
 .../state/filesystem/FsStateBackend.java        | 159 ++++++++--
 .../state/memory/MemoryStateBackend.java        |   2 +-
 .../FsCheckpointStateOutputStreamTest.java      | 132 ++++++++-
 .../state/FsStateBackendClosingTest.java        |  65 +++++
 .../streaming/runtime/tasks/StreamTask.java     |   7 +
 .../runtime/tasks/BlockingCheckpointsTest.java  | 290 +++++++++++++++++++
 .../streaming/runtime/StateBackendITCase.java   |   2 +-
 9 files changed, 731 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index deba9f9..0412a4a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -312,50 +312,74 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public void dispose() {
-               super.dispose();
-               nonPartitionedStateBackend.dispose();
+       public void dispose() throws Exception {
+               Throwable exception = null;
 
-               // we have to lock because we might have an asynchronous 
checkpoint going on
-               synchronized (dbCleanupLock) {
-                       if (db != null) {
-                               if (this.dbOptions != null) {
-                                       this.dbOptions.dispose();
-                                       this.dbOptions = null;
-                               }
-
-                               for (Tuple2<ColumnFamilyHandle, 
StateDescriptor> column : kvStateInformation.values()) {
-                                       column.f0.dispose();
-                               }
+               // make sure the individual states are disposed
+               try {
+                       super.dispose();
+               }
+               catch (Throwable t) {
+                       exception = t;
+               }
 
-                               db.dispose();
-                               db = null;
+               try {
+                       nonPartitionedStateBackend.dispose();
+               }
+               catch (Throwable t) {
+                       if (exception == null) {
+                               exception = t;
+                       } else {
+                               exception.addSuppressed(t);
                        }
                }
-       }
-
-       @Override
-       public void close() throws Exception {
-               nonPartitionedStateBackend.close();
 
                // we have to lock because we might have an asynchronous 
checkpoint going on
-               synchronized (dbCleanupLock) {
-                       if (db != null) {
-                               if (this.dbOptions != null) {
-                                       this.dbOptions.dispose();
-                                       this.dbOptions = null;
-                               }
+               // this must also happen in any case, regardless of earlier 
exceptions
+               try {
+                       synchronized (dbCleanupLock) {
+                               if (db != null) {
+                                       if (this.dbOptions != null) {
+                                               this.dbOptions.dispose();
+                                               this.dbOptions = null;
+                                       }
+
+                                       for (Tuple2<ColumnFamilyHandle, 
StateDescriptor> column : kvStateInformation.values()) {
+                                               column.f0.dispose();
+                                       }
 
-                               for (Tuple2<ColumnFamilyHandle, 
StateDescriptor> column : kvStateInformation.values()) {
-                                       column.f0.dispose();
+                                       db.dispose();
+                                       db = null;
                                }
+                       }
+               }
+               catch (Throwable t) {
+                       if (exception == null) {
+                               exception = t;
+                       } else {
+                               exception.addSuppressed(t);
+                       }
+               }
 
-                               db.dispose();
-                               db = null;
+               if (exception != null) {
+                       if (exception instanceof Exception) {
+                               throw (Exception) exception;
+                       } else if (exception instanceof Error) {
+                               throw (Error) exception;
+                       } else {
+                               throw new Exception(exception.getMessage(), 
exception);
                        }
                }
        }
 
+       @Override
+       public void close() throws IOException {
+               // we only close all I/O streams here and do not yet dispose of 
the native resources
+               // otherwise this can lead to SEGFAULT problems
+               // native resources must only be released in the 'dispose()' 
method.
+               nonPartitionedStateBackend.close();
+       }
+
        private File getDbPath(String stateName) {
                return new File(new File(new File(getNextStoragePath(), 
jobId.toString()), operatorIdentifier), stateName);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index b86688b..ab9854c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.execution.Environment;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -51,8 +52,8 @@ import java.util.Map;
 /**
  * A state backend defines how state is stored and snapshotted during 
checkpoints.
  */
-public abstract class AbstractStateBackend implements java.io.Serializable {
-       
+public abstract class AbstractStateBackend implements java.io.Serializable, 
Closeable {
+
        private static final long serialVersionUID = 4620413814639220247L;
 
        protected transient TypeSerializer<?> keySerializer;
@@ -102,23 +103,61 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
        public abstract void disposeAllStateForCurrentJob() throws Exception;
 
        /**
-        * Closes the state backend, releasing all internal resources, but does 
not delete any persistent
-        * checkpoint data.
+        * Closes the state backend, dropping and aborting all I/O operations 
that are currently
+        * pending.
         *
-        * @throws Exception Exceptions can be forwarded and will be logged by 
the system
+        * @throws IOException Exceptions can be forwarded and will be logged 
by the system
         */
-       public abstract void close() throws Exception;
+       public abstract void close() throws IOException;
+
+       /**
+        * Releases all resources held by this state backend.
+        * 
+        * <p>This method must make sure that all resources are disposed, even 
if an exception happens
+        * on the way.
+        * 
+        * @throws Exception This method should report exceptions that occur.
+        */
+       public void dispose() throws Exception {
+               Throwable exception = null;
+
+               // make sure things are closed
+               try {
+                       close();
+               }
+               catch (Throwable t) {
+                       exception = t;
+               }
 
-       public void dispose() {
+               // now actually dispose things
                lastName = null;
                lastState = null;
                if (keyValueStates != null) {
                        for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
-                               state.dispose();
+                               try {
+                                       state.dispose();
+                               }
+                               catch (Throwable t) {
+                                       if (exception == null) {
+                                               exception = t;
+                                       } else {
+                                               exception.addSuppressed(t);
+                                       }
+                               }
                        }
                }
                keyValueStates = null;
                keyValueStatesByName = null;
+
+               if (exception != null) {
+                       if (exception instanceof Exception) {
+                               throw (Exception) exception;
+                       } else if (exception instanceof Error) {
+                               throw (Error) exception;
+                       } else {
+                               throw new Exception(exception.getMessage(), 
exception);
+                       }
+               }
        }
        
        // 
------------------------------------------------------------------------
@@ -444,6 +483,9 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                 * @throws IOException Thrown, if the stream cannot be closed.
                 */
                public StateHandle<DataInputView> closeAndGetHandle() throws 
IOException {
+                       // we do not flush() here, because that forces the 
'CheckpointStateOutputStream' to files,
+                       // even when it could stay in a 'small chunk' memory 
handle.
+                       // the 'DataOutputViewStreamWrapper' does not buffer 
data anyways
                        return new DataInputViewHandle(out.closeAndGetHandle());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 8a8a26d..446f3ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -33,9 +33,10 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +45,13 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
  *
@@ -86,6 +91,10 @@ public class FsStateBackend extends AbstractStateBackend {
        /** Cached handle to the file system for file operations */
        private transient FileSystem filesystem;
 
+       /** Set of currently open streams */
+       private transient HashSet<FsCheckpointStateOutputStream> openStreams;
+
+       private transient volatile boolean closed;
 
        /**
         * Creates a new state backend that stores its checkpoint data in the 
file system and location
@@ -236,9 +245,11 @@ public class FsStateBackend extends AbstractStateBackend {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void initializeForJob(Environment env,
-               String operatorIdentifier,
-               TypeSerializer<?> keySerializer) throws Exception {
+       public void initializeForJob(
+                       Environment env,
+                       String operatorIdentifier,
+                       TypeSerializer<?> keySerializer) throws Exception {
+
                super.initializeForJob(env, operatorIdentifier, keySerializer);
 
                Path dir = new Path(basePath, env.getJobID().toString());
@@ -249,6 +260,7 @@ public class FsStateBackend extends AbstractStateBackend {
                filesystem.mkdirs(dir);
 
                checkpointDirectory = dir;
+               openStreams = new HashSet<>();
        }
 
        @Override
@@ -267,7 +279,42 @@ public class FsStateBackend extends AbstractStateBackend {
        }
 
        @Override
-       public void close() throws Exception {}
+       public void close() throws IOException {
+               closed = true;
+
+               // cache a copy on the heap for safety 
+               final HashSet<FsCheckpointStateOutputStream> openStreams = 
this.openStreams;
+               if (openStreams != null) {
+
+                       // we need to draw a copy of the set, since the closing 
concurrently modifies the set
+                       final ArrayList<FsCheckpointStateOutputStream> streams;
+
+                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (openStreams) {
+                               streams = new ArrayList<>(openStreams);
+                               openStreams.clear();
+                       }
+
+                       // close all the streams, collect exceptions and record 
all but the first as suppressed
+                       Throwable exception = null;
+                       for (FsCheckpointStateOutputStream stream : streams) {
+                               try {
+                                       stream.close();
+                               }
+                               catch (Throwable t) {
+                                       if (exception == null) {
+                                               exception = t;
+                                       } else {
+                                               exception.addSuppressed(t);
+                                       }
+                               }
+                       }
+
+                       if (exception != null) {
+                               ExceptionUtils.rethrowIOException(exception);
+                       }
+               }
+       }
 
        // 
------------------------------------------------------------------------
        //  state backend operations
@@ -299,15 +346,22 @@ public class FsStateBackend extends AbstractStateBackend {
                        S state, long checkpointID, long timestamp) throws 
Exception
        {
                checkFileSystemInitialized();
-               
+
                Path checkpointDir = createCheckpointDirPath(checkpointID);
                int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
 
-               FsCheckpointStateOutputStream stream = 
-                       new FsCheckpointStateOutputStream(checkpointDir, 
filesystem, bufferSize, fileStateThreshold);
-               
-               try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
+               try (FsCheckpointStateOutputStream stream = new 
FsCheckpointStateOutputStream(
+                               checkpointDir, filesystem, openStreams, 
bufferSize, fileStateThreshold))
+               {
+                       // perform the closing double-check AFTER! the creation 
of the stream
+                       if (closed) {
+                               throw new IOException("closed");
+                       }
+
+                       ObjectOutputStream os = new ObjectOutputStream(stream);
                        os.writeObject(state);
+                       os.flush();
+
                        return 
stream.closeAndGetHandle().toSerializableHandle();
                }
        }
@@ -318,7 +372,16 @@ public class FsStateBackend extends AbstractStateBackend {
 
                Path checkpointDir = createCheckpointDirPath(checkpointID);
                int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
-               return new FsCheckpointStateOutputStream(checkpointDir, 
filesystem, bufferSize, fileStateThreshold);
+               FsCheckpointStateOutputStream stream = new 
FsCheckpointStateOutputStream(
+                               checkpointDir, filesystem, openStreams, 
bufferSize, fileStateThreshold);
+
+               // perform the closing double-check AFTER! the creation of the 
stream
+               if (closed) {
+                       stream.close();
+                       throw new IOException("closed");
+               }
+
+               return stream;
        }
 
        // 
------------------------------------------------------------------------
@@ -426,29 +489,40 @@ public class FsStateBackend extends AbstractStateBackend {
                private int pos;
 
                private FSDataOutputStream outStream;
-               
+
                private final int localStateThreshold;
 
                private final Path basePath;
 
                private final FileSystem fs;
-               
+
+               private final HashSet<FsCheckpointStateOutputStream> 
openStreams;
+
                private Path statePath;
-               
-               private boolean closed;
+
+               private volatile boolean closed;
 
                public FsCheckpointStateOutputStream(
-                                       Path basePath, FileSystem fs,
-                                       int bufferSize, int localStateThreshold)
+                                       Path basePath,
+                                       FileSystem fs,
+                                       HashSet<FsCheckpointStateOutputStream> 
openStreams,
+                                       int bufferSize,
+                                       int localStateThreshold)
                {
                        if (bufferSize < localStateThreshold) {
                                throw new IllegalArgumentException();
                        }
-                       
+
                        this.basePath = basePath;
                        this.fs = fs;
+                       this.openStreams = checkNotNull(openStreams);
                        this.writeBuffer = new byte[bufferSize];
                        this.localStateThreshold = localStateThreshold;
+
+                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (openStreams) {
+                               openStreams.add(this);
+                       }
                }
 
 
@@ -519,6 +593,9 @@ public class FsStateBackend extends AbstractStateBackend {
                                        pos = 0;
                                }
                        }
+                       else {
+                               throw new IOException("stream is closed");
+                       }
                }
 
                /**
@@ -530,6 +607,16 @@ public class FsStateBackend extends AbstractStateBackend {
                public void close() {
                        if (!closed) {
                                closed = true;
+
+                               // make sure that the write() methods cannot 
succeed any more
+                               pos = writeBuffer.length;
+
+                               // remove the stream from the open streams set
+                               synchronized (openStreams) {
+                                       openStreams.remove(this);
+                               }
+
+                               // close all resources
                                if (outStream != null) {
                                        try {
                                                outStream.close();
@@ -551,15 +638,24 @@ public class FsStateBackend extends AbstractStateBackend {
                public StreamStateHandle closeAndGetHandle() throws IOException 
{
                        synchronized (this) {
                                if (!closed) {
+
+                                       // remove the stream from the open 
streams set
+                                       synchronized (openStreams) {
+                                               openStreams.remove(this);
+                                       }
+
+                                       // close all resources
                                        if (outStream == null && pos <= 
localStateThreshold) {
                                                closed = true;
                                                byte[] bytes = 
Arrays.copyOf(writeBuffer, pos);
+                                               pos = writeBuffer.length;
                                                return new 
ByteStreamStateHandle(bytes);
                                        }
                                        else {
                                                flush();
                                                outStream.close();
                                                closed = true;
+                                               pos = writeBuffer.length;
                                                return new 
FileStreamStateHandle(statePath);
                                        }
                                }
@@ -577,9 +673,17 @@ public class FsStateBackend extends AbstractStateBackend {
                public Path closeAndGetPath() throws IOException {
                        synchronized (this) {
                                if (!closed) {
-                                       closed = true;
+
+                                       // remove the stream from the open 
streams set
+                                       synchronized (openStreams) {
+                                               openStreams.remove(this);
+                                       }
+
+                                       // close all resources
                                        flush();
                                        outStream.close();
+                                       closed = true;
+                                       pos = writeBuffer.length;
                                        return statePath;
                                }
                                else {
@@ -587,5 +691,22 @@ public class FsStateBackend extends AbstractStateBackend {
                                }
                        }
                }
+
+               public boolean isClosed() {
+                       return closed;
+               }
+
+               // we need referential identity on the streams for the closing 
set to work
+               // properly, so we implement that via final methods here
+
+               @Override
+               public final int hashCode() {
+                       return super.hashCode();
+               }
+
+               @Override
+               public final boolean equals(Object obj) {
+                       return this == obj;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..b155244 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -78,7 +78,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
        }
 
        @Override
-       public void close() throws Exception {}
+       public void close() {}
 
        // 
------------------------------------------------------------------------
        //  State backend operations

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 5964b72..3aba9e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -22,34 +22,36 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import 
org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashSet;
 import java.util.Random;
 
 import static org.junit.Assert.*;
 
 public class FsCheckpointStateOutputStreamTest {
-       
+
        /** The temp dir, obtained in a platform neutral way */
        private static final Path TEMP_DIR_PATH = new Path(new 
File(System.getProperty("java.io.tmpdir")).toURI());
-       
-       
+
+
        @Test(expected = IllegalArgumentException.class)
        public void testWrongParameters() {
                // this should fail
                new FsStateBackend.FsCheckpointStateOutputStream(
-                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 
5000);
+                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new 
HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
        }
 
-
        @Test
        public void testEmptyState() throws Exception {
                AbstractStateBackend.CheckpointStateOutputStream stream = new 
FsStateBackend.FsCheckpointStateOutputStream(
-                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 
512);
+                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new 
HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
                
                StreamStateHandle handle = stream.closeAndGetHandle();
                assertTrue(handle instanceof ByteStreamStateHandle);
@@ -57,7 +59,101 @@ public class FsCheckpointStateOutputStreamTest {
                InputStream inStream = 
handle.getState(ClassLoader.getSystemClassLoader());
                assertEquals(-1, inStream.read());
        }
-       
+
+       @Test
+       public void testCloseAndGetPath() throws Exception {
+               FsCheckpointStateOutputStream stream = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH,
+                               FileSystem.getLocalFileSystem(),
+                               new HashSet<FsCheckpointStateOutputStream>(),
+                               1024,
+                               512);
+
+               stream.write(1);
+
+               Path path = stream.closeAndGetPath();
+               assertNotNull(path);
+
+               // cleanup
+               FileSystem.getLocalFileSystem().delete(path, false);
+       }
+
+       @Test
+       public void testWriteFailsFastWhenClosed() throws Exception {
+               final HashSet<FsCheckpointStateOutputStream> openStreams = new 
HashSet<>();
+
+               FsCheckpointStateOutputStream stream1 = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
openStreams, 1024, 512);
+
+               FsCheckpointStateOutputStream stream2 = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
openStreams, 1024, 512);
+
+               FsCheckpointStateOutputStream stream3 = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
openStreams, 1024, 512);
+
+               assertFalse(stream1.isClosed());
+               assertFalse(stream2.isClosed());
+               assertFalse(stream3.isClosed());
+
+               // simple close
+               stream1.close();
+
+               // close with handle
+               StreamStateHandle handle = stream2.closeAndGetHandle();
+
+               // close with path
+               Path path = stream3.closeAndGetPath();
+
+               assertTrue(stream1.isClosed());
+               assertTrue(stream2.isClosed());
+               assertTrue(stream3.isClosed());
+
+               validateStreamNotWritable(stream1);
+               validateStreamNotWritable(stream2);
+               validateStreamNotWritable(stream3);
+
+               // clean up
+               handle.discardState();
+               FileSystem.getLocalFileSystem().delete(path, false);
+       }
+
+       @Test
+       public void testAddAndRemoveFromOpenStreamsSet() throws Exception {
+               final HashSet<FsCheckpointStateOutputStream> openStreams = new 
HashSet<>();
+
+               FsCheckpointStateOutputStream stream1 = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
openStreams, 1024, 512);
+
+               FsCheckpointStateOutputStream stream2 = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
openStreams, 1024, 512);
+
+               FsCheckpointStateOutputStream stream3 = new 
FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
openStreams, 1024, 512);
+
+               assertTrue(openStreams.contains(stream1));
+               assertTrue(openStreams.contains(stream2));
+               assertTrue(openStreams.contains(stream3));
+               assertEquals(3, openStreams.size());
+
+               // simple close
+               stream1.close();
+
+               // close with handle
+               StreamStateHandle handle = stream2.closeAndGetHandle();
+
+               // close with path
+               Path path = stream3.closeAndGetPath();
+
+               assertFalse(openStreams.contains(stream1));
+               assertFalse(openStreams.contains(stream2));
+               assertFalse(openStreams.contains(stream3));
+               assertEquals(0, openStreams.size());
+
+               // clean up
+               handle.discardState();
+               FileSystem.getLocalFileSystem().delete(path, false);
+       }
+
        @Test
        public void testStateBlowMemThreshold() throws Exception {
                runTest(222, 999, 512, false);
@@ -72,16 +168,17 @@ public class FsCheckpointStateOutputStreamTest {
        public void testStateAboveMemThreshold() throws Exception {
+               // 576446 bytes against a 17-byte threshold; runTest is told to expect a file (expectFile = true)
                runTest(576446, 259, 17, true);
        }
-       
+
        @Test
        public void testZeroThreshold() throws Exception {
+               // threshold of 0 bytes; runTest is told to expect a file (expectFile = true)
                runTest(16678, 4096, 0, true);
        }
-       
+
        private void runTest(int numBytes, int bufferSize, int threshold, 
boolean expectFile) throws Exception {
                AbstractStateBackend.CheckpointStateOutputStream stream =
                        new FsStateBackend.FsCheckpointStateOutputStream(
-                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
bufferSize, threshold);
+                                       TEMP_DIR_PATH, 
FileSystem.getLocalFileSystem(),
+                                       new 
HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
                
                Random rnd = new Random();
                byte[] original = new byte[numBytes];
@@ -125,4 +222,19 @@ public class FsCheckpointStateOutputStreamTest {
                
                handle.discardState();
        }
+
+       /**
+        * Asserts that the given stream rejects writes: both {@code write(int)} and
+        * {@code write(byte[], int, int)} must throw an {@link IOException}.
+        *
+        * @param stream the stream expected to be no longer writable (e.g. after it was closed)
+        */
+       private void validateStreamNotWritable(FsCheckpointStateOutputStream stream) {
+               try {
+                       stream.write(1);
+                       fail("stream should not be writable, 'write(int)' should fail with an exception");
+               } catch (IOException e) {
+                       // expected
+               }
+               try {
+                       stream.write(new byte[4], 1, 2);
+                       fail("stream should not be writable, 'write(byte[], int, int)' should fail with an exception");
+               } catch (IOException e) {
+                       // expected
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
new file mode 100644
index 0000000..6df488d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import 
org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FsStateBackendClosingTest {
+
+       /**
+        * Closing an {@link FsStateBackend} must also close a checkpoint output stream that it
+        * created and that is still open, so that any further write to that stream fails.
+        */
+       @Test
+       public void testStateBackendClosesStreams() throws Exception {
+               final File tmpDir = new File(EnvironmentInformation.getTemporaryFileDirectory());
+               final FsStateBackend stateBackend = new FsStateBackend(tmpDir.toURI());
+               stateBackend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+               final FsCheckpointStateOutputStream out = stateBackend.createCheckpointStateOutputStream(17L, 12345L);
+
+               // as long as the backend is open, the stream accepts data
+               assertFalse(out.isClosed());
+               out.write(1);
+
+               // closing the backend has to close the pending stream as well
+               stateBackend.close();
+               assertTrue(out.isClosed());
+
+               boolean writeRejected = false;
+               try {
+                       out.write(2);
+               }
+               catch (IOException expected) {
+                       writeRejected = true;
+               }
+               if (!writeRejected) {
+                       fail("stream is closed, 'write(int)' should fail with an exception");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aaaead0..99df060 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,7 +772,14 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        }
                        }
                }
+
                stateBackend.initializeForJob(getEnvironment(), 
operatorIdentifier, keySerializer);
+
+               // make sure the state backend is closed eagerly in case of 
cancellation
+               synchronized (cancelables) {
+                       cancelables.add(stateBackend);
+               }
+
                return stateBackend;
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..ed993c7
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import 
org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * are released when the task is cancelled. Cancellation closes the task's state backend,
+ * which closes the checkpoint output stream that the blocked checkpoint is writing to.
+ */
+public class BlockingCheckpointsTest {
+
+       /** Triggered by {@link TestOperator} right before it enters the blocking write. */
+       private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+       /**
+        * Starts a task whose checkpoint blocks in {@code write()} on a stream that ignores
+        * interrupts, cancels the task, and expects it to end up {@link ExecutionState#CANCELED}
+        * without a failure cause.
+        *
+        * <p>The timeout turns a regression (cancellation does not close the stream, so the task
+        * thread never terminates) into a test failure instead of a build that hangs forever.
+        */
+       @Test(timeout = 60000)
+       public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+               Configuration taskConfig = new Configuration();
+               StreamConfig cfg = new StreamConfig(taskConfig);
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+               cfg.setStreamOperator(new TestOperator());
+               cfg.setStateBackend(new LockingStreamStateBackend());
+
+               Task task = createTask(taskConfig);
+
+               // start the task and wait until it is blocked inside the checkpoint
+               task.startTaskThread();
+               IN_CHECKPOINT_LATCH.await();
+
+               // cancel the task and wait. unless cancellation properly closes
+               // the streams, this will never terminate (the test timeout catches that)
+               task.cancelExecution();
+               task.getExecutingThread().join();
+
+               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+               assertNull(task.getFailureCause());
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a {@link Task} that runs {@link TestStreamTask} with the given task configuration.
+        * The runtime services are Mockito mocks or simple concrete instances.
+        */
+       private static Task createTask(Configuration taskConfig) throws IOException {
+
+               JobInformation jobInformation = new JobInformation(
+                       new JobID(),
+                       "test job name",
+                       new SerializedValue<>(new ExecutionConfig()),
+                       new Configuration(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList());
+
+               TaskInformation taskInformation = new TaskInformation(
+                       new JobVertexID(),
+                       "test task name",
+                       1,
+                       TestStreamTask.class.getName(),
+                       taskConfig);
+
+               return new Task(
+                       jobInformation,
+                       taskInformation,
+                       new ExecutionAttemptID(),
+                       0,
+                       0,
+                       Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                       Collections.<InputGateDeploymentDescriptor>emptyList(),
+                       0,
+                       null,
+                       mock(MemoryManager.class),
+                       mock(IOManager.class),
+                       mock(NetworkEnvironment.class),
+                       mock(BroadcastVariableManager.class),
+                       mock(ActorGateway.class),
+                       mock(ActorGateway.class),
+                       new FiniteDuration(10, TimeUnit.SECONDS),
+                       new FallbackLibraryCacheManager(),
+                       new FileCache(new Configuration()),
+                       new TaskManagerRuntimeInfo(
+                                       "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+                       new UnregisteredTaskMetricsGroup());
+       }
+
+       // ------------------------------------------------------------------------
+       //  state backend with locking output stream
+       // ------------------------------------------------------------------------
+
+       /**
+        * State backend that returns the same {@link LockingOutputStream} for every checkpoint
+        * and closes that stream when the backend itself is closed. All state-creation methods
+        * throw {@link UnsupportedOperationException}.
+        */
+       private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+               private static final long serialVersionUID = 1L;
+
+               private final LockingOutputStream out = new LockingOutputStream();
+
+               @Override
+               public void disposeAllStateForCurrentJob() {}
+
+               @Override
+               public void close() throws IOException {
+                       // releases any checkpoint that is blocked in out.write(...)
+                       out.close();
+               }
+
+               @Override
+               public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+                       return out;
+               }
+
+               @Override
+               protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       /**
+        * Checkpoint output stream whose {@code write(int)} blocks, ignoring thread interrupts,
+        * until {@link #close()} has been called.
+        */
+       private static final class LockingOutputStream extends CheckpointStateOutputStream implements Serializable {
+               private static final long serialVersionUID = 1L;
+
+               private final SerializableObject lock = new SerializableObject();
+               private volatile boolean closed;
+
+               @Override
+               public StreamStateHandle closeAndGetHandle() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void write(int b) throws IOException {
+                       // this needs to not react to interrupts until the handle is closed
+                       synchronized (lock) {
+                               while (!closed) {
+                                       try {
+                                               lock.wait();
+                                       }
+                                       catch (InterruptedException ignored) {
+                                               // deliberately swallowed: this stream simulates a write that
+                                               // cannot be interrupted, only close() may release it
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public void close() throws IOException {
+                       synchronized (lock) {
+                               closed = true;
+                               lock.notifyAll();
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  test source operator that calls into the locking checkpoint output stream
+       // ------------------------------------------------------------------------
+
+       /**
+        * Filter operator (drops every element) whose snapshot opens a checkpoint stream,
+        * triggers {@link #IN_CHECKPOINT_LATCH}, and then blocks writing to that stream.
+        */
+       @SuppressWarnings("serial")
+       private static final class TestOperator extends StreamFilter<Object> {
+               private static final long serialVersionUID = 1L;
+
+               public TestOperator() {
+                       super(new FilterFunction<Object>() {
+                               @Override
+                               public boolean filter(Object value) {
+                                       return false;
+                               }
+                       });
+               }
+
+               @Override
+               public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+                       AbstractStateBackend stateBackend = getStateBackend();
+                       CheckpointStateOutputStream outStream = stateBackend.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+                       IN_CHECKPOINT_LATCH.trigger();
+
+                       // this should lock
+                       outStream.write(1);
+
+                       // this should be unreachable
+                       return null;
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  stream task that simply triggers a checkpoint
+       // ------------------------------------------------------------------------
+
+       /** Stream task whose {@code run()} only triggers checkpoint 11; all other hooks are no-ops. */
+       public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+               @Override
+               public void init() {}
+
+               @Override
+               protected void run() throws Exception {
+                       triggerCheckpointOnBarrier(11L, System.currentTimeMillis());
+               }
+
+               @Override
+               protected void cleanup() {}
+
+               @Override
+               protected void cancelTask() {}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..4eb8b4a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -104,7 +104,7 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                public void disposeAllStateForCurrentJob() throws Exception {}
 
                @Override
-               public void close() throws Exception {}
+               public void close() {}
 
                @Override
                protected <N, T> ValueState<T> 
createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> 
stateDesc) throws Exception {

Reply via email to