This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8b4fe87 [FLINK-17820][checkpointing] Respect fileSizeThreshold in
FsCheckpointStateOutputStream.flush()
8b4fe87 is described below
commit 8b4fe87a74d3ec631350ebac4dfdf69094c802e3
Author: Roman <[email protected]>
AuthorDate: Wed May 27 08:39:43 2020 +0200
[FLINK-17820][checkpointing] Respect fileSizeThreshold in
FsCheckpointStateOutputStream.flush()
This closes #12332
---
.../filesystem/FsCheckpointStreamFactory.java | 34 +++++++++------
.../channel/ChannelStateCheckpointWriterTest.java | 49 +++++++++++++++-------
.../state/filesystem/FsCheckpointStorageTest.java | 8 ++--
.../filesystem/FsCheckpointStreamFactoryTest.java | 31 +++++++++++---
.../filesystem/FsStateBackendEntropyTest.java | 7 ++--
5 files changed, 90 insertions(+), 39 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 4a4db0e..9234c9f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -208,8 +208,8 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
@Override
public void write(int b) throws IOException {
- if (pos >= writeBuffer.length) {
- flush();
+ if (outStream != null || pos >= writeBuffer.length) {
+ flushToFile();
}
writeBuffer[pos++] = (byte) b;
}
@@ -226,8 +226,8 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
len -= remaining;
pos += remaining;
- // flush the write buffer to make it
clear again
- flush();
+ // flushToFile the write buffer to make
it clear again
+ flushToFile();
}
// copy what is in the buffer
@@ -235,8 +235,8 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
pos += len;
}
else {
- // flush the current buffer
- flush();
+ // flushToFile the current buffer
+ flushToFile();
// write the bytes directly
outStream.write(b, off, len);
}
@@ -247,15 +247,13 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
return pos + (outStream == null ? 0 :
outStream.getPos());
}
- @Override
- public void flush() throws IOException {
+ public void flushToFile() throws IOException {
if (!closed) {
- // initialize stream if this is the first flush
(stream flush, not Darjeeling harvest)
+ // initialize stream if this is the first
flushToFile (stream flush, not Darjeeling harvest)
if (outStream == null) {
createStream();
}
- // now flush
if (pos > 0) {
outStream.write(writeBuffer, 0, pos);
pos = 0;
@@ -266,6 +264,16 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
}
}
+ /**
+ * Flush buffers to file if their size is above {@link
#localStateThreshold}.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (pos > localStateThreshold) {
+ flushToFile();
+ }
+ }
+
@Override
public void sync() throws IOException {
outStream.sync();
@@ -289,7 +297,7 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
if (!closed) {
closed = true;
- // make sure write requests need to go to
'flush()' where they recognized
+ // make sure write requests need to go to
'flushToFile()' where they recognized
// that the stream is closed
pos = writeBuffer.length;
@@ -327,7 +335,7 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
}
else {
try {
- flush();
+ flushToFile();
pos =
writeBuffer.length;
@@ -354,7 +362,7 @@ public class FsCheckpointStreamFactory implements
CheckpointStreamFactory {
statePath, deleteException);
}
- throw new
IOException("Could not flush and close the file system " +
+ throw new
IOException("Could not flush to file and close the file system " +
"output stream
to " + statePath + " in order to obtain the " +
"stream state
handle", exception);
} finally {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 1d501f3..e0cf26a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -21,20 +21,28 @@ import org.apache.flink.core.memory.HeapMemorySegment;
import
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
import org.apache.flink.util.function.RunnableWithException;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -48,6 +56,27 @@ public class ChannelStateCheckpointWriterTest {
};
private final Random random = new Random();
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ @SuppressWarnings("ConstantConditions")
+ public void testSmallFilesNotWritten() throws Exception {
+ int threshold = 100;
+ File checkpointsDir =
temporaryFolder.newFolder("checkpointsDir");
+ File sharedStateDir =
temporaryFolder.newFolder("sharedStateDir");
+ FsCheckpointStreamFactory checkpointStreamFactory = new
FsCheckpointStreamFactory(getSharedInstance(), fromLocalFile(checkpointsDir),
fromLocalFile(sharedStateDir), threshold, threshold);
+ ChannelStateWriteResult result = new ChannelStateWriteResult();
+ ChannelStateCheckpointWriter writer = createWriter(result,
checkpointStreamFactory.createCheckpointStateOutputStream(EXCLUSIVE));
+ NetworkBuffer buffer = new
NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(threshold / 2,
null), FreeingBufferRecycler.INSTANCE);
+ writer.writeInput(new InputChannelInfo(1, 2), buffer);
+ writer.completeOutput();
+ writer.completeInput();
+ assertTrue(result.isDone());
+ assertEquals(0, checkpointsDir.list().length);
+ assertEquals(0, sharedStateDir.list().length);
+ }
+
@Test
public void testEmptyState() throws Exception {
MemoryCheckpointOutputStream stream = new
MemoryCheckpointOutputStream(1000) {
@@ -57,13 +86,7 @@ public class ChannelStateCheckpointWriterTest {
return null;
}
};
- ChannelStateCheckpointWriter writer = new
ChannelStateCheckpointWriter(
- 1L,
- new ChannelStateWriteResult(),
- stream,
- new ChannelStateSerializerImpl(),
- NO_OP_RUNNABLE
- );
+ ChannelStateCheckpointWriter writer = createWriter(new
ChannelStateWriteResult(), stream);
writer.completeOutput();
writer.completeInput();
assertTrue(stream.isClosed());
@@ -155,13 +178,11 @@ public class ChannelStateCheckpointWriterTest {
}
private ChannelStateCheckpointWriter
createWriter(ChannelStateWriteResult result) throws Exception {
- return new ChannelStateCheckpointWriter(
- 1L,
- result,
- new MemoryCheckpointOutputStream(1000),
- new ChannelStateSerializerImpl(),
- NO_OP_RUNNABLE
- );
+ return createWriter(result, new
MemoryCheckpointOutputStream(1000));
+ }
+
+ private ChannelStateCheckpointWriter
createWriter(ChannelStateWriteResult result, CheckpointStateOutputStream
stream) throws Exception {
+ return new ChannelStateCheckpointWriter(1L, result, stream, new
ChannelStateSerializerImpl(), NO_OP_RUNNABLE);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
index 4ec4de0..51bd981 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
@@ -162,11 +162,11 @@ public class FsCheckpointStorageTest extends
AbstractFileCheckpointStorageTestBa
// create exclusive state
- CheckpointStateOutputStream exclusiveStream =
+ FsCheckpointStateOutputStream exclusiveStream =
storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
exclusiveStream.write(42);
- exclusiveStream.flush();
+ exclusiveStream.flushToFile();
StreamStateHandle exclusiveHandle =
exclusiveStream.closeAndGetHandle();
assertEquals(1,
fs.listStatus(storageLocation.getCheckpointDirectory()).length);
@@ -174,11 +174,11 @@ public class FsCheckpointStorageTest extends
AbstractFileCheckpointStorageTestBa
// create shared state
- CheckpointStateOutputStream sharedStream =
+ FsCheckpointStateOutputStream sharedStream =
storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
sharedStream.write(42);
- sharedStream.flush();
+ sharedStream.flushToFile();
StreamStateHandle sharedHandle =
sharedStream.closeAndGetHandle();
assertEquals(1,
fs.listStatus(storageLocation.getCheckpointDirectory()).length);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
index 1fa5cc0..88fea13 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
@@ -28,6 +28,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
import static org.hamcrest.Matchers.instanceOf;
@@ -58,7 +59,7 @@ public class FsCheckpointStreamFactoryTest {
@Test
public void testExclusiveStateHasRelativePathHandles() throws
IOException {
- final FsCheckpointStreamFactory factory =
createFactory(FileSystem.getLocalFileSystem());
+ final FsCheckpointStreamFactory factory =
createFactory(FileSystem.getLocalFileSystem(), 0);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream
stream =
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
@@ -71,7 +72,7 @@ public class FsCheckpointStreamFactoryTest {
@Test
public void testSharedStateHasAbsolutePathHandles() throws IOException {
- final FsCheckpointStreamFactory factory =
createFactory(FileSystem.getLocalFileSystem());
+ final FsCheckpointStreamFactory factory =
createFactory(FileSystem.getLocalFileSystem(), 0);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream
stream =
factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
@@ -85,7 +86,7 @@ public class FsCheckpointStreamFactoryTest {
@Test
public void testEntropyMakesExclusiveStateAbsolutePaths() throws
IOException{
- final FsCheckpointStreamFactory factory = createFactory(new
FsStateBackendEntropyTest.TestEntropyAwareFs());
+ final FsCheckpointStreamFactory factory = createFactory(new
FsStateBackendEntropyTest.TestEntropyAwareFs(), 0);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream
stream =
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
@@ -97,6 +98,26 @@ public class FsCheckpointStreamFactoryTest {
assertPathsEqual(exclusiveStateDir, ((FileStateHandle)
handle).getFilePath().getParent());
}
+ @Test
+ public void testFlushUnderThreshold() throws IOException {
+ flushAndVerify(10, 10, true);
+ }
+
+ @Test
+ public void testFlushAboveThreshold() throws IOException {
+ flushAndVerify(10, 11, false);
+ }
+
+ private void flushAndVerify(int minFileSize, int bytesToFlush, boolean
expectEmpty) throws IOException {
+ FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+ createFactory(new
FsStateBackendEntropyTest.TestEntropyAwareFs(), minFileSize)
+
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+
+ stream.write(new byte[bytesToFlush], 0, bytesToFlush);
+ stream.flush();
+ assertEquals(expectEmpty ? 0 : 1, new
File(exclusiveStateDir.toUri()).listFiles().length);
+ }
+
//
------------------------------------------------------------------------
// test utils
//
------------------------------------------------------------------------
@@ -107,7 +128,7 @@ public class FsCheckpointStreamFactoryTest {
assertEquals(reNormalizedExpected, reNormalizedActual);
}
- private FsCheckpointStreamFactory createFactory(FileSystem fs) {
- return new FsCheckpointStreamFactory(fs, exclusiveStateDir,
sharedStateDir, 0, 4096);
+ private FsCheckpointStreamFactory createFactory(FileSystem fs, int
fileSizeThreshold) {
+ return new FsCheckpointStreamFactory(fs, exclusiveStateDir,
sharedStateDir, fileSizeThreshold, 4096);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index 72ed803..3b4a9e8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -52,13 +52,14 @@ public class FsStateBackendEntropyTest {
@Test
public void testEntropyInjection() throws Exception {
+ final int fileSizeThreshold = 1024;
final FileSystem fs = new TestEntropyAwareFs();
final Path checkpointDir = new
Path(Path.fromLocalFile(tmp.newFolder()), ENTROPY_MARKER + "/checkpoints");
final String checkpointDirStr = checkpointDir.toString();
final FsCheckpointStorage storage = new FsCheckpointStorage(
- fs, checkpointDir, null, new JobID(), 1024,
4096);
+ fs, checkpointDir, null, new JobID(),
fileSizeThreshold, 4096);
storage.initializeBaseLocations();
final FsCheckpointStorageLocation location =
(FsCheckpointStorageLocation)
@@ -71,7 +72,7 @@ public class FsStateBackendEntropyTest {
// check entropy in task-owned state
try (CheckpointStateOutputStream stream =
storage.createTaskOwnedStateStream()) {
- stream.flush();
+ stream.write(new byte[fileSizeThreshold + 1], 0,
fileSizeThreshold + 1);
FileStateHandle handle = (FileStateHandle)
stream.closeAndGetHandle();
assertNotNull(handle);
@@ -82,8 +83,8 @@ public class FsStateBackendEntropyTest {
// check entropy in the exclusive/shared state
try (CheckpointStateOutputStream stream =
location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+ stream.write(new byte[fileSizeThreshold + 1], 0,
fileSizeThreshold + 1);
- stream.flush();
FileStateHandle handle = (FileStateHandle)
stream.closeAndGetHandle();
assertNotNull(handle);