Repository: flink
Updated Branches:
  refs/heads/master c590912c9 -> 4c23879a5


[FLINK-4910] Introduce safety net for closing file system streams

This closes #2691.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba8ed263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba8ed263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba8ed263

Branch: refs/heads/master
Commit: ba8ed263695d16eacb4bdfdf195dd22c83bb53ed
Parents: c590912
Author: Stefan Richter <[email protected]>
Authored: Mon Oct 24 17:49:54 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Nov 22 23:16:52 2016 +0100

----------------------------------------------------------------------
 .../common/operators/CollectionExecutor.java    |  31 +--
 .../apache/flink/core/fs/CloseableRegistry.java |  52 +++++
 .../flink/core/fs/ClosingFSDataInputStream.java |  97 ++++++++++
 .../core/fs/ClosingFSDataOutputStream.java      | 102 ++++++++++
 .../flink/core/fs/FSDataInputStreamWrapper.java |  96 +++++++++
 .../core/fs/FSDataOutputStreamWrapper.java      |  76 ++++++++
 .../org/apache/flink/core/fs/FileSystem.java    |  88 ++++++---
 .../core/fs/SafetyNetCloseableRegistry.java     | 181 +++++++++++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     | 150 ++++++++++++++
 .../flink/core/fs/WrappingProxyCloseable.java   |  30 +++
 .../flink/util/AbstractCloseableRegistry.java   | 114 +++++++++++
 .../java/org/apache/flink/util/IOUtils.java     |  15 +-
 .../org/apache/flink/util/WrappingProxy.java    |  25 +++
 .../apache/flink/util/WrappingProxyUtil.java    |  33 ++++
 .../apache/flink/core/fs/FileSystemTest.java    |  29 +--
 .../core/fs/SafetyNetCloseableRegistryTest.java | 193 +++++++++++++++++++
 .../flink/runtime/filecache/FileCache.java      |  42 ++--
 .../state/AbstractKeyedStateBackend.java        |   5 +-
 .../flink/runtime/state/ClosableRegistry.java   | 108 -----------
 .../state/DefaultOperatorStateBackend.java      |   5 +-
 .../state/StateInitializationContextImpl.java   |  15 +-
 .../StateSnapshotContextSynchronousImpl.java    |   5 +-
 .../state/filesystem/FileStateHandle.java       |  18 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   5 +
 .../streaming/runtime/tasks/StreamTask.java     |   6 +-
 .../StateInitializationContextImplTest.java     |   6 +-
 ...StateSnapshotContextSynchronousImplTest.java |   4 +-
 .../util/AbstractStreamOperatorTestHarness.java |  10 +-
 .../test/checkpointing/RescalingITCase.java     |   1 +
 29 files changed, 1324 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index a6fc17e..07f48fc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -18,20 +18,6 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -58,6 +44,7 @@ import 
org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.MetricGroup;
@@ -65,6 +52,20 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * Execution utility for serial, local, collection-based executions of Flink 
programs.
  */
@@ -571,7 +572,7 @@ public class CollectionExecutor {
 
                public CompletedFuture(Path entry) {
                        try{
-                               LocalFileSystem fs = (LocalFileSystem) 
entry.getFileSystem();
+                               LocalFileSystem fs = (LocalFileSystem) 
FileSystem.getUnguardedFileSystem(entry.toUri());
                                result = entry.isAbsolute() ? new 
Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
                        } catch (Exception e){
                                throw new RuntimeException("DistributedCache 
supports only local files for Collection Environments");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
new file mode 100644
index 0000000..81ba7ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class allows to register instances of {@link Closeable}, which are all 
closed if this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close 
the provided {@link Closeable}
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, 
Object> {
+
+       private static final Object DUMMY = new Object();
+
+       public CloseableRegistry() {
+               super(new HashMap<Closeable, Object>());
+       }
+
+       @Override
+       protected void doRegister(Closeable closeable, Map<Closeable, Object> 
closeableMap) throws IOException {
+               closeableMap.put(closeable, DUMMY);
+       }
+
+       @Override
+       protected void doUnRegister(Closeable closeable, Map<Closeable, Object> 
closeableMap) {
+               closeableMap.remove(closeable);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
new file mode 100644
index 0000000..23ac4f2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link 
FSDataInputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is 
utilized.
+ */
+public class ClosingFSDataInputStream
+               extends FSDataInputStreamWrapper
+               implements WrappingProxyCloseable<FSDataInputStream> {
+
+       private final SafetyNetCloseableRegistry registry;
+       private final String debugInfo;
+
+       private volatile boolean closed;
+
+       private ClosingFSDataInputStream(
+                       FSDataInputStream delegate, SafetyNetCloseableRegistry 
registry, String debugInfo) throws IOException {
+               super(delegate);
+               this.registry = Preconditions.checkNotNull(registry);
+               this.debugInfo = Preconditions.checkNotNull(debugInfo);
+               this.closed = false;
+       }
+
+       public boolean isClosed() {
+               return closed;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (!closed) {
+                       closed = true;
+                       registry.unregisterClosable(this);
+                       inputStream.close();
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return inputStream.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+
+               if (this == obj) {
+                       return true;
+               }
+
+               if (obj instanceof ClosingFSDataInputStream) {
+                       return inputStream.equals(((ClosingFSDataInputStream) 
obj).inputStream);
+               }
+
+               return false;
+       }
+
+       @Override
+       public String toString() {
+               return "ClosingFSDataInputStream(" + inputStream.toString() + 
") : " + debugInfo;
+       }
+
+       public static ClosingFSDataInputStream wrapSafe(
+                       FSDataInputStream delegate, SafetyNetCloseableRegistry 
registry) throws IOException{
+               return wrapSafe(delegate, registry, "");
+       }
+
+       public static ClosingFSDataInputStream wrapSafe(
+                       FSDataInputStream delegate, SafetyNetCloseableRegistry 
registry, String debugInfo) throws IOException{
+
+               ClosingFSDataInputStream inputStream = new 
ClosingFSDataInputStream(delegate, registry, debugInfo);
+               registry.registerClosable(inputStream);
+               return inputStream;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
new file mode 100644
index 0000000..120ca67
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link 
FSDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is 
utilized.
+ */
+public class ClosingFSDataOutputStream
+               extends FSDataOutputStreamWrapper
+               implements WrappingProxyCloseable<FSDataOutputStream> {
+
+       private final SafetyNetCloseableRegistry registry;
+       private final String debugString;
+
+       private volatile boolean closed;
+
+       public ClosingFSDataOutputStream(
+                       FSDataOutputStream delegate, SafetyNetCloseableRegistry 
registry) throws IOException {
+               this(delegate, registry, "");
+       }
+
+       private ClosingFSDataOutputStream(
+                       FSDataOutputStream delegate, SafetyNetCloseableRegistry 
registry, String debugString) throws IOException {
+               super(delegate);
+               this.registry = Preconditions.checkNotNull(registry);
+               this.debugString = Preconditions.checkNotNull(debugString);
+               this.closed = false;
+       }
+
+       public boolean isClosed() {
+               return closed;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (!closed) {
+                       closed = true;
+                       registry.unregisterClosable(this);
+                       outputStream.close();
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return outputStream.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+
+               if (this == obj) {
+                       return true;
+               }
+
+               if (obj instanceof ClosingFSDataOutputStream) {
+                       return outputStream.equals(((ClosingFSDataOutputStream) 
obj).outputStream);
+               }
+
+               return false;
+       }
+
+       @Override
+       public String toString() {
+               return "ClosingFSDataOutputStream(" + outputStream.toString() + 
") : " + debugString;
+       }
+
+       public static ClosingFSDataOutputStream wrapSafe(
+                       FSDataOutputStream delegate, SafetyNetCloseableRegistry 
registry) throws IOException {
+               return wrapSafe(delegate, registry, "");
+       }
+
+       public static ClosingFSDataOutputStream wrapSafe(
+                       FSDataOutputStream delegate, SafetyNetCloseableRegistry 
registry, String debugInfo) throws IOException {
+
+               ClosingFSDataOutputStream inputStream = new 
ClosingFSDataOutputStream(delegate, registry, debugInfo);
+               registry.registerClosable(inputStream);
+               return inputStream;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..507b756
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataInputStream}
+ */
+public class FSDataInputStreamWrapper extends FSDataInputStream implements 
WrappingProxy<FSDataInputStream> {
+
+       protected final FSDataInputStream inputStream;
+
+       public FSDataInputStreamWrapper(FSDataInputStream inputStream) {
+               this.inputStream = Preconditions.checkNotNull(inputStream);
+       }
+
+       @Override
+       public void seek(long desired) throws IOException {
+               inputStream.seek(desired);
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return inputStream.getPos();
+       }
+
+       @Override
+       public int read() throws IOException {
+               return inputStream.read();
+       }
+
+       @Override
+       public int read(byte[] b) throws IOException {
+               return inputStream.read(b);
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               return inputStream.read(b, off, len);
+       }
+
+       @Override
+       public long skip(long n) throws IOException {
+               return inputStream.skip(n);
+       }
+
+       @Override
+       public int available() throws IOException {
+               return inputStream.available();
+       }
+
+       @Override
+       public void close() throws IOException {
+               inputStream.close();
+       }
+
+       @Override
+       public void mark(int readlimit) {
+               inputStream.mark(readlimit);
+       }
+
+       @Override
+       public void reset() throws IOException {
+               inputStream.reset();
+       }
+
+       @Override
+       public boolean markSupported() {
+               return inputStream.markSupported();
+       }
+
+       @Override
+       public FSDataInputStream getWrappedDelegate() {
+               return inputStream;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
new file mode 100644
index 0000000..36ebe10
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataInputStream}
+ */
+public class FSDataOutputStreamWrapper extends FSDataOutputStream implements 
WrappingProxy<FSDataOutputStream> {
+
+       protected final FSDataOutputStream outputStream;
+
+       public FSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+               this.outputStream = Preconditions.checkNotNull(outputStream);
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return outputStream.getPos();
+       }
+
+       @Override
+       public void flush() throws IOException {
+               outputStream.flush();
+       }
+
+       @Override
+       public void sync() throws IOException {
+               outputStream.sync();
+       }
+
+       @Override
+       public void write(int b) throws IOException {
+               outputStream.write(b);
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+               outputStream.write(b);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               outputStream.write(b, off, len);
+       }
+
+       @Override
+       public void close() throws IOException {
+               outputStream.close();
+       }
+
+       @Override
+       public FSDataOutputStream getWrappedDelegate() {
+               return outputStream;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1844d64..5a608b5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -25,6 +25,14 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -34,11 +42,6 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-
 /**
  * An abstract base class for a fairly generic file system. It
  * may be implemented as a distributed file system, or as a local
@@ -47,6 +50,8 @@ import org.apache.flink.util.OperatingSystem;
 @Public
 public abstract class FileSystem {
 
+       private static final InheritableThreadLocal<SafetyNetCloseableRegistry> 
REGISTRIES = new InheritableThreadLocal<>();
+
        private static final String LOCAL_FILESYSTEM_CLASS = 
"org.apache.flink.core.fs.local.LocalFileSystem";
 
        private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = 
"org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -55,6 +60,39 @@ public abstract class FileSystem {
 
        private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileSystem.class);
+
+       /**
+        * Create a SafetyNetCloseableRegistry for a Task. This method should 
be called at the beginning of the task's
+        * main thread.
+        */
+       public static void createFileSystemCloseableRegistryForTask() {
+               SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+               if (null != oldRegistry) {
+                       IOUtils.closeQuietly(oldRegistry);
+                       LOG.warn("Found existing SafetyNetCloseableRegistry. 
Closed and replaced it.");
+               }
+               SafetyNetCloseableRegistry newRegistry = new 
SafetyNetCloseableRegistry();
+               REGISTRIES.set(newRegistry);
+       }
+
+       /**
+        * Create a SafetyNetCloseableRegistry for a Task. This method should 
be called at the end of the task's
+        * main thread or when the task should be canceled.
+        */
+       public static void disposeFileSystemCloseableRegistryForTask() {
+               SafetyNetCloseableRegistry registry = REGISTRIES.get();
+               if (null != registry) {
+                       LOG.info("Ensuring all FileSystem streams are closed");
+                       REGISTRIES.remove();
+                       IOUtils.closeQuietly(registry);
+               }
+       }
+
+       private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+               SafetyNetCloseableRegistry reg = REGISTRIES.get();
+               return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : 
fs;
+       }
 
        /** Object used to protect calls to specific methods.*/
        private static final Object SYNCHRONIZATION_OBJECT = new Object();
@@ -63,7 +101,7 @@ public abstract class FileSystem {
         * Enumeration for write modes.
         *
         */
-       public static enum WriteMode {
+       public enum WriteMode {
 
                /** Creates write path if it does not exist. Does not overwrite 
existing files and directories. */
                NO_OVERWRITE,
@@ -214,18 +252,7 @@ public abstract class FileSystem {
                }
        }
 
-       /**
-        * Returns a reference to the {@link FileSystem} instance for accessing 
the
-        * file system identified by the given {@link URI}.
-        *
-        * @param uri
-        *        the {@link URI} identifying the file system
-        * @return a reference to the {@link FileSystem} instance for accessing 
the file system identified by the given
-        *         {@link URI}.
-        * @throws IOException
-        *         thrown if a reference to the file system instance could not 
be obtained
-        */
-       public static FileSystem get(URI uri) throws IOException {
+       public static FileSystem getUnguardedFileSystem(URI uri) throws 
IOException {
                FileSystem fs;
 
                URI asked = uri;
@@ -238,13 +265,13 @@ public abstract class FileSystem {
                                        }
 
                                        uri = new 
URI(defaultScheme.getScheme(), null, defaultScheme.getHost(),
-                                               defaultScheme.getPort(), 
uri.getPath(), null, null);
+                                                       
defaultScheme.getPort(), uri.getPath(), null, null);
 
                                } catch (URISyntaxException e) {
                                        try {
                                                if 
(defaultScheme.getScheme().equals("file")) {
                                                        uri = new URI("file", 
null,
-                                                               new Path(new 
File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
+                                                                       new 
Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
                                                }
                                        } catch (URISyntaxException ex) {
                                                // we tried to repair it, but 
could not. report the scheme error
@@ -255,8 +282,8 @@ public abstract class FileSystem {
 
                        if(uri.getScheme() == null) {
                                throw new IOException("The URI '" + uri + "' is 
invalid.\n" +
-                                       "The fs.default-scheme = " + 
defaultScheme + ", the requested URI = " + asked +
-                                       ", and the final URI = " + uri + ".");
+                                               "The fs.default-scheme = " + 
defaultScheme + ", the requested URI = " + asked +
+                                               ", and the final URI = " + uri 
+ ".");
                        }
 
                        if (uri.getScheme().equals("file") && 
uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
@@ -294,7 +321,7 @@ public abstract class FileSystem {
                                } else {
                                        // we can not read from this file 
system.
                                        throw new IOException("No file system 
found with scheme " + uri.getScheme()
-                                               + ", referenced in file URI '" 
+ uri.toString() + "'.");
+                                                       + ", referenced in file 
URI '" + uri.toString() + "'.");
                                }
                        } else {
                                // we end up here if we have a file system with 
build-in flink support.
@@ -316,6 +343,21 @@ public abstract class FileSystem {
        }
 
        /**
+        * Returns a reference to the {@link FileSystem} instance for accessing 
the
+        * file system identified by the given {@link URI}.
+        *
+        * @param uri
+        *        the {@link URI} identifying the file system
+        * @return a reference to the {@link FileSystem} instance for accessing 
the file system identified by the given
+        *         {@link URI}.
+        * @throws IOException
+        *         thrown if a reference to the file system instance could not 
be obtained
+        */
+       public static FileSystem get(URI uri) throws IOException {
+               return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+       }
+
+       /**
         * Returns a boolean indicating whether a scheme has built-in Flink 
support.
         *
         * @param scheme

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
new file mode 100644
index 0000000..de4fb30
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This implementation of an {@link AbstractCloseableRegistry} registers 
{@link WrappingProxyCloseable}. When
+ * the proxy becomes subject to GC, this registry takes care of closing 
unclosed {@link Closeable}s.
+ * <p>
+ * Phantom references are used to track when {@link 
org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
+ * GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to 
avoid resource leaks.
+ * <p>
+ * Other than that, it works like a normal {@link CloseableRegistry}.
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class SafetyNetCloseableRegistry extends
+               AbstractCloseableRegistry<WrappingProxyCloseable<? extends 
Closeable>,
+                               
SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
+       private final ReferenceQueue<WrappingProxyCloseable<? extends 
Closeable>> referenceQueue;
+       private final Thread reaperThread;
+
+       public SafetyNetCloseableRegistry() {
+               super(new IdentityHashMap<Closeable, 
PhantomDelegatingCloseableRef>());
+               this.referenceQueue = new ReferenceQueue<>();
+               this.reaperThread = new CloseableReaperThread();
+               reaperThread.start();
+       }
+
+       @Override
+       protected void doRegister(
+                       WrappingProxyCloseable<? extends Closeable> 
wrappingProxyCloseable,
+                       Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) throws IOException {
+
+               Closeable innerCloseable = 
WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+
+               if (null == innerCloseable) {
+                       return;
+               }
+
+               PhantomDelegatingCloseableRef phantomRef =
+                               new 
PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+
+               closeableMap.put(innerCloseable, phantomRef);
+       }
+
+       @Override
+       protected void doUnRegister(
+                       WrappingProxyCloseable<? extends Closeable> closeable,
+                       Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) {
+
+               Closeable innerCloseable = 
WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+
+               if (null == innerCloseable) {
+                       return;
+               }
+
+               closeableMap.remove(innerCloseable);
+       }
+
+       /**
+        * Phantom reference to {@link WrappingProxyCloseable}.
+        */
+       static final class PhantomDelegatingCloseableRef
+                       extends PhantomReference<WrappingProxyCloseable<? 
extends Closeable>>
+                       implements Closeable {
+
+               private final Closeable innerCloseable;
+               private final String debugString;
+
+               public PhantomDelegatingCloseableRef(
+                               WrappingProxyCloseable<? extends Closeable> 
referent,
+                               ReferenceQueue<? super WrappingProxyCloseable<? 
extends Closeable>> q) {
+
+                       super(referent, q);
+                       this.innerCloseable = 
Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
+                       this.debugString = referent.toString();
+               }
+
+               public Closeable getInnerCloseable() {
+                       return innerCloseable;
+               }
+
+               public String getDebugString() {
+                       return debugString;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       innerCloseable.close();
+               }
+       }
+
+       /**
+        * Reaper runnable collects and closes leaking resources
+        */
+       final class CloseableReaperThread extends Thread {
+
+               public CloseableReaperThread() {
+                       super("CloseableReaperThread");
+                       this.running = false;
+               }
+
+               private volatile boolean running;
+
+               @Override
+               public void run() {
+                       this.running = true;
+                       try {
+                               List<PhantomDelegatingCloseableRef> 
closeableList = new LinkedList<>();
+                               while (running) {
+                                       PhantomDelegatingCloseableRef oldRef = 
(PhantomDelegatingCloseableRef) referenceQueue.remove();
+                                       synchronized (getSynchronizationLock()) 
{
+                                               do {
+                                                       
closeableList.add(oldRef);
+                                                       
closeableToRef.remove(oldRef.getInnerCloseable());
+                                               }
+                                               while ((oldRef = 
(PhantomDelegatingCloseableRef) referenceQueue.poll()) != null);
+                                       }
+
+                                       // close outside the synchronized block 
in case this is blocking
+                                       for (PhantomDelegatingCloseableRef 
closeableRef : closeableList) {
+                                               
IOUtils.closeQuietly(closeableRef);
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Closing 
unclosed resource: " + closeableRef.getDebugString());
+                                               }
+                                       }
+
+                                       closeableList.clear();
+                               }
+                       } catch (InterruptedException e) {
+                               // done
+                       }
+               }
+
+               @Override
+               public void interrupt() {
+                       this.running = false;
+                       super.interrupt();
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               super.close();
+               reaperThread.interrupt();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
new file mode 100644
index 0000000..bf30b4f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * This is a {@link WrappingProxy} around {@link FileSystem} which (i) wraps 
all opened streams as
+ * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and 
(ii) registers them to
+ * a {@link SafetyNetCloseableRegistry}.
+ *
+ * Streams obtained by this are therefore managed by the {@link 
SafetyNetCloseableRegistry} to prevent resource leaks
+ * from unclosed streams.
+ */
+public class SafetyNetWrapperFileSystem extends FileSystem implements 
WrappingProxy<FileSystem> {
+
+       private final SafetyNetCloseableRegistry registry;
+       private final FileSystem unsafeFileSystem;
+
+       public SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, 
SafetyNetCloseableRegistry registry) {
+               this.registry = Preconditions.checkNotNull(registry);
+               this.unsafeFileSystem = 
Preconditions.checkNotNull(unsafeFileSystem);
+       }
+
+       @Override
+       public Path getWorkingDirectory() {
+               return unsafeFileSystem.getWorkingDirectory();
+       }
+
+       @Override
+       public Path getHomeDirectory() {
+               return unsafeFileSystem.getHomeDirectory();
+       }
+
+       @Override
+       public URI getUri() {
+               return unsafeFileSystem.getUri();
+       }
+
+       @Override
+       public void initialize(URI name) throws IOException {
+               unsafeFileSystem.initialize(name);
+       }
+
+       @Override
+       public FileStatus getFileStatus(Path f) throws IOException {
+               return unsafeFileSystem.getFileStatus(f);
+       }
+
+       @Override
+       public BlockLocation[] getFileBlockLocations(FileStatus file, long 
start, long len) throws IOException {
+               return unsafeFileSystem.getFileBlockLocations(file, start, len);
+       }
+
+       @Override
+       public FSDataInputStream open(Path f, int bufferSize) throws 
IOException {
+               FSDataInputStream innerStream = unsafeFileSystem.open(f, 
bufferSize);
+               return ClosingFSDataInputStream.wrapSafe(innerStream, registry, 
String.valueOf(f));
+       }
+
+       @Override
+       public FSDataInputStream open(Path f) throws IOException {
+               FSDataInputStream innerStream = unsafeFileSystem.open(f);
+               return ClosingFSDataInputStream.wrapSafe(innerStream, registry, 
String.valueOf(f));
+       }
+
+       @Override
+       public long getDefaultBlockSize() {
+               return unsafeFileSystem.getDefaultBlockSize();
+       }
+
+       @Override
+       public FileStatus[] listStatus(Path f) throws IOException {
+               return unsafeFileSystem.listStatus(f);
+       }
+
+       @Override
+       public boolean exists(Path f) throws IOException {
+               return unsafeFileSystem.exists(f);
+       }
+
+       @Override
+       public boolean delete(Path f, boolean recursive) throws IOException {
+               return unsafeFileSystem.delete(f, recursive);
+       }
+
+       @Override
+       public boolean mkdirs(Path f) throws IOException {
+               return unsafeFileSystem.mkdirs(f);
+       }
+
+       @Override
+       public FSDataOutputStream create(Path f, boolean overwrite, int 
bufferSize, short replication, long blockSize)
+                       throws IOException {
+
+               FSDataOutputStream innerStream = unsafeFileSystem.create(f, 
overwrite, bufferSize, replication, blockSize);
+               return ClosingFSDataOutputStream.wrapSafe(innerStream, 
registry, String.valueOf(f));
+       }
+
+       @Override
+       public FSDataOutputStream create(Path f, boolean overwrite) throws 
IOException {
+               FSDataOutputStream innerStream = unsafeFileSystem.create(f, 
overwrite);
+               return ClosingFSDataOutputStream.wrapSafe(innerStream, 
registry, String.valueOf(f));
+       }
+
+       @Override
+       public boolean rename(Path src, Path dst) throws IOException {
+               return unsafeFileSystem.rename(src, dst);
+       }
+
+       @Override
+       public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, 
boolean createDirectory) throws IOException {
+               return unsafeFileSystem.initOutPathLocalFS(outPath, writeMode, 
createDirectory);
+       }
+
+       @Override
+       public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, 
boolean createDirectory) throws IOException {
+               return unsafeFileSystem.initOutPathDistFS(outPath, writeMode, 
createDirectory);
+       }
+
+       @Override
+       public boolean isDistributedFS() {
+               return unsafeFileSystem.isDistributedFS();
+       }
+
+       @Override
+       public FileSystem getWrappedDelegate() {
+               return unsafeFileSystem;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
new file mode 100644
index 0000000..b74fc78
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.Closeable;
+
+/**
+ * {@link WrappingProxy} for {@link Closeable} that is also closeable.
+ */
+public interface WrappingProxyCloseable<T extends Closeable> extends 
Closeable, WrappingProxy<T> {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
new file mode 100644
index 0000000..7c0291c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This is the abstract base class for registries that allow to register 
instances of {@link Closeable}, which are all
+ * closed if this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close 
the provided {@link Closeable}
+ * <p>
+ * All methods in this class are thread-safe.
+ *
+ * @param <C> Type of the closeable this registers
+ * @param <T> Type for potential meta data associated with the registering 
closeables
+ */
+public abstract class AbstractCloseableRegistry<C extends Closeable, T> 
implements Closeable {
+
+       protected final Map<Closeable, T> closeableToRef;
+       private boolean closed;
+
+       public AbstractCloseableRegistry(Map<Closeable, T> closeableToRef) {
+               this.closeableToRef = closeableToRef;
+               this.closed = false;
+       }
+
+       /**
+        * Registers a {@link Closeable} with the registry. In case the 
registry is already closed, this method throws an
+        * {@link IllegalStateException} and closes the passed {@link 
Closeable}.
+        *
+        * @param closeable Closeable tor register
+        * @return true if the the Closeable was newly added to the registry
+        * @throws IOException exception when the registry was closed before
+        */
+       public final void registerClosable(C closeable) throws IOException {
+
+               if (null == closeable) {
+                       return;
+               }
+
+               synchronized (getSynchronizationLock()) {
+                       if (closed) {
+                               IOUtils.closeQuietly(closeable);
+                               throw new IOException("Cannot register 
Closeable, registry is already closed. Closing argument.");
+                       }
+
+                       doRegister(closeable, closeableToRef);
+               }
+       }
+
+       /**
+        * Removes a {@link Closeable} from the registry.
+        *
+        * @param closeable instance to remove from the registry.
+        * @return true, if the instance was actually registered and now removed
+        */
+       public final void unregisterClosable(C closeable) {
+
+               if (null == closeable) {
+                       return;
+               }
+
+               synchronized (getSynchronizationLock()) {
+                       doUnRegister(closeable, closeableToRef);
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               synchronized (getSynchronizationLock()) {
+
+                       for (Closeable closeable : closeableToRef.keySet()) {
+                               IOUtils.closeQuietly(closeable);
+                       }
+
+                       closeableToRef.clear();
+
+                       closed = true;
+               }
+       }
+
+       public boolean isClosed() {
+               synchronized (getSynchronizationLock()) {
+                       return closed;
+               }
+       }
+
+       protected final Object getSynchronizationLock() {
+               return closeableToRef;
+       }
+
+       protected abstract void doUnRegister(C closeable, Map<Closeable, T> 
closeableMap);
+
+       protected abstract void doRegister(C closeable, Map<Closeable, T> 
closeableMap) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 12d70ce..9810271 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.util;
 
+import org.slf4j.Logger;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.Socket;
 
-import org.slf4j.Logger;
-
 /**
  * An utility class for I/O related functionality.
  * 
@@ -213,6 +214,16 @@ public final class IOUtils {
                        }
                }
        }
+
+       public static void closeQuietly(Closeable closeable) {
+               try {
+                       if (closeable != null) {
+                               closeable.close();
+                       }
+               } catch (IOException ignored) {
+
+               }
+       }
        
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java 
b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
new file mode 100644
index 0000000..82fcf04
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public interface WrappingProxy<T> {
+
+       T getWrappedDelegate();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
new file mode 100644
index 0000000..0f62abd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public final class WrappingProxyUtil {
+
+       private WrappingProxyUtil() {
+               throw new AssertionError();
+       }
+
+       public static <T> T stripProxy(T object) {
+               while (object instanceof WrappingProxy) {
+                       object = ((WrappingProxy<T>) 
object).getWrappedDelegate();
+               }
+               return object;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java 
b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 04ebc0e..1bde2fb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -17,32 +17,37 @@
  */
 package org.apache.flink.core.fs;
 
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertTrue;
 
 public class FileSystemTest {
+
        @Test
        public void testGet() throws URISyntaxException, IOException {
                String scheme = "file";
-               
-               assertTrue(FileSystem.get(new URI(scheme + ":///test/test")) 
instanceof LocalFileSystem);
-               
+
+               assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new 
URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+
                try {
                        FileSystem.get(new URI(scheme + "://test/test"));
                } catch (IOException ioe) {
                        assertTrue(ioe.getMessage().startsWith("Found local 
file path with authority '"));
                }
 
-               assertTrue(FileSystem.get(new URI(scheme + ":/test/test")) 
instanceof LocalFileSystem);
-               
-               assertTrue(FileSystem.get(new URI(scheme + ":test/test")) 
instanceof LocalFileSystem);
+               assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new 
URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
 
-               assertTrue(FileSystem.get(new URI("/test/test")) instanceof 
LocalFileSystem);
-               
-               assertTrue(FileSystem.get(new URI("test/test")) instanceof 
LocalFileSystem);
+               assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new 
URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+
+               assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new 
URI("/test/test"))) instanceof LocalFileSystem);
+
+               assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new 
URI("test/test"))) instanceof LocalFileSystem);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
new file mode 100644
index 0000000..6628407
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SafetyNetCloseableRegistryTest {
+
+       private ProducerThread[] streamOpenThreads;
+       private SafetyNetCloseableRegistry closeableRegistry;
+       private AtomicInteger unclosedCounter;
+
+       @Before
+       public void setup() {
+               this.closeableRegistry = new SafetyNetCloseableRegistry();
+               this.unclosedCounter = new AtomicInteger(0);
+               this.streamOpenThreads = new ProducerThread[10];
+               for (int i = 0; i < streamOpenThreads.length; ++i) {
+                       streamOpenThreads[i] = new 
ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
+               }
+       }
+
+       private void startThreads(int maxStreams) {
+               for (ProducerThread t : streamOpenThreads) {
+                       t.setMaxStreams(maxStreams);
+                       t.start();
+               }
+       }
+
+       private void joinThreads() throws InterruptedException {
+               for (Thread t : streamOpenThreads) {
+                       t.join();
+               }
+       }
+
+       @Test
+       public void testClose() throws Exception {
+
+               startThreads(Integer.MAX_VALUE);
+
+               for (int i = 0; i < 5; ++i) {
+                       System.gc();
+                       Thread.sleep(40);
+               }
+
+               closeableRegistry.close();
+
+               joinThreads();
+
+               Assert.assertEquals(0, unclosedCounter.get());
+
+               try {
+
+                       WrappingProxyCloseable<Closeable> testCloseable = new 
WrappingProxyCloseable<Closeable>() {
+                               @Override
+                               public Closeable getWrappedDelegate() {
+                                       return this;
+                               }
+
+                               @Override
+                               public void close() throws IOException {
+                                       unclosedCounter.incrementAndGet();
+                               }
+                       };
+
+                       closeableRegistry.registerClosable(testCloseable);
+
+                       Assert.fail("Closed registry should not accept 
closeables!");
+
+               } catch (IOException expected) {
+                       //expected
+               }
+
+               Assert.assertEquals(1, unclosedCounter.get());
+       }
+
+       @Test
+       public void testSafetyNetClose() throws Exception {
+
+               startThreads(20);
+
+               joinThreads();
+
+               for (int i = 0; i < 5 && unclosedCounter.get() > 0; ++i) {
+                       System.gc();
+                       Thread.sleep(50);
+               }
+
+               Assert.assertEquals(0, unclosedCounter.get());
+               closeableRegistry.close();
+       }
+
+       private static final class ProducerThread extends Thread {
+
+               private final SafetyNetCloseableRegistry registry;
+               private final AtomicInteger refCount;
+               private int maxStreams;
+
+               public ProducerThread(SafetyNetCloseableRegistry registry, 
AtomicInteger refCount, int maxStreams) {
+                       this.registry = registry;
+                       this.refCount = refCount;
+                       this.maxStreams = maxStreams;
+               }
+
+               public int getMaxStreams() {
+                       return maxStreams;
+               }
+
+               public void setMaxStreams(int maxStreams) {
+                       this.maxStreams = maxStreams;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               int count = 0;
+                               while (maxStreams > 0) {
+                                       String debug = 
Thread.currentThread().getName() + " " + count;
+                                       TestStream testStream = new 
TestStream(refCount);
+                                       refCount.incrementAndGet();
+                                       ClosingFSDataInputStream pis = 
ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference 
dies here
+
+                                       try {
+                                               Thread.sleep(2);
+                                       } catch (InterruptedException e) {
+
+                                       }
+
+                                       if (maxStreams != Integer.MAX_VALUE) {
+                                               --maxStreams;
+                                       }
+                                       ++count;
+                               }
+                       } catch (Exception ex) {
+
+                       }
+               }
+       }
+
+       private static final class TestStream extends FSDataInputStream {
+
+               private AtomicInteger refCount;
+
+               public TestStream(AtomicInteger refCount) {
+                       this.refCount = refCount;
+               }
+
+               @Override
+               public void seek(long desired) throws IOException {
+
+               }
+
+               @Override
+               public long getPos() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public int read() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       if (refCount != null) {
+                               refCount.decrementAndGet();
+                               refCount = null;
+                       }
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b5bdcaf..d79be05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,19 +18,8 @@
 
 package org.apache.flink.runtime.filecache;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
@@ -40,13 +29,23 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * The FileCache is used to create the local files for the registered cache 
files when a task is deployed.
  * The files will be removed when the task is unregistered after a 5 second 
delay.
@@ -236,8 +235,10 @@ public class FileCache {
        // 
------------------------------------------------------------------------
 
        public static void copy(Path sourcePath, Path targetPath, boolean 
executable) throws IOException {
-               FileSystem sFS = sourcePath.getFileSystem();
-               FileSystem tFS = targetPath.getFileSystem();
+               // TODO rewrite this to make it participate in the closable 
registry and the lifecycle of a task.
+               // we unwrap the file system to get raw streams without safety 
net
+               FileSystem sFS = 
FileSystem.getUnguardedFileSystem(sourcePath.toUri());
+               FileSystem tFS = 
FileSystem.getUnguardedFileSystem(targetPath.toUri());
                if (!tFS.exists(targetPath)) {
                        if (sFS.getFileStatus(sourcePath).isDir()) {
                                tFS.mkdirs(targetPath);
@@ -253,16 +254,11 @@ public class FileCache {
                                        copy(content.getPath(), new 
Path(localPath), executable);
                                }
                        } else {
-                               try {
-                                       FSDataOutputStream lfsOutput = 
tFS.create(targetPath, false);
-                                       FSDataInputStream fsInput = 
sFS.open(sourcePath);
+                               try (FSDataOutputStream lfsOutput = 
tFS.create(targetPath, false); FSDataInputStream fsInput = 
sFS.open(sourcePath)) {
                                        IOUtils.copyBytes(fsInput, lfsOutput);
                                        //noinspection ResultOfMethodCallIgnored
                                        new 
File(targetPath.toString()).setExecutable(executable);
-                                       // closing the FSDataOutputStream
-                                       lfsOutput.close();
-                               }
-                               catch (IOException ioe) {
+                               } catch (IOException ioe) {
                                        LOG.error("could not copy file to local 
file cache.", ioe);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e5d9b2b..ae71c7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.util.Preconditions;
 
@@ -80,7 +81,7 @@ public abstract class AbstractKeyedStateBackend<K>
        protected final TaskKvStateRegistry kvStateRegistry;
 
        /** Registry for all opened streams, so they can be closed if the task 
using this backend is closed */
-       protected ClosableRegistry cancelStreamRegistry;
+       protected CloseableRegistry cancelStreamRegistry;
 
        protected final ClassLoader userCodeClassLoader;
 
@@ -96,7 +97,7 @@ public abstract class AbstractKeyedStateBackend<K>
                this.numberOfKeyGroups = 
Preconditions.checkNotNull(numberOfKeyGroups);
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-               this.cancelStreamRegistry = new ClosableRegistry();
+               this.cancelStreamRegistry = new CloseableRegistry();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
deleted file mode 100644
index b5f7dad..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.commons.io.IOUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This class allows to register instances of {@link Closeable}, which are all 
closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close 
the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
- */
-public class ClosableRegistry implements Closeable {
-
-       private final Set<Closeable> registeredCloseables;
-       private boolean closed;
-
-       public ClosableRegistry() {
-               this.registeredCloseables = new HashSet<>();
-               this.closed = false;
-       }
-
-       /**
-        * Registers a {@link Closeable} with the registry. In case the 
registry is already closed, this method throws an
-        * {@link IllegalStateException} and closes the passed {@link 
Closeable}.
-        *
-        * @param closeable Closable tor register
-        * @return true if the the Closable was newly added to the registry
-        * @throws IOException exception when the registry was closed before
-        */
-       public boolean registerClosable(Closeable closeable) throws IOException 
{
-
-               if (null == closeable) {
-                       return false;
-               }
-
-               synchronized (getSynchronizationLock()) {
-                       if (closed) {
-                               IOUtils.closeQuietly(closeable);
-                               throw new IOException("Cannot register 
Closable, registry is already closed. Closed passed closable.");
-                       }
-
-                       return registeredCloseables.add(closeable);
-               }
-       }
-
-       /**
-        * Removes a {@link Closeable} from the registry.
-        *
-        * @param closeable instance to remove from the registry.
-        * @return true, if the instance was actually registered and now removed
-        */
-       public boolean unregisterClosable(Closeable closeable) {
-
-               if (null == closeable) {
-                       return false;
-               }
-
-               synchronized (getSynchronizationLock()) {
-                       return registeredCloseables.remove(closeable);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               synchronized (getSynchronizationLock()) {
-
-                       for (Closeable closeable : registeredCloseables) {
-                               IOUtils.closeQuietly(closeable);
-                       }
-
-                       registeredCloseables.clear();
-                       closed = true;
-               }
-       }
-
-       public boolean isClosed() {
-               synchronized (getSynchronizationLock()) {
-                       return closed;
-               }
-       }
-
-       private Object getSynchronizationLock() {
-               return registeredCloseables;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 2f5d3cb..5b47362 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -49,7 +50,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        
        private final Map<String, PartitionableListState<?>> registeredStates;
        private final Collection<OperatorStateHandle> restoreSnapshots;
-       private final ClosableRegistry closeStreamOnCancelRegistry;
+       private final CloseableRegistry closeStreamOnCancelRegistry;
        private final JavaSerializer<Serializable> javaSerializer;
 
        /**
@@ -65,7 +66,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                this.javaSerializer = new JavaSerializer<>(userClassLoader);
                this.restoreSnapshots = restoreSnapshots;
                this.registeredStates = new HashMap<>();
-               this.closeStreamOnCancelRegistry = new ClosableRegistry();
+               this.closeStreamOnCancelRegistry = new CloseableRegistry();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 8fbde05..b131d14 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -36,7 +37,7 @@ import java.util.Iterator;
 public class StateInitializationContextImpl implements 
StateInitializationContext {
 
        /** Closable registry to participate in the operator's cancel/close 
methods */
-       private final ClosableRegistry closableRegistry;
+       private final CloseableRegistry closableRegistry;
 
        /** Signal whether any state to restore was found */
        private final boolean restored;
@@ -55,7 +56,7 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                        KeyedStateStore keyedStateStore,
                        Collection<KeyGroupsStateHandle> keyGroupsStateHandles,
                        Collection<OperatorStateHandle> operatorStateHandles,
-                       ClosableRegistry closableRegistry) {
+                       CloseableRegistry closableRegistry) {
 
                this.restored = restored;
                this.closableRegistry = 
Preconditions.checkNotNull(closableRegistry);
@@ -87,7 +88,7 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                return keyGroupsStateHandles;
        }
 
-       public ClosableRegistry getClosableRegistry() {
+       public CloseableRegistry getClosableRegistry() {
                return closableRegistry;
        }
 
@@ -137,14 +138,14 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
        private static class KeyGroupStreamIterator implements 
Iterator<KeyGroupStatePartitionStreamProvider> {
 
                private final Iterator<KeyGroupsStateHandle> 
stateHandleIterator;
-               private final ClosableRegistry closableRegistry;
+               private final CloseableRegistry closableRegistry;
 
                private KeyGroupsStateHandle currentStateHandle;
                private FSDataInputStream currentStream;
                private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;
 
                public KeyGroupStreamIterator(
-                               Iterator<KeyGroupsStateHandle> 
stateHandleIterator, ClosableRegistry closableRegistry) {
+                               Iterator<KeyGroupsStateHandle> 
stateHandleIterator, CloseableRegistry closableRegistry) {
 
                        this.stateHandleIterator = 
Preconditions.checkNotNull(stateHandleIterator);
                        this.closableRegistry = 
Preconditions.checkNotNull(closableRegistry);
@@ -200,7 +201,7 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                private final String stateName; //TODO since we only support a 
single named state in raw, this could be dropped
 
                private final Iterator<OperatorStateHandle> stateHandleIterator;
-               private final ClosableRegistry closableRegistry;
+               private final CloseableRegistry closableRegistry;
 
                private OperatorStateHandle currentStateHandle;
                private FSDataInputStream currentStream;
@@ -210,7 +211,7 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                public OperatorStateStreamIterator(
                                String stateName,
                                Iterator<OperatorStateHandle> 
stateHandleIterator,
-                               ClosableRegistry closableRegistry) {
+                               CloseableRegistry closableRegistry) {
 
                        this.stateName = Preconditions.checkNotNull(stateName);
                        this.stateHandleIterator = 
Preconditions.checkNotNull(stateHandleIterator);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index d632529..ce8a6c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -42,7 +43,7 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
         * Registry for opened streams to participate in the lifecycle of the 
stream task. Hence, this registry should be 
         * obtained from and managed by the stream task.
         */
-       private final ClosableRegistry closableRegistry;
+       private final CloseableRegistry closableRegistry;
 
        private KeyedStateCheckpointOutputStream 
keyedStateCheckpointOutputStream;
        private OperatorStateCheckpointOutputStream 
operatorStateCheckpointOutputStream;
@@ -62,7 +63,7 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
                        long checkpointTimestamp,
                        CheckpointStreamFactory streamFactory,
                        KeyGroupRange keyGroupRange,
-                       ClosableRegistry closableRegistry) {
+                       CloseableRegistry closableRegistry) {
 
                this.checkpointId = checkpointId;
                this.checkpointTimestamp = checkpointTimestamp;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 29e905c..b61c52d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -43,9 +43,6 @@ public class FileStateHandle implements StreamStateHandle {
        /** The size of the state in the file */
        private final long stateSize;
 
-       /** Cached file system handle */
-       private transient FileSystem fs;
-
        /**
         * Creates a new file state for the given file path.
         *
@@ -79,13 +76,17 @@ public class FileStateHandle implements StreamStateHandle {
         */
        @Override
        public void discardState() throws Exception {
-               getFileSystem().delete(filePath, false);
+
+               FileSystem fs = getFileSystem();
+
+               fs.delete(filePath, false);
 
                // send a call to delete the checkpoint directory containing 
the file. This will
                // fail (and be ignored) when some files still exist
                try {
-                       getFileSystem().delete(filePath.getParent(), false);
-               } catch (IOException ignored) {}
+                       fs.delete(filePath.getParent(), false);
+               } catch (IOException ignored) {
+               }
        }
 
        /**
@@ -106,10 +107,7 @@ public class FileStateHandle implements StreamStateHandle {
         * @throws IOException Thrown if the file system cannot be accessed.
         */
        private FileSystem getFileSystem() throws IOException {
-               if (fs == null) {
-                       fs = FileSystem.get(filePath.toUri());
-               }
-               return fs;
+               return FileSystem.get(filePath.toUri());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3254fc1..c794f56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -538,6 +539,9 @@ public class Task implements Runnable, TaskActions {
                        //  check for canceling as a shortcut
                        // ----------------------------
 
+                       // init closeable registry for this task
+                       FileSystem.createFileSystemCloseableRegistryForTask();
+
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files 
and/or classes
                        LOG.info("Loading JAR files for task " + 
taskNameWithSubtask);
@@ -758,6 +762,7 @@ public class Task implements Runnable, TaskActions {
 
                                // remove all files in the distributed cache
                                removeCachedFiles(distributedCacheEntries, 
fileCache);
+                               
FileSystem.disposeFileSystemCloseableRegistryForTask();
 
                                notifyFinalState();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4aaad71..6595901 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -36,7 +37,6 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -161,7 +161,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
 
        /** The currently active background materialization threads */
-       private final ClosableRegistry cancelables = new ClosableRegistry();
+       private final CloseableRegistry cancelables = new CloseableRegistry();
 
        /** Flag to mark the task "in operation", in which case check
         * needs to be initialized to true, so that early cancel() before 
invoke() behaves correctly */
@@ -949,7 +949,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
-       public ClosableRegistry getCancelables() {
+       public CloseableRegistry getCancelables() {
                return cancelables;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 75c2261..cd94076 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -58,7 +58,7 @@ public class StateInitializationContextImplTest {
        static final int NUM_HANDLES = 10;
 
        private StateInitializationContextImpl initializationContext;
-       private ClosableRegistry closableRegistry;
+       private CloseableRegistry closableRegistry;
 
        private int writtenKeyGroups;
        private Set<Integer> writtenOperatorStates;
@@ -70,7 +70,7 @@ public class StateInitializationContextImplTest {
                this.writtenKeyGroups = 0;
                this.writtenOperatorStates = new HashSet<>();
 
-               this.closableRegistry = new ClosableRegistry();
+               this.closableRegistry = new CloseableRegistry();
                OperatorStateStore stateStore = mock(OperatorStateStore.class);
 
                ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos(64);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 0ee839e..2b2df4c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
@@ -35,7 +35,7 @@ public class StateSnapshotContextSynchronousImplTest {
 
        @Before
        public void setUp() throws Exception {
-               ClosableRegistry closableRegistry = new ClosableRegistry();
+               CloseableRegistry closableRegistry = new CloseableRegistry();
                CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(1024);
                KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
                this.snapshotContext = new 
StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, 
closableRegistry);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 23a31d5..830cd6f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
@@ -32,7 +33,6 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -65,7 +65,9 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Base class for {@code AbstractStreamOperator} test harnesses.
@@ -86,7 +88,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
        final Environment environment;
 
-       ClosableRegistry closableRegistry;
+       CloseableRegistry closableRegistry;
 
        // use this as default for tests
        protected AbstractStateBackend stateBackend = new MemoryStateBackend();
@@ -115,7 +117,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                this.config = new StreamConfig(underlyingConfig);
                this.config.setCheckpointingEnabled(true);
                this.executionConfig = new ExecutionConfig();
-               this.closableRegistry = new ClosableRegistry();
+               this.closableRegistry = new CloseableRegistry();
                this.checkpointLock = new Object();
 
                environment = new MockEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 5a64173..09de67f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -114,6 +114,7 @@ public class RescalingITCase extends TestLogger {
        public static void teardown() {
                if (cluster != null) {
                        cluster.shutdown();
+                       cluster.awaitTermination();
                }
        }
 

Reply via email to