Repository: flink Updated Branches: refs/heads/master d5b97b07b -> 2b31ec93a
[FLINK-4843] Test for FsCheckpointStateOutputStream::getPos() This closes #2646. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b31ec93 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b31ec93 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b31ec93 Branch: refs/heads/master Commit: 2b31ec93a6245de2e30a6314a12fdbdcfb585440 Parents: d5b97b0 Author: Stefan Richter <[email protected]> Authored: Thu Oct 13 12:05:55 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Mon Oct 24 11:11:59 2016 +0200 ---------------------------------------------------------------------- .../FsCheckpointStateOutputStreamTest.java | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b31ec93/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java index d484f2e..00d8ca8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.junit.Assert; import org.junit.Test; import java.io.DataInputStream; @@ -75,6 +76,33 @@ public class FsCheckpointStateOutputStreamTest { runTest(16678, 4096, 0, true); } + @Test + public void testGetPos() throws Exception { + FsCheckpointStreamFactory.CheckpointStateOutputStream stream = + new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17); + + for (int i = 0; i < 64; ++i) { + Assert.assertEquals(i, stream.getPos()); + stream.write(0x42); + } + + stream.closeAndGetHandle(); + + // ---------------------------------------------------- + + stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17); + + byte[] data = "testme!".getBytes(); + + for (int i = 0; i < 7; ++i) { + Assert.assertEquals(i * (1 + data.length), stream.getPos()); + stream.write(0x42); + stream.write(data); + } + + stream.closeAndGetHandle(); + } + private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception { FsCheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
