This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit caa296b813b8a719910f4e1337e011c772a12868 Author: Matthias Pohl <[email protected]> AuthorDate: Thu Mar 31 14:14:30 2022 +0200 [FLINK-26957][runtime] Adds invariant to LocalDataOutputStream to verify that no operation is allowed on a closed stream --- .../flink/core/fs/local/LocalDataOutputStream.java | 18 ++++++++++ .../flink/core/fs/local/LocalFileSystemTest.java | 40 ++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java index 14eaf70c159..a4da43416fd 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java @@ -21,9 +21,12 @@ package org.apache.flink.core.fs.local; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.FSDataOutputStream; +import javax.annotation.Nonnull; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.ClosedChannelException; /** * The <code>LocalDataOutputStream</code> class is a wrapper class for a data output stream to the @@ -35,6 +38,8 @@ public class LocalDataOutputStream extends FSDataOutputStream { /** The file output stream used to write data. */ private final FileOutputStream fos; + private boolean closed = false; + /** * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object. * @@ -47,36 +52,49 @@ public class LocalDataOutputStream extends FSDataOutputStream { @Override public void write(final int b) throws IOException { + checkOpen(); fos.write(b); } @Override public void write(@Nonnull final byte[] b) throws IOException { + checkOpen(); fos.write(b); } @Override public void write(final byte[] b, final int off, final int len) throws IOException { + checkOpen(); fos.write(b, off, len); } @Override public void close() throws IOException { + closed = true; fos.close(); } @Override public void flush() throws IOException { + checkOpen(); fos.flush(); } @Override public void sync() throws IOException { + checkOpen(); fos.getFD().sync(); } @Override public long getPos() throws IOException { + checkOpen(); return fos.getChannel().position(); } + + private void checkOpen() throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + } } diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java index f985d5aca46..ca7cd5ce7bb 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Assume; @@ -38,6 +39,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -349,6 +351,44 @@ public class LocalFileSystemTest extends TestLogger { } } + @Test(expected = ClosedChannelException.class) + public void testFlushMethodFailsOnClosedOutputStream() throws IOException { + testMethodCallFailureOnClosedStream(FSDataOutputStream::flush); + } + + @Test(expected = ClosedChannelException.class) + public void testWriteIntegerMethodFailsOnClosedOutputStream() throws IOException { + testMethodCallFailureOnClosedStream(os -> os.write(0)); + } + + @Test(expected = ClosedChannelException.class) + public void testWriteBytesMethodFailsOnClosedOutputStream() throws IOException { + testMethodCallFailureOnClosedStream(os -> os.write(new byte[0])); + } + + @Test(expected = ClosedChannelException.class) + public void testWriteBytesSubArrayMethodFailsOnClosedOutputStream() throws IOException { + testMethodCallFailureOnClosedStream(os -> os.write(new byte[0], 0, 0)); + } + + @Test(expected = ClosedChannelException.class) + public void testGetPosMethodFailsOnClosedOutputStream() throws IOException { + testMethodCallFailureOnClosedStream(FSDataOutputStream::getPos); + } + + private void testMethodCallFailureOnClosedStream( + ThrowingConsumer<FSDataOutputStream, IOException> callback) throws IOException { + final FileSystem fs = FileSystem.getLocalFileSystem(); + final FSDataOutputStream outputStream = + fs.create( + new Path( + temporaryFolder.getRoot().toString(), + "close_fs_test_" + UUID.randomUUID()), + WriteMode.OVERWRITE); + outputStream.close(); + callback.accept(outputStream); + } + private Collection<File> createTargetDirectories( File root, int directoryDepth, int numberDirectories) { final StringBuilder stringBuilder = new StringBuilder();
