This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 25bbe9fda [core] Close writer after commit identifier of snapshot is strictly larger than last modified identifier (#3499)
25bbe9fda is described below
commit 25bbe9fdabfaa441157acab3d4153e69bf66b0cc
Author: tsreaper <[email protected]>
AuthorDate: Tue Jun 11 14:46:39 2024 +0800
[core] Close writer after commit identifier of snapshot is strictly larger than last modified identifier (#3499)
---
.../paimon/operation/AbstractFileStoreWrite.java | 7 +++--
.../apache/paimon/table/sink/TableWriteTest.java | 34 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index e4ef21b55..193c346b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -213,10 +213,13 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
result.add(committable);
if (committable.isEmpty()) {
- // Condition 1: There is no more record waiting to be committed.
+ // Condition 1: There is no more record waiting to be committed. Note that the
+ // condition is < (instead of <=), because each commit identifier may have
+ // multiple snapshots. We must make sure all snapshots of this identifier are
+ // committed.
// Condition 2: No compaction is in progress. That is, no more changelog will be
// produced.
- if (writerContainer.lastModifiedCommitIdentifier <= latestCommittedIdentifier
+ if (writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
&& !writerContainer.writer.isCompacting()) {
// Clear writer if no update, and if its latest modification has committed.
//
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index a97ed9c54..e49e802e0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -263,6 +263,40 @@ public class TableWriteTest {
assertThat(streamingRead(scan, read, latestSnapshotId)).hasSize(2);
}
+ @Test
+ public void testWaitAllSnapshotsOfSpecificIdentifier() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 1);
+
+ FileStoreTable table = createFileStoreTable(conf);
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 1, 10L));
+ write.write(GenericRow.of(1, 2, 20L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(GenericRow.of(1, 2, 21L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // we use the same commitIdentifier to mimic a long pause between committing the APPEND
+ // snapshot and the COMPACT snapshot
+ write.compact(partition(1), 0, true);
+ List<CommitMessage> message1 = write.prepareCommit(true, 1);
+ // nothing is written, however message1 is not committed, so writer should be kept
+ List<CommitMessage> message2 = write.prepareCommit(false, 2);
+
+ write.write(GenericRow.of(1, 3, 30L));
+ write.compact(partition(1), 0, true);
+ commit.commit(1, message1);
+ commit.commit(2, message2);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ write.close();
+ commit.close();
+ }
+
private BinaryRow partition(int x) {
BinaryRow partition = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(partition);