This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 71f6a39f79 [core] Introduce expireForEmptyCommit to InnerTableCommit (#6013)
71f6a39f79 is described below

commit 71f6a39f79e087db54cb6de49a39f796fc1898be
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Aug 1 18:41:34 2025 +0800

    [core] Introduce expireForEmptyCommit to InnerTableCommit (#6013)
---
 .../apache/paimon/operation/FileStoreCommit.java   |  4 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  6 +-
 .../apache/paimon/table/sink/InnerTableCommit.java |  2 +
 .../apache/paimon/table/sink/TableCommitImpl.java  | 73 +++++++++++++++-------
 .../apache/paimon/table/SimpleTableTestBase.java   |  2 +-
 .../apache/paimon/table/sink/TableCommitTest.java  | 59 ++++++++++++++++-
 6 files changed, 116 insertions(+), 30 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 1ab4ebe968..4156ce0a83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -40,7 +40,7 @@ public interface FileStoreCommit extends AutoCloseable {
     List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables);
 
     /** Commit from manifest committable with checkAppendFiles. */
-    void commit(ManifestCommittable committable, boolean checkAppendFiles);
+    int commit(ManifestCommittable committable, boolean checkAppendFiles);
 
     /**
      * Overwrite from manifest committable and partition.
@@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable {
      *     note that this partition does not necessarily equal to the 
partitions of the newly added
      *     key-values. This is just the partition to be cleaned up.
      */
-    void overwrite(
+    int overwrite(
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 54888abe72..c82776a916 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -277,7 +277,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     @Override
-    public void commit(ManifestCommittable committable, boolean 
checkAppendFiles) {
+    public int commit(ManifestCommittable committable, boolean 
checkAppendFiles) {
         LOG.info(
                 "Ready to commit to table {}, number of commit messages: {}",
                 tableName,
@@ -399,6 +399,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         attempts);
             }
         }
+        return generatedSnapshot;
     }
 
     private void reportCommit(
@@ -422,7 +423,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     @Override
-    public void overwrite(
+    public int overwrite(
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties) {
@@ -551,6 +552,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         attempts);
             }
         }
+        return generatedSnapshot;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 1544375569..df6241086a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -44,6 +44,8 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
      */
     InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
 
+    InnerTableCommit expireForEmptyCommit(boolean expireForEmptyCommit);
+
     @Override
     InnerTableCommit withMetricRegistry(MetricRegistry registry);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e262c5f6bf..40c69289a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -30,7 +30,9 @@ import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.tag.TagTimeExpire;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.PathFactory;
@@ -80,14 +82,15 @@ public class TableCommitImpl implements InnerTableCommit {
     @Nullable private final Duration consumerExpireTime;
     private final ConsumerManager consumerManager;
 
-    private final ExecutorService expireMainExecutor;
-    private final AtomicReference<Throwable> expireError;
+    private final ExecutorService maintainExecutor;
+    private final AtomicReference<Throwable> maintainError;
 
     private final String tableName;
 
     @Nullable private Map<String, String> overwritePartition = null;
     private boolean batchCommitted = false;
     private final boolean forceCreatingSnapshot;
+    private boolean expireForEmptyCommit = true;
 
     public TableCommitImpl(
             FileStoreCommit commit,
@@ -111,13 +114,13 @@ public class TableCommitImpl implements InnerTableCommit {
         this.consumerExpireTime = consumerExpireTime;
         this.consumerManager = consumerManager;
 
-        this.expireMainExecutor =
+        this.maintainExecutor =
                 expireExecutionMode == ExpireExecutionMode.SYNC
                         ? MoreExecutors.newDirectExecutorService()
                         : Executors.newSingleThreadExecutor(
                                 new ExecutorThreadFactory(
                                         Thread.currentThread().getName() + 
"expire-main-thread"));
-        this.expireError = new AtomicReference<>(null);
+        this.maintainError = new AtomicReference<>(null);
 
         this.tableName = tableName;
         this.forceCreatingSnapshot = forceCreatingSnapshot;
@@ -147,6 +150,12 @@ public class TableCommitImpl implements InnerTableCommit {
         return this;
     }
 
+    @Override
+    public TableCommitImpl expireForEmptyCommit(boolean expireForEmptyCommit) {
+        this.expireForEmptyCommit = expireForEmptyCommit;
+        return this;
+    }
+
     @Override
     public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
         commit.withMetrics(new CommitMetrics(registry, tableName));
@@ -213,11 +222,15 @@ public class TableCommitImpl implements InnerTableCommit {
 
     public void commitMultiple(List<ManifestCommittable> committables, boolean 
checkAppendFiles) {
         if (overwritePartition == null) {
+            int newSnapshots = 0;
             for (ManifestCommittable committable : committables) {
-                commit.commit(committable, checkAppendFiles);
+                newSnapshots += commit.commit(committable, checkAppendFiles);
             }
             if (!committables.isEmpty()) {
-                expire(committables.get(committables.size() - 1).identifier(), 
expireMainExecutor);
+                maintain(
+                        committables.get(committables.size() - 1).identifier(),
+                        maintainExecutor,
+                        newSnapshots > 0 || expireForEmptyCommit);
             }
         } else {
             ManifestCommittable committable;
@@ -233,8 +246,12 @@ public class TableCommitImpl implements InnerTableCommit {
                 // TODO maybe it can be produced by CommitterOperator
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
-            commit.overwrite(overwritePartition, committable, 
Collections.emptyMap());
-            expire(committable.identifier(), expireMainExecutor);
+            int newSnapshots =
+                    commit.overwrite(overwritePartition, committable, 
Collections.emptyMap());
+            maintain(
+                    committable.identifier(),
+                    maintainExecutor,
+                    newSnapshots > 0 || expireForEmptyCommit);
         }
     }
 
@@ -315,36 +332,46 @@ public class TableCommitImpl implements InnerTableCommit {
         }
     }
 
-    private void expire(long partitionExpireIdentifier, ExecutorService 
executor) {
-        if (expireError.get() != null) {
-            throw new RuntimeException(expireError.get());
+    private void maintain(long identifier, ExecutorService executor, boolean 
doExpire) {
+        if (maintainError.get() != null) {
+            throw new RuntimeException(maintainError.get());
         }
 
         executor.execute(
                 () -> {
                     try {
-                        expire(partitionExpireIdentifier);
+                        maintain(identifier, doExpire);
                     } catch (Throwable t) {
-                        LOG.error("Executing expire encountered an error.", t);
-                        expireError.compareAndSet(null, t);
+                        LOG.error("Executing maintain encountered an error.", 
t);
+                        maintainError.compareAndSet(null, t);
                     }
                 });
     }
 
-    private void expire(long partitionExpireIdentifier) {
+    private void maintain(long identifier, boolean doExpire) {
         // expire consumer first to avoid preventing snapshot expiration
-        if (consumerExpireTime != null) {
+        if (doExpire && consumerExpireTime != null) {
             
consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
         }
 
-        expireSnapshots();
+        if (doExpire && expireSnapshots != null) {
+            expireSnapshots.run();
+        }
 
-        if (partitionExpire != null) {
-            partitionExpire.expire(partitionExpireIdentifier);
+        if (doExpire && partitionExpire != null) {
+            partitionExpire.expire(identifier);
         }
 
         if (tagAutoManager != null) {
-            tagAutoManager.run();
+            TagAutoCreation tagAutoCreation = 
tagAutoManager.getTagAutoCreation();
+            if (tagAutoCreation != null) {
+                tagAutoCreation.run();
+            }
+
+            TagTimeExpire tagTimeExpire = tagAutoManager.getTagTimeExpire();
+            if (doExpire && tagTimeExpire != null) {
+                tagTimeExpire.expire();
+            }
         }
     }
 
@@ -357,7 +384,7 @@ public class TableCommitImpl implements InnerTableCommit {
     @Override
     public void close() throws Exception {
         commit.close();
-        expireMainExecutor.shutdownNow();
+        maintainExecutor.shutdownNow();
     }
 
     @Override
@@ -366,7 +393,7 @@ public class TableCommitImpl implements InnerTableCommit {
     }
 
     @VisibleForTesting
-    public ExecutorService getExpireMainExecutor() {
-        return expireMainExecutor;
+    public ExecutorService getMaintainExecutor() {
+        return maintainExecutor;
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index cd11a2ddfd..457c1dcb16 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1345,7 +1345,7 @@ public abstract class SimpleTableTestBase {
         options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2");
 
         TableCommitImpl commit = table.copy(options).newCommit(commitUser);
-        ExecutorService executor = commit.getExpireMainExecutor();
+        ExecutorService executor = commit.getMaintainExecutor();
         CountDownLatch before = new CountDownLatch(1);
         CountDownLatch after = new CountDownLatch(1);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index f2dee4743f..8c81ec7c26 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -41,6 +41,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ExceptionUtils;
 import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static java.util.Collections.singletonMap;
 import static 
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -251,10 +253,10 @@ public class TableCommitTest {
         }
 
         // commit 0, fine, it will be filtered
-        commit.filterAndCommit(Collections.singletonMap(0L, messages0));
+        commit.filterAndCommit(singletonMap(0L, messages0));
 
         // commit 1, exception now.
-        assertThatThrownBy(() -> 
commit.filterAndCommit(Collections.singletonMap(1L, messages1)))
+        assertThatThrownBy(() -> commit.filterAndCommit(singletonMap(1L, 
messages1)))
                 .hasMessageContaining(
                         "Cannot recover from this checkpoint because some 
files in the"
                                 + " snapshot that need to be resubmitted have 
been deleted");
@@ -381,4 +383,57 @@ public class TableCommitTest {
                 .hasMessageContaining(
                         "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
     }
+
+    @Test
+    public void testExpireForEmptyCommit() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+        SnapshotManager snapshotManager = table.snapshotManager();
+        String user1 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(user1);
+        TableCommitImpl commit = table.copy(singletonMap("write-only", 
"true")).newCommit(user1);
+
+        for (int i = 0; i < 5; i++) {
+            write.write(GenericRow.of(i, (long) i));
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+        assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(1);
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
+
+        // expire for empty commit: false
+        commit = 
table.newCommit(user1).ignoreEmptyCommit(true).expireForEmptyCommit(false);
+        commit.commit(7, write.prepareCommit(true, 7));
+        assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(1);
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
+
+        // expire for empty commit: default true
+        commit = table.newCommit(user1).ignoreEmptyCommit(true);
+        commit.commit(7, write.prepareCommit(true, 7));
+        assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5);
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
+    }
 }

Reply via email to