This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit caa296b813b8a719910f4e1337e011c772a12868
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Mar 31 14:14:30 2022 +0200

    [FLINK-26957][runtime] Adds invariant to LocalDataOutputStream to verify that no operation is allowed on a closed stream
---
 .../flink/core/fs/local/LocalDataOutputStream.java | 18 ++++++++++
 .../flink/core/fs/local/LocalFileSystemTest.java   | 40 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index 14eaf70c159..a4da43416fd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -21,9 +21,12 @@ package org.apache.flink.core.fs.local;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 
 /**
  * The <code>LocalDataOutputStream</code> class is a wrapper class for a data output stream to the
@@ -35,6 +38,8 @@ public class LocalDataOutputStream extends FSDataOutputStream {
     /** The file output stream used to write data. */
     private final FileOutputStream fos;
 
+    private boolean closed = false;
+
     /**
      * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
      *
@@ -47,36 +52,49 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 
     @Override
     public void write(final int b) throws IOException {
+        checkOpen();
         fos.write(b);
     }
 
     @Override
     public void write(@Nonnull final byte[] b) throws IOException {
+        checkOpen();
         fos.write(b);
     }
 
     @Override
     public void write(final byte[] b, final int off, final int len) throws IOException {
+        checkOpen();
         fos.write(b, off, len);
     }
 
     @Override
     public void close() throws IOException {
+        closed = true;
         fos.close();
     }
 
     @Override
     public void flush() throws IOException {
+        checkOpen();
         fos.flush();
     }
 
     @Override
     public void sync() throws IOException {
+        checkOpen();
         fos.getFD().sync();
     }
 
     @Override
     public long getPos() throws IOException {
+        checkOpen();
         return fos.getChannel().position();
     }
+
+    private void checkOpen() throws IOException {
+        if (closed) {
+            throw new ClosedChannelException();
+        }
+    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index f985d5aca46..ca7cd5ce7bb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.Assume;
@@ -38,6 +39,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -349,6 +351,44 @@ public class LocalFileSystemTest extends TestLogger {
         }
     }
 
+    @Test(expected = ClosedChannelException.class)
+    public void testFlushMethodFailsOnClosedOutputStream() throws IOException {
+        testMethodCallFailureOnClosedStream(FSDataOutputStream::flush);
+    }
+
+    @Test(expected = ClosedChannelException.class)
+    public void testWriteIntegerMethodFailsOnClosedOutputStream() throws IOException {
+        testMethodCallFailureOnClosedStream(os -> os.write(0));
+    }
+
+    @Test(expected = ClosedChannelException.class)
+    public void testWriteBytesMethodFailsOnClosedOutputStream() throws IOException {
+        testMethodCallFailureOnClosedStream(os -> os.write(new byte[0]));
+    }
+
+    @Test(expected = ClosedChannelException.class)
+    public void testWriteBytesSubArrayMethodFailsOnClosedOutputStream() throws IOException {
+        testMethodCallFailureOnClosedStream(os -> os.write(new byte[0], 0, 0));
+    }
+
+    @Test(expected = ClosedChannelException.class)
+    public void testGetPosMethodFailsOnClosedOutputStream() throws IOException {
+        testMethodCallFailureOnClosedStream(FSDataOutputStream::getPos);
+    }
+
+    private void testMethodCallFailureOnClosedStream(
+            ThrowingConsumer<FSDataOutputStream, IOException> callback) throws IOException {
+        final FileSystem fs = FileSystem.getLocalFileSystem();
+        final FSDataOutputStream outputStream =
+                fs.create(
+                        new Path(
+                                temporaryFolder.getRoot().toString(),
+                                "close_fs_test_" + UUID.randomUUID()),
+                        WriteMode.OVERWRITE);
+        outputStream.close();
+        callback.accept(outputStream);
+    }
+
     private Collection<File> createTargetDirectories(
             File root, int directoryDepth, int numberDirectories) {
         final StringBuilder stringBuilder = new StringBuilder();

Reply via email to