This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d89eb1a2e8dd035d672fda3e5d31f5828241071d Author: Roman Khachatryan <[email protected]> AuthorDate: Wed May 27 18:50:54 2020 +0200 [FLINK-17986] Fix check in FsCheckpointStateOutputStream.write --- .../filesystem/FsCheckpointStreamFactory.java | 4 ++-- .../filesystem/FsCheckpointStreamFactoryTest.java | 22 +++++++++++++++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index 9234c9f..c37fef0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -208,7 +208,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { @Override public void write(int b) throws IOException { - if (outStream != null || pos >= writeBuffer.length) { + if (pos >= writeBuffer.length) { flushToFile(); } writeBuffer[pos++] = (byte) b; @@ -269,7 +269,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { */ @Override public void flush() throws IOException { - if (pos > localStateThreshold) { + if (outStream != null || pos > localStateThreshold) { flushToFile(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java index 88fea13..53d7746 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java @@ -58,6 +58,22 @@ public class FsCheckpointStreamFactoryTest { // 
------------------------------------------------------------------------ @Test + @SuppressWarnings("ConstantConditions") + public void testWriteFlushesIfAboveThreshold() throws IOException { + int fileSizeThreshold = 100; + final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem(), fileSizeThreshold, fileSizeThreshold); + final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); + stream.write(new byte[fileSizeThreshold]); + File[] files = new File(exclusiveStateDir.toUri()).listFiles(); + assertEquals(1, files.length); + File file = files[0]; + assertEquals(fileSizeThreshold, file.length()); + stream.write(new byte[fileSizeThreshold - 1]); // should buffer without flushing + stream.write(127); // should buffer without flushing + assertEquals(fileSizeThreshold, file.length()); + } + + @Test public void testExclusiveStateHasRelativePathHandles() throws IOException { final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem(), 0); @@ -129,6 +145,10 @@ public class FsCheckpointStreamFactoryTest { } private FsCheckpointStreamFactory createFactory(FileSystem fs, int fileSizeThreshold) { - return new FsCheckpointStreamFactory(fs, exclusiveStateDir, sharedStateDir, fileSizeThreshold, 4096); + return createFactory(fs, fileSizeThreshold, 4096); + } + + private FsCheckpointStreamFactory createFactory(FileSystem fs, int fileSizeThreshold, int bufferSize) { + return new FsCheckpointStreamFactory(fs, exclusiveStateDir, sharedStateDir, fileSizeThreshold, bufferSize); } }
