AlexYinHan commented on code in PR #23514: URL: https://github.com/apache/flink/pull/23514#discussion_r1366803327
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java: ########## @@ -249,6 +261,104 @@ void testSizeStatsInPhysicalFile() throws IOException { } } + @Test + public void testReusedFileWriting() throws Exception { + long checkpointId = 1; + int streamNum = 10; + int perStreamWriteNum = 128; + + // write random bytes and then read them from the file + byte[] bytes = new byte[streamNum * perStreamWriteNum]; + Random rd = new Random(); + rd.nextBytes(bytes); + int byteIndex = 0; + + SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum]; + try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + + // repeatedly get-write-close streams + for (int i = 0; i < streamNum; i++) { + FileMergingCheckpointStateOutputStream stream = + fmsm.createCheckpointStateOutputStream( + subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE); + try { + closeableRegistry.registerCloseable(stream); + for (int j = 0; j < perStreamWriteNum; j++) { + stream.write(bytes[byteIndex++]); + } + handles[i] = stream.closeAndGetHandle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // assert the streams writes to the same file correctly + byteIndex = 0; + Path filePath = null; + for (SegmentFileStateHandle handle : handles) { + // check file path + Path thisFilePath = handle.getFilePath(); + Assertions.assertTrue(filePath == null || filePath.equals(thisFilePath)); + filePath = thisFilePath; + // check file content + FSDataInputStream is = handle.openInputStream(); + + closeableRegistry.registerCloseable(is); + int readValue; + + while ((readValue = is.read()) != -1) { + assertThat((byte) readValue).isEqualTo(bytes[byteIndex++]); + } + } + } + } + + @Test + public void testConcurrentWriting() throws Exception { Review Comment: It is guaranteed by ```FileMergingSnapshotManager```. 
Only after the previous "Stream" is closed will the manager create another "Stream" that reuses the same physical file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org