This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b4fe87  [FLINK-17820][checkpointing] Respect fileSizeThreshold in FsCheckpointStateOutputStream.flush()
8b4fe87 is described below

commit 8b4fe87a74d3ec631350ebac4dfdf69094c802e3
Author: Roman <[email protected]>
AuthorDate: Wed May 27 08:39:43 2020 +0200

    [FLINK-17820][checkpointing] Respect fileSizeThreshold in FsCheckpointStateOutputStream.flush()
    
    This closes #12332
---
 .../filesystem/FsCheckpointStreamFactory.java      | 34 +++++++++------
 .../channel/ChannelStateCheckpointWriterTest.java  | 49 +++++++++++++++-------
 .../state/filesystem/FsCheckpointStorageTest.java  |  8 ++--
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 31 +++++++++++---
 .../filesystem/FsStateBackendEntropyTest.java      |  7 ++--
 5 files changed, 90 insertions(+), 39 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 4a4db0e..9234c9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -208,8 +208,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
                @Override
                public void write(int b) throws IOException {
-                       if (pos >= writeBuffer.length) {
-                               flush();
+                       // Spill to the file only when the in-memory buffer is full, so single-byte
+                       // writes stay buffered (and are written in chunks) even after the file exists.
+                       if (pos >= writeBuffer.length) {
+                               flushToFile();
                        }
                        writeBuffer[pos++] = (byte) b;
                }
@@ -226,8 +226,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                        len -= remaining;
                                        pos += remaining;
 
-                                       // flush the write buffer to make it 
clear again
-                                       flush();
+                                       // flushToFile the write buffer to make 
it clear again
+                                       flushToFile();
                                }
 
                                // copy what is in the buffer
@@ -235,8 +235,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                pos += len;
                        }
                        else {
-                               // flush the current buffer
-                               flush();
+                               // flushToFile the current buffer
+                               flushToFile();
                                // write the bytes directly
                                outStream.write(b, off, len);
                        }
@@ -247,15 +247,13 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                        return pos + (outStream == null ? 0 : 
outStream.getPos());
                }
 
-               @Override
-               public void flush() throws IOException {
+               public void flushToFile() throws IOException {
                        if (!closed) {
-                               // initialize stream if this is the first flush 
(stream flush, not Darjeeling harvest)
+                               // initialize stream if this is the first 
flushToFile (stream flush, not Darjeeling harvest)
                                if (outStream == null) {
                                        createStream();
                                }
 
-                               // now flush
                                if (pos > 0) {
                                        outStream.write(writeBuffer, 0, pos);
                                        pos = 0;
@@ -266,6 +264,16 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                        }
                }
 
+               /**
+                * Flush buffered data to the file if a file has already been created, or if the buffered size is above {@link #localStateThreshold}; otherwise keep it in memory.
+                */
+               @Override
+               public void flush() throws IOException {
+                       // Once a file exists, everything written must reach it on flush();
+                       // before that, small state stays in memory to avoid creating tiny files.
+                       if (outStream != null || pos > localStateThreshold) {
+                               flushToFile();
+                       }
+               }
+
                @Override
                public void sync() throws IOException {
                        outStream.sync();
@@ -289,7 +297,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                        if (!closed) {
                                closed = true;
 
-                               // make sure write requests need to go to 
'flush()' where they recognized
+                               // make sure write requests need to go to 
'flushToFile()' where they recognized
                                // that the stream is closed
                                pos = writeBuffer.length;
 
@@ -327,7 +335,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                        }
                                        else {
                                                try {
-                                                       flush();
+                                                       flushToFile();
 
                                                        pos = 
writeBuffer.length;
 
@@ -354,7 +362,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                                                        
statePath, deleteException);
                                                        }
 
-                                                       throw new 
IOException("Could not flush and close the file system " +
+                                                       throw new 
IOException("Could not flush to file and close the file system " +
                                                                "output stream 
to " + statePath + " in order to obtain the " +
                                                                "stream state 
handle", exception);
                                                } finally {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 1d501f3..e0cf26a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -21,20 +21,28 @@ import org.apache.flink.core.memory.HeapMemorySegment;
 import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
 import 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -48,6 +56,27 @@ public class ChannelStateCheckpointWriterTest {
        };
        private final Random random = new Random();
 
+       @Rule
+       public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Writes a single input buffer of {@code threshold / 2} bytes through a filesystem-backed
+        * stream whose file size threshold is {@code threshold}, completes the writer, and checks
+        * that neither the checkpoint directory nor the shared state directory contains any file.
+        */
+       @Test
+       // ConstantConditions: File#list() is nullable; both directories are created just below.
+       @SuppressWarnings("ConstantConditions")
+       public void testSmallFilesNotWritten() throws Exception {
+               int threshold = 100;
+               File checkpointsDir = 
temporaryFolder.newFolder("checkpointsDir");
+               File sharedStateDir = 
temporaryFolder.newFolder("sharedStateDir");
+               FsCheckpointStreamFactory checkpointStreamFactory = new 
FsCheckpointStreamFactory(getSharedInstance(), fromLocalFile(checkpointsDir), 
fromLocalFile(sharedStateDir), threshold, threshold);
+               ChannelStateWriteResult result = new ChannelStateWriteResult();
+               ChannelStateCheckpointWriter writer = createWriter(result, 
checkpointStreamFactory.createCheckpointStateOutputStream(EXCLUSIVE));
+               NetworkBuffer buffer = new 
NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(threshold / 2, 
null), FreeingBufferRecycler.INSTANCE);
+               writer.writeInput(new InputChannelInfo(1, 2), buffer);
+               writer.completeOutput();
+               writer.completeInput();
+               assertTrue(result.isDone());
+               assertEquals(0, checkpointsDir.list().length);
+               assertEquals(0, sharedStateDir.list().length);
+       }
+
        @Test
        public void testEmptyState() throws Exception {
                MemoryCheckpointOutputStream stream = new 
MemoryCheckpointOutputStream(1000) {
@@ -57,13 +86,7 @@ public class ChannelStateCheckpointWriterTest {
                                return null;
                        }
                };
-               ChannelStateCheckpointWriter writer = new 
ChannelStateCheckpointWriter(
-                               1L,
-                               new ChannelStateWriteResult(),
-                               stream,
-                               new ChannelStateSerializerImpl(),
-                               NO_OP_RUNNABLE
-               );
+               ChannelStateCheckpointWriter writer = createWriter(new 
ChannelStateWriteResult(), stream);
                writer.completeOutput();
                writer.completeInput();
                assertTrue(stream.isClosed());
@@ -155,13 +178,11 @@ public class ChannelStateCheckpointWriterTest {
        }
 
+       /** Creates a writer for {@code result} backed by a {@code MemoryCheckpointOutputStream(1000)}. */
        private ChannelStateCheckpointWriter 
createWriter(ChannelStateWriteResult result) throws Exception {
-               return new ChannelStateCheckpointWriter(
-                       1L,
-                       result,
-                       new MemoryCheckpointOutputStream(1000),
-                       new ChannelStateSerializerImpl(),
-                       NO_OP_RUNNABLE
-               );
+               return createWriter(result, new 
MemoryCheckpointOutputStream(1000));
+       }
+
+       /** Creates a writer for {@code result} that writes serialized channel state to {@code stream}. */
+       private ChannelStateCheckpointWriter 
createWriter(ChannelStateWriteResult result, CheckpointStateOutputStream 
stream) throws Exception {
+               return new ChannelStateCheckpointWriter(1L, result, stream, new 
ChannelStateSerializerImpl(), NO_OP_RUNNABLE);
        }
 
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
index 4ec4de0..51bd981 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
@@ -162,11 +162,11 @@ public class FsCheckpointStorageTest extends 
AbstractFileCheckpointStorageTestBa
 
                // create exclusive state
 
-               CheckpointStateOutputStream exclusiveStream =
+               FsCheckpointStateOutputStream exclusiveStream =
                                
storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
 
                exclusiveStream.write(42);
-               exclusiveStream.flush();
+               exclusiveStream.flushToFile();
                StreamStateHandle exclusiveHandle = 
exclusiveStream.closeAndGetHandle();
 
                assertEquals(1, 
fs.listStatus(storageLocation.getCheckpointDirectory()).length);
@@ -174,11 +174,11 @@ public class FsCheckpointStorageTest extends 
AbstractFileCheckpointStorageTestBa
 
                // create shared state
 
-               CheckpointStateOutputStream sharedStream =
+               FsCheckpointStateOutputStream sharedStream =
                                
storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
 
                sharedStream.write(42);
-               sharedStream.flush();
+               sharedStream.flushToFile();
                StreamStateHandle sharedHandle = 
sharedStream.closeAndGetHandle();
 
                assertEquals(1, 
fs.listStatus(storageLocation.getCheckpointDirectory()).length);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
index 1fa5cc0..88fea13 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
@@ -28,6 +28,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 
 import static org.hamcrest.Matchers.instanceOf;
@@ -58,7 +59,7 @@ public class FsCheckpointStreamFactoryTest {
 
        @Test
        public void testExclusiveStateHasRelativePathHandles() throws 
IOException {
-               final FsCheckpointStreamFactory factory = 
createFactory(FileSystem.getLocalFileSystem());
+               final FsCheckpointStreamFactory factory = 
createFactory(FileSystem.getLocalFileSystem(), 0);
 
                final FsCheckpointStreamFactory.FsCheckpointStateOutputStream 
stream =
                                
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
@@ -71,7 +72,7 @@ public class FsCheckpointStreamFactoryTest {
 
        @Test
        public void testSharedStateHasAbsolutePathHandles() throws IOException {
-               final FsCheckpointStreamFactory factory = 
createFactory(FileSystem.getLocalFileSystem());
+               final FsCheckpointStreamFactory factory = 
createFactory(FileSystem.getLocalFileSystem(), 0);
 
                final FsCheckpointStreamFactory.FsCheckpointStateOutputStream 
stream =
                        
factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
@@ -85,7 +86,7 @@ public class FsCheckpointStreamFactoryTest {
 
        @Test
        public void testEntropyMakesExclusiveStateAbsolutePaths() throws 
IOException{
-               final FsCheckpointStreamFactory factory = createFactory(new 
FsStateBackendEntropyTest.TestEntropyAwareFs());
+               final FsCheckpointStreamFactory factory = createFactory(new 
FsStateBackendEntropyTest.TestEntropyAwareFs(), 0);
 
                final FsCheckpointStreamFactory.FsCheckpointStateOutputStream 
stream =
                        
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
@@ -97,6 +98,26 @@ public class FsCheckpointStreamFactoryTest {
                assertPathsEqual(exclusiveStateDir, ((FileStateHandle) 
handle).getFilePath().getParent());
        }
 
+       /** flush() with exactly {@code threshold} buffered bytes must not create a file (the bound is exclusive). */
+       @Test
+       public void testFlushUnderThreshold() throws IOException {
+               flushAndVerify(10, 10, true);
+       }
+
+       /** flush() with more than {@code threshold} buffered bytes must create the state file. */
+       @Test
+       public void testFlushAboveThreshold() throws IOException {
+               flushAndVerify(10, 11, false);
+       }
+
+       /**
+        * Writes {@code bytesToFlush} zero bytes to an exclusive-scope stream created with file size
+        * threshold {@code minFileSize}, calls {@code flush()}, and asserts whether the exclusive
+        * state directory is empty. The stream is closed afterwards so no open file handle leaks.
+        */
+       private void flushAndVerify(int minFileSize, int bytesToFlush, boolean expectEmpty) throws IOException {
+               try (FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+                               createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), minFileSize)
+                                               .createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+
+                       stream.write(new byte[bytesToFlush], 0, bytesToFlush);
+                       stream.flush();
+                       // assert before close(): closing the stream may discard the file
+                       assertEquals(expectEmpty ? 0 : 1, new File(exclusiveStateDir.toUri()).listFiles().length);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  test utils
        // 
------------------------------------------------------------------------
@@ -107,7 +128,7 @@ public class FsCheckpointStreamFactoryTest {
                assertEquals(reNormalizedExpected, reNormalizedActual);
        }
 
-       private FsCheckpointStreamFactory createFactory(FileSystem fs) {
-               return new FsCheckpointStreamFactory(fs, exclusiveStateDir, 
sharedStateDir, 0, 4096);
+       /** Creates a factory over {@code fs} using the test's exclusive/shared state dirs and the given file size threshold. */
+       private FsCheckpointStreamFactory createFactory(FileSystem fs, int 
fileSizeThreshold) {
+               return new FsCheckpointStreamFactory(fs, exclusiveStateDir, 
sharedStateDir, fileSizeThreshold, 4096);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index 72ed803..3b4a9e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -52,13 +52,14 @@ public class FsStateBackendEntropyTest {
 
        @Test
        public void testEntropyInjection() throws Exception {
+               final int fileSizeThreshold = 1024;
                final FileSystem fs = new TestEntropyAwareFs();
 
                final Path checkpointDir = new 
Path(Path.fromLocalFile(tmp.newFolder()), ENTROPY_MARKER + "/checkpoints");
                final String checkpointDirStr = checkpointDir.toString();
 
                final FsCheckpointStorage storage = new FsCheckpointStorage(
-                               fs, checkpointDir, null, new JobID(), 1024, 
4096);
+                               fs, checkpointDir, null, new JobID(), 
fileSizeThreshold, 4096);
                storage.initializeBaseLocations();
 
                final FsCheckpointStorageLocation location = 
(FsCheckpointStorageLocation)
@@ -71,7 +72,7 @@ public class FsStateBackendEntropyTest {
 
                // check entropy in task-owned state
                try (CheckpointStateOutputStream stream = 
storage.createTaskOwnedStateStream()) {
-                       stream.flush();
+                       stream.write(new byte[fileSizeThreshold + 1], 0, 
fileSizeThreshold + 1);
                        FileStateHandle handle = (FileStateHandle) 
stream.closeAndGetHandle();
 
                        assertNotNull(handle);
@@ -82,8 +83,8 @@ public class FsStateBackendEntropyTest {
                // check entropy in the exclusive/shared state
                try (CheckpointStateOutputStream stream =
                                
location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+                       stream.write(new byte[fileSizeThreshold + 1], 0, 
fileSizeThreshold + 1);
 
-                       stream.flush();
                        FileStateHandle handle = (FileStateHandle) 
stream.closeAndGetHandle();
 
                        assertNotNull(handle);

Reply via email to