Repository: flink
Updated Branches:
  refs/heads/master d5b97b07b -> 2b31ec93a


[FLINK-4843] Test for FsCheckpointStateOutputStream::getPos()

This closes #2646.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b31ec93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b31ec93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b31ec93

Branch: refs/heads/master
Commit: 2b31ec93a6245de2e30a6314a12fdbdcfb585440
Parents: d5b97b0
Author: Stefan Richter <[email protected]>
Authored: Thu Oct 13 12:05:55 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Oct 24 11:11:59 2016 +0200

----------------------------------------------------------------------
 .../FsCheckpointStateOutputStreamTest.java      | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b31ec93/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index d484f2e..00d8ca8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.DataInputStream;
@@ -75,6 +76,33 @@ public class FsCheckpointStateOutputStreamTest {
                runTest(16678, 4096, 0, true);
        }
 
+       @Test
+       public void testGetPos() throws Exception {
+               FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+                               new 
FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, 
FileSystem.getLocalFileSystem(), 31, 17);
+
+               for (int i = 0; i < 64; ++i) {
+                       Assert.assertEquals(i, stream.getPos());
+                       stream.write(0x42);
+               }
+
+               stream.closeAndGetHandle();
+
+               // ----------------------------------------------------
+
+               stream = new 
FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, 
FileSystem.getLocalFileSystem(), 31, 17);
+
+               byte[] data = "testme!".getBytes();
+
+               for (int i = 0; i < 7; ++i) {
+                       Assert.assertEquals(i * (1 + data.length), 
stream.getPos());
+                       stream.write(0x42);
+                       stream.write(data);
+               }
+
+               stream.closeAndGetHandle();
+       }
+
        private void runTest(int numBytes, int bufferSize, int threshold, 
boolean expectFile) throws Exception {
                FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
                        new 
FsCheckpointStreamFactory.FsCheckpointStateOutputStream(

Reply via email to