This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit d4e2fb19c1f0e48d2a1b3a8f5279ce34168628aa
Author: tsreaper <[email protected]>
AuthorDate: Tue Jun 11 14:46:39 2024 +0800

    [core] Close writer after commit identifier of snapshot is strictly larger 
than last modified identifier (#3499)
---
 .../paimon/operation/AbstractFileStoreWrite.java   |  7 +++--
 .../apache/paimon/table/sink/TableWriteTest.java   | 34 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index e4ef21b55..193c346b8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -213,10 +213,13 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                 result.add(committable);
 
                 if (committable.isEmpty()) {
-                    // Condition 1: There is no more record waiting to be 
committed.
+                    // Condition 1: There is no more record waiting to be 
committed. Note that the
+                    // condition is < (instead of <=), because each commit 
identifier may have
+                    // multiple snapshots. We must make sure all snapshots of 
this identifier are
+                    // committed.
                     // Condition 2: No compaction is in progress. That is, no 
more changelog will be
                     // produced.
-                    if (writerContainer.lastModifiedCommitIdentifier <= 
latestCommittedIdentifier
+                    if (writerContainer.lastModifiedCommitIdentifier < 
latestCommittedIdentifier
                             && !writerContainer.writer.isCompacting()) {
                         // Clear writer if no update, and if its latest 
modification has committed.
                         //
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index a97ed9c54..e49e802e0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -263,6 +263,40 @@ public class TableWriteTest {
         assertThat(streamingRead(scan, read, latestSnapshotId)).hasSize(2);
     }
 
+    @Test
+    public void testWaitAllSnapshotsOfSpecificIdentifier() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 1);
+
+        FileStoreTable table = createFileStoreTable(conf);
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 1, 10L));
+        write.write(GenericRow.of(1, 2, 20L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.write(GenericRow.of(1, 2, 21L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // we use the same commitIdentifier to mimic a long pause between 
committing the APPEND
+        // snapshot and the COMPACT snapshot
+        write.compact(partition(1), 0, true);
+        List<CommitMessage> message1 = write.prepareCommit(true, 1);
+        // nothing is written, however message1 is not committed, so writer 
should be kept
+        List<CommitMessage> message2 = write.prepareCommit(false, 2);
+
+        write.write(GenericRow.of(1, 3, 30L));
+        write.compact(partition(1), 0, true);
+        commit.commit(1, message1);
+        commit.commit(2, message2);
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        write.close();
+        commit.close();
+    }
+
     private BinaryRow partition(int x) {
         BinaryRow partition = new BinaryRow(1);
         BinaryRowWriter writer = new BinaryRowWriter(partition);

Reply via email to