This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d4e2fb19c1f0e48d2a1b3a8f5279ce34168628aa Author: tsreaper <[email protected]> AuthorDate: Tue Jun 11 14:46:39 2024 +0800 [core] Close writer after commit identifier of snapshot is strictly larger than last modified identifier (#3499) --- .../paimon/operation/AbstractFileStoreWrite.java | 7 +++-- .../apache/paimon/table/sink/TableWriteTest.java | 34 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index e4ef21b55..193c346b8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -213,10 +213,13 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { result.add(committable); if (committable.isEmpty()) { - // Condition 1: There is no more record waiting to be committed. + // Condition 1: There is no more record waiting to be committed. Note that the + // condition is < (instead of <=), because each commit identifier may have + // multiple snapshots. We must make sure all snapshots of this identifier are + // committed. // Condition 2: No compaction is in progress. That is, no more changelog will be // produced. - if (writerContainer.lastModifiedCommitIdentifier <= latestCommittedIdentifier + if (writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier && !writerContainer.writer.isCompacting()) { // Clear writer if no update, and if its latest modification has committed. // diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java index a97ed9c54..e49e802e0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java @@ -263,6 +263,40 @@ public class TableWriteTest { assertThat(streamingRead(scan, read, latestSnapshotId)).hasSize(2); } + @Test + public void testWaitAllSnapshotsOfSpecificIdentifier() throws Exception { + Options conf = new Options(); + conf.set(CoreOptions.BUCKET, 1); + + FileStoreTable table = createFileStoreTable(conf); + TableWriteImpl<?> write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 1, 10L)); + write.write(GenericRow.of(1, 2, 20L)); + commit.commit(0, write.prepareCommit(false, 0)); + + write.write(GenericRow.of(1, 2, 21L)); + commit.commit(1, write.prepareCommit(false, 1)); + + // we use the same commitIdentifier to mimic a long pause between committing the APPEND + // snapshot and the COMPACT snapshot + write.compact(partition(1), 0, true); + List<CommitMessage> message1 = write.prepareCommit(true, 1); + // nothing is written, however message1 is not committed, so writer should be kept + List<CommitMessage> message2 = write.prepareCommit(false, 2); + + write.write(GenericRow.of(1, 3, 30L)); + write.compact(partition(1), 0, true); + commit.commit(1, message1); + commit.commit(2, message2); + commit.commit(3, write.prepareCommit(true, 3)); + + write.close(); + commit.close(); + } + private BinaryRow partition(int x) { BinaryRow partition = new BinaryRow(1); BinaryRowWriter writer = new BinaryRowWriter(partition);
