AlexYinHan commented on code in PR #23514:
URL: https://github.com/apache/flink/pull/23514#discussion_r1366803327


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java:
##########
@@ -249,6 +261,104 @@ void testSizeStatsInPhysicalFile() throws IOException {
         }
     }
 
+    @Test
+    public void testReusedFileWriting() throws Exception {
+        long checkpointId = 1;
+        int streamNum = 10;
+        int perStreamWriteNum = 128;
+
+        // write random bytes and then read them from the file
+        byte[] bytes = new byte[streamNum * perStreamWriteNum];
+        Random rd = new Random();
+        rd.nextBytes(bytes);
+        int byteIndex = 0;
+
+        SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum];
+        try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
+                CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+
+            // repeatedly get-write-close streams
+            for (int i = 0; i < streamNum; i++) {
+                FileMergingCheckpointStateOutputStream stream =
+                        fmsm.createCheckpointStateOutputStream(
+                                subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
+                try {
+                    closeableRegistry.registerCloseable(stream);
+                    for (int j = 0; j < perStreamWriteNum; j++) {
+                        stream.write(bytes[byteIndex++]);
+                    }
+                    handles[i] = stream.closeAndGetHandle();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            // assert the streams writes to the same file correctly
+            byteIndex = 0;
+            Path filePath = null;
+            for (SegmentFileStateHandle handle : handles) {
+                // check file path
+                Path thisFilePath = handle.getFilePath();
+                Assertions.assertTrue(filePath == null || filePath.equals(thisFilePath));
+                filePath = thisFilePath;
+                // check file content
+                FSDataInputStream is = handle.openInputStream();
+
+                closeableRegistry.registerCloseable(is);
+                int readValue;
+
+                while ((readValue = is.read()) != -1) {
+                    assertThat((byte) readValue).isEqualTo(bytes[byteIndex++]);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testConcurrentWriting() throws Exception {

Review Comment:
   This is guaranteed by `FileMergingSnapshotManager`: it creates another stream
that reuses the same physical file only after the previous stream has been closed.
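
   To illustrate the guarantee, here is a minimal standalone sketch of the idea. It is not the actual `FileMergingSnapshotManager` code; `PhysicalFilePoolSketch`, `PhysicalFile`, `SegmentStream` and `release` are hypothetical names made up for this example. The assumption is that a physical file becomes reusable only when the stream that holds it is closed, so two open streams never share a file.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of "reuse a file only after its stream is closed"; not Flink code. */
class PhysicalFilePoolSketch {

    /** Stand-in for a physical file; it only tracks how many bytes were appended. */
    static final class PhysicalFile {
        final int id;
        long size;

        PhysicalFile(int id) {
            this.id = id;
        }
    }

    /** Stand-in for an output stream that exclusively owns one file until it is closed. */
    final class SegmentStream implements AutoCloseable {
        private final PhysicalFile file;
        private final long startOffset;
        private boolean closed;

        SegmentStream(PhysicalFile file) {
            this.file = file;
            // A reused file is appended to, so the segment starts at the current file size.
            this.startOffset = file.size;
        }

        void write(int b) {
            if (closed) {
                throw new IllegalStateException("stream is closed");
            }
            file.size++; // the byte value itself is ignored in this sketch
        }

        PhysicalFile file() {
            return file;
        }

        long startOffset() {
            return startOffset;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                // Only at this point does the file become available to another stream.
                release(file);
            }
        }
    }

    private final Queue<PhysicalFile> idleFiles = new ArrayDeque<>();
    private int nextFileId;

    /** Hands out an idle file if one exists, otherwise a fresh one. */
    synchronized SegmentStream createStream() {
        PhysicalFile file = idleFiles.poll();
        if (file == null) {
            file = new PhysicalFile(nextFileId++);
        }
        return new SegmentStream(file);
    }

    private synchronized void release(PhysicalFile file) {
        idleFiles.add(file);
    }
}
```

   In this sketch, two streams that are open at the same time always get different files, while a stream created after a `close()` picks up the released file and starts at the offset where the previous segment ended. That matches the sequential get-write-close loop in `testReusedFileWriting`.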


