This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0b3509d55 [core] `commit.force-create-snapshot` to force a snapshot to be generated when committing (#2964)
0b3509d55 is described below
commit 0b3509d553fcd973463416d4b6c0842ae196382c
Author: big face cat <[email protected]>
AuthorDate: Thu Mar 7 22:07:12 2024 +0800
[core] `commit.force-create-snapshot` to force a snapshot to be generated when committing (#2964)
---
.../shortcodes/generated/core_configuration.html | 6 ++++++
.../src/main/java/org/apache/paimon/CoreOptions.java | 10 ++++++++++
.../apache/paimon/table/AbstractFileStoreTable.java | 5 +++--
.../org/apache/paimon/table/sink/TableCommitImpl.java | 8 ++++++--
.../paimon/flink/sink/CommitterOperatorTest.java | 18 ++++++++++++++++++
5 files changed, 43 insertions(+), 4 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index a123d3a77..a1bf28c62 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -80,6 +80,12 @@ under the License.
<td>Boolean</td>
<td>Whether to force a compaction before commit.</td>
</tr>
+ <tr>
+ <td><h5>commit.force-create-snapshot</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to force create snapshot on commit.</td>
+ </tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 201525ab0..f8eec84fb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1069,6 +1069,12 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.ofMebiBytes(10))
.withDescription("The threshold for read file async.");
+ public static final ConfigOption<Boolean> COMMIT_FORCE_CREATE_SNAPSHOT =
+ key("commit.force-create-snapshot")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to force create snapshot on commit.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1563,6 +1569,10 @@ public class CoreOptions implements Serializable {
return options.get(SINK_WATERMARK_TIME_ZONE);
}
+ public boolean forceCreatingSnapshot() {
+ return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
+ }
+
public Map<String, String> getFieldDefaultValues() {
Map<String, String> defaultValues = new HashMap<>();
String fieldPrefix = FIELDS_PREFIX + ".";
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 186f8ed3b..c2756fbfc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -325,8 +325,9 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
- options.snapshotExpireExecutionMode(),
- name());
+ coreOptions().snapshotExpireExecutionMode(),
+ name(),
+ coreOptions().forceCreatingSnapshot());
}
private List<CommitCallback> createCommitCallbacks() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index fb7ab7974..ab01943bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -88,6 +88,7 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable private Map<String, String> overwritePartition = null;
private boolean batchCommitted = false;
+ private final boolean forceCreatingSnapshot;
public TableCommitImpl(
FileStoreCommit commit,
@@ -99,7 +100,8 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode,
- String tableName) {
+ String tableName,
+ boolean forceCreatingSnapshot) {
commit.withLock(lock);
if (partitionExpire != null) {
partitionExpire.withLock(lock);
@@ -124,10 +126,12 @@ public class TableCommitImpl implements InnerTableCommit {
this.expireError = new AtomicReference<>(null);
this.tableName = tableName;
+ this.forceCreatingSnapshot = forceCreatingSnapshot;
}
public boolean forceCreatingSnapshot() {
- return tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot();
+ return this.forceCreatingSnapshot
+ || (tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot());
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 44ceab84b..0a3a8fae7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -483,6 +483,24 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
assertThat(snapshot).isNull();
}
+ @Test
+ public void testForceCreateSnapshotCommit() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ options ->
+ options.set(
+ CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true"));
+
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createRecoverableTestHarness(table);
+ testHarness.open();
+
+ testHarness.snapshot(1, 1);
+ testHarness.notifyOfCompletedCheckpoint(1);
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+ }
+
@Test
public void testEmptyCommitWithProcessTimeTag() throws Exception {
FileStoreTable table =