This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3509d55 [core] `commit.force-create-snapshot` to force a snapshot to 
be generated when committing (#2964)
0b3509d55 is described below

commit 0b3509d553fcd973463416d4b6c0842ae196382c
Author: big face cat <[email protected]>
AuthorDate: Thu Mar 7 22:07:12 2024 +0800

    [core] `commit.force-create-snapshot` to force a snapshot to be generated 
when committing (#2964)
---
 .../shortcodes/generated/core_configuration.html       |  6 ++++++
 .../src/main/java/org/apache/paimon/CoreOptions.java   | 10 ++++++++++
 .../apache/paimon/table/AbstractFileStoreTable.java    |  5 +++--
 .../org/apache/paimon/table/sink/TableCommitImpl.java  |  8 ++++++--
 .../paimon/flink/sink/CommitterOperatorTest.java       | 18 ++++++++++++++++++
 5 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index a123d3a77..a1bf28c62 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -80,6 +80,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to force a compaction before commit.</td>
         </tr>
+        <tr>
+            <td><h5>commit.force-create-snapshot</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force create snapshot on commit.</td>
+        </tr>
         <tr>
             <td><h5>compaction.max-size-amplification-percent</h5></td>
             <td style="word-wrap: break-word;">200</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 201525ab0..f8eec84fb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1069,6 +1069,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(MemorySize.ofMebiBytes(10))
                     .withDescription("The threshold for read file async.");
 
+    public static final ConfigOption<Boolean> COMMIT_FORCE_CREATE_SNAPSHOT =
+            key("commit.force-create-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to force create snapshot on 
commit.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1563,6 +1569,10 @@ public class CoreOptions implements Serializable {
         return options.get(SINK_WATERMARK_TIME_ZONE);
     }
 
+    public boolean forceCreatingSnapshot() {
+        return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
+    }
+
     public Map<String, String> getFieldDefaultValues() {
         Map<String, String> defaultValues = new HashMap<>();
         String fieldPrefix = FIELDS_PREFIX + ".";
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 186f8ed3b..c2756fbfc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -325,8 +325,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path),
-                options.snapshotExpireExecutionMode(),
-                name());
+                coreOptions().snapshotExpireExecutionMode(),
+                name(),
+                coreOptions().forceCreatingSnapshot());
     }
 
     private List<CommitCallback> createCommitCallbacks() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index fb7ab7974..ab01943bb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -88,6 +88,7 @@ public class TableCommitImpl implements InnerTableCommit {
 
     @Nullable private Map<String, String> overwritePartition = null;
     private boolean batchCommitted = false;
+    private final boolean forceCreatingSnapshot;
 
     public TableCommitImpl(
             FileStoreCommit commit,
@@ -99,7 +100,8 @@ public class TableCommitImpl implements InnerTableCommit {
             @Nullable Duration consumerExpireTime,
             ConsumerManager consumerManager,
             ExpireExecutionMode expireExecutionMode,
-            String tableName) {
+            String tableName,
+            boolean forceCreatingSnapshot) {
         commit.withLock(lock);
         if (partitionExpire != null) {
             partitionExpire.withLock(lock);
@@ -124,10 +126,12 @@ public class TableCommitImpl implements InnerTableCommit {
         this.expireError = new AtomicReference<>(null);
 
         this.tableName = tableName;
+        this.forceCreatingSnapshot = forceCreatingSnapshot;
     }
 
     public boolean forceCreatingSnapshot() {
-        return tagAutoCreation != null && 
tagAutoCreation.forceCreatingSnapshot();
+        return this.forceCreatingSnapshot
+                || (tagAutoCreation != null && 
tagAutoCreation.forceCreatingSnapshot());
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 44ceab84b..0a3a8fae7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -483,6 +483,24 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         assertThat(snapshot).isNull();
     }
 
+    @Test
+    public void testForceCreateSnapshotCommit() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options ->
+                                options.set(
+                                        
CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true"));
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                createRecoverableTestHarness(table);
+        testHarness.open();
+
+        testHarness.snapshot(1, 1);
+        testHarness.notifyOfCompletedCheckpoint(1);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+    }
+
     @Test
     public void testEmptyCommitWithProcessTimeTag() throws Exception {
         FileStoreTable table =

Reply via email to