This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d89eb1a2e8dd035d672fda3e5d31f5828241071d
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed May 27 18:50:54 2020 +0200

    [FLINK-17986] Fix check in FsCheckpointStateOutputStream.write
---
 .../filesystem/FsCheckpointStreamFactory.java      |  4 ++--
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 22 +++++++++++++++++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 9234c9f..c37fef0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -208,7 +208,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
                @Override
                public void write(int b) throws IOException {
-                       if (outStream != null || pos >= writeBuffer.length) {
+                       if (pos >= writeBuffer.length) {
                                flushToFile();
                        }
                        writeBuffer[pos++] = (byte) b;
@@ -269,7 +269,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                 */
                @Override
                public void flush() throws IOException {
-                       if (pos > localStateThreshold) {
+                       if (outStream != null || pos > localStateThreshold) {
                                flushToFile();
                        }
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
index 88fea13..53d7746 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
@@ -58,6 +58,22 @@ public class FsCheckpointStreamFactoryTest {
        // 
------------------------------------------------------------------------
 
        @Test
+       @SuppressWarnings("ConstantConditions")
+       public void testWriteFlushesIfAboveThreshold() throws IOException {
+               int fileSizeThreshold = 100;
+               final FsCheckpointStreamFactory factory = 
createFactory(FileSystem.getLocalFileSystem(), fileSizeThreshold, 
fileSizeThreshold);
+               final FsCheckpointStreamFactory.FsCheckpointStateOutputStream 
stream = 
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+               stream.write(new byte[fileSizeThreshold]);
+               File[] files = new File(exclusiveStateDir.toUri()).listFiles();
+               assertEquals(1, files.length);
+               File file = files[0];
+               assertEquals(fileSizeThreshold, file.length());
+               stream.write(new byte[fileSizeThreshold - 1]); // should buffer 
without flushing
+               stream.write(127); // should buffer without flushing
+               assertEquals(fileSizeThreshold, file.length());
+       }
+
+       @Test
        public void testExclusiveStateHasRelativePathHandles() throws 
IOException {
                final FsCheckpointStreamFactory factory = 
createFactory(FileSystem.getLocalFileSystem(), 0);
 
@@ -129,6 +145,10 @@ public class FsCheckpointStreamFactoryTest {
        }
 
        private FsCheckpointStreamFactory createFactory(FileSystem fs, int 
fileSizeThreshold) {
-               return new FsCheckpointStreamFactory(fs, exclusiveStateDir, 
sharedStateDir, fileSizeThreshold, 4096);
+               return createFactory(fs, fileSizeThreshold, 4096);
+       }
+
+       private FsCheckpointStreamFactory createFactory(FileSystem fs, int 
fileSizeThreshold, int bufferSize) {
+               return new FsCheckpointStreamFactory(fs, exclusiveStateDir, 
sharedStateDir, fileSizeThreshold, bufferSize);
        }
 }

Reply via email to