This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 71f6a39f79 [core] Introduce expireForEmptyCommit to InnerTableCommit (#6013) 71f6a39f79 is described below commit 71f6a39f79e087db54cb6de49a39f796fc1898be Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Fri Aug 1 18:41:34 2025 +0800 [core] Introduce expireForEmptyCommit to InnerTableCommit (#6013) --- .../apache/paimon/operation/FileStoreCommit.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 6 +- .../apache/paimon/table/sink/InnerTableCommit.java | 2 + .../apache/paimon/table/sink/TableCommitImpl.java | 73 +++++++++++++++------- .../apache/paimon/table/SimpleTableTestBase.java | 2 +- .../apache/paimon/table/sink/TableCommitTest.java | 59 ++++++++++++++++- 6 files changed, 116 insertions(+), 30 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 1ab4ebe968..4156ce0a83 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -40,7 +40,7 @@ public interface FileStoreCommit extends AutoCloseable { List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables); /** Commit from manifest committable with checkAppendFiles. */ - void commit(ManifestCommittable committable, boolean checkAppendFiles); + int commit(ManifestCommittable committable, boolean checkAppendFiles); /** * Overwrite from manifest committable and partition. @@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable { * note that this partition does not necessarily equal to the partitions of the newly added * key-values. This is just the partition to be cleaned up. 
*/ - void overwrite( + int overwrite( Map<String, String> partition, ManifestCommittable committable, Map<String, String> properties); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 54888abe72..c82776a916 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -277,7 +277,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { } @Override - public void commit(ManifestCommittable committable, boolean checkAppendFiles) { + public int commit(ManifestCommittable committable, boolean checkAppendFiles) { LOG.info( "Ready to commit to table {}, number of commit messages: {}", tableName, @@ -399,6 +399,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { attempts); } } + return generatedSnapshot; } private void reportCommit( @@ -422,7 +423,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { } @Override - public void overwrite( + public int overwrite( Map<String, String> partition, ManifestCommittable committable, Map<String, String> properties) { @@ -551,6 +552,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { attempts); } } + return generatedSnapshot; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java index 1544375569..df6241086a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java @@ -44,6 +44,8 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit { */ InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit); + InnerTableCommit expireForEmptyCommit(boolean expireForEmptyCommit); + 
@Override InnerTableCommit withMetricRegistry(MetricRegistry registry); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index e262c5f6bf..40c69289a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -30,7 +30,9 @@ import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.stats.Statistics; +import org.apache.paimon.tag.TagAutoCreation; import org.apache.paimon.tag.TagAutoManager; +import org.apache.paimon.tag.TagTimeExpire; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.PathFactory; @@ -80,14 +82,15 @@ public class TableCommitImpl implements InnerTableCommit { @Nullable private final Duration consumerExpireTime; private final ConsumerManager consumerManager; - private final ExecutorService expireMainExecutor; - private final AtomicReference<Throwable> expireError; + private final ExecutorService maintainExecutor; + private final AtomicReference<Throwable> maintainError; private final String tableName; @Nullable private Map<String, String> overwritePartition = null; private boolean batchCommitted = false; private final boolean forceCreatingSnapshot; + private boolean expireForEmptyCommit = true; public TableCommitImpl( FileStoreCommit commit, @@ -111,13 +114,13 @@ public class TableCommitImpl implements InnerTableCommit { this.consumerExpireTime = consumerExpireTime; this.consumerManager = consumerManager; - this.expireMainExecutor = + this.maintainExecutor = expireExecutionMode == ExpireExecutionMode.SYNC ? 
MoreExecutors.newDirectExecutorService() : Executors.newSingleThreadExecutor( new ExecutorThreadFactory( Thread.currentThread().getName() + "expire-main-thread")); - this.expireError = new AtomicReference<>(null); + this.maintainError = new AtomicReference<>(null); this.tableName = tableName; this.forceCreatingSnapshot = forceCreatingSnapshot; @@ -147,6 +150,12 @@ public class TableCommitImpl implements InnerTableCommit { return this; } + @Override + public TableCommitImpl expireForEmptyCommit(boolean expireForEmptyCommit) { + this.expireForEmptyCommit = expireForEmptyCommit; + return this; + } + @Override public InnerTableCommit withMetricRegistry(MetricRegistry registry) { commit.withMetrics(new CommitMetrics(registry, tableName)); @@ -213,11 +222,15 @@ public class TableCommitImpl implements InnerTableCommit { public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) { if (overwritePartition == null) { + int newSnapshots = 0; for (ManifestCommittable committable : committables) { - commit.commit(committable, checkAppendFiles); + newSnapshots += commit.commit(committable, checkAppendFiles); } if (!committables.isEmpty()) { - expire(committables.get(committables.size() - 1).identifier(), expireMainExecutor); + maintain( + committables.get(committables.size() - 1).identifier(), + maintainExecutor, + newSnapshots > 0 || expireForEmptyCommit); } } else { ManifestCommittable committable; @@ -233,8 +246,12 @@ public class TableCommitImpl implements InnerTableCommit { // TODO maybe it can be produced by CommitterOperator committable = new ManifestCommittable(Long.MAX_VALUE); } - commit.overwrite(overwritePartition, committable, Collections.emptyMap()); - expire(committable.identifier(), expireMainExecutor); + int newSnapshots = + commit.overwrite(overwritePartition, committable, Collections.emptyMap()); + maintain( + committable.identifier(), + maintainExecutor, + newSnapshots > 0 || expireForEmptyCommit); } } @@ -315,36 +332,46 @@ 
public class TableCommitImpl implements InnerTableCommit { } } - private void expire(long partitionExpireIdentifier, ExecutorService executor) { - if (expireError.get() != null) { - throw new RuntimeException(expireError.get()); + private void maintain(long identifier, ExecutorService executor, boolean doExpire) { + if (maintainError.get() != null) { + throw new RuntimeException(maintainError.get()); } executor.execute( () -> { try { - expire(partitionExpireIdentifier); + maintain(identifier, doExpire); } catch (Throwable t) { - LOG.error("Executing expire encountered an error.", t); - expireError.compareAndSet(null, t); + LOG.error("Executing maintain encountered an error.", t); + maintainError.compareAndSet(null, t); } }); } - private void expire(long partitionExpireIdentifier) { + private void maintain(long identifier, boolean doExpire) { // expire consumer first to avoid preventing snapshot expiration - if (consumerExpireTime != null) { + if (doExpire && consumerExpireTime != null) { consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime)); } - expireSnapshots(); + if (doExpire && expireSnapshots != null) { + expireSnapshots.run(); + } - if (partitionExpire != null) { - partitionExpire.expire(partitionExpireIdentifier); + if (doExpire && partitionExpire != null) { + partitionExpire.expire(identifier); } if (tagAutoManager != null) { - tagAutoManager.run(); + TagAutoCreation tagAutoCreation = tagAutoManager.getTagAutoCreation(); + if (tagAutoCreation != null) { + tagAutoCreation.run(); + } + + TagTimeExpire tagTimeExpire = tagAutoManager.getTagTimeExpire(); + if (doExpire && tagTimeExpire != null) { + tagTimeExpire.expire(); + } } } @@ -357,7 +384,7 @@ public class TableCommitImpl implements InnerTableCommit { @Override public void close() throws Exception { commit.close(); - expireMainExecutor.shutdownNow(); + maintainExecutor.shutdownNow(); } @Override @@ -366,7 +393,7 @@ public class TableCommitImpl implements InnerTableCommit { } 
@VisibleForTesting - public ExecutorService getExpireMainExecutor() { - return expireMainExecutor; + public ExecutorService getMaintainExecutor() { + return maintainExecutor; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java index cd11a2ddfd..457c1dcb16 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java @@ -1345,7 +1345,7 @@ public abstract class SimpleTableTestBase { options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2"); TableCommitImpl commit = table.copy(options).newCommit(commitUser); - ExecutorService executor = commit.getExpireMainExecutor(); + ExecutorService executor = commit.getMaintainExecutor(); CountDownLatch before = new CountDownLatch(1); CountDownLatch after = new CountDownLatch(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index f2dee4743f..8c81ec7c26 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ExceptionUtils; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.LongStream; +import static java.util.Collections.singletonMap; import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory; import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.Assertions.assertThatThrownBy; @@ -251,10 +253,10 @@ public class TableCommitTest { } // commit 0, fine, it will be filtered - commit.filterAndCommit(Collections.singletonMap(0L, messages0)); + commit.filterAndCommit(singletonMap(0L, messages0)); // commit 1, exception now. - assertThatThrownBy(() -> commit.filterAndCommit(Collections.singletonMap(1L, messages1))) + assertThatThrownBy(() -> commit.filterAndCommit(singletonMap(1L, messages1))) .hasMessageContaining( "Cannot recover from this checkpoint because some files in the" + " snapshot that need to be resubmitted have been deleted"); @@ -381,4 +383,57 @@ public class TableCommitTest { .hasMessageContaining( "Giving up committing as commit.strict-mode.last-safe-snapshot is set."); } + + @Test + public void testExpireForEmptyCommit() throws Exception { + String path = tempDir.toString(); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.BIGINT()}, + new String[] {"k", "v"}); + + Options options = new Options(); + options.set(CoreOptions.PATH, path); + options.set(CoreOptions.BUCKET, 1); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), new Path(path)), + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.singletonList("k"), + options.toMap(), + "")); + FileStoreTable table = + FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(path), + tableSchema, + CatalogEnvironment.empty()); + SnapshotManager snapshotManager = table.snapshotManager(); + String user1 = UUID.randomUUID().toString(); + TableWriteImpl<?> write = table.newWrite(user1); + TableCommitImpl commit = table.copy(singletonMap("write-only", "true")).newCommit(user1); + + for (int i = 0; i < 5; i++) { + write.write(GenericRow.of(i, (long) i)); + commit.commit(i, write.prepareCommit(true, i)); + } + 
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(1); + assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6); + + // expire for empty commit: false + commit = table.newCommit(user1).ignoreEmptyCommit(true).expireForEmptyCommit(false); + commit.commit(7, write.prepareCommit(true, 7)); + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(1); + assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6); + + // expire for empty commit: default true + commit = table.newCommit(user1).ignoreEmptyCommit(true); + commit.commit(7, write.prepareCommit(true, 7)); + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5); + assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6); + } }