This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6f46484668 [core] Support enable and disable tag time expiration (#6884)
6f46484668 is described below
commit 6f464846680b8b4ae815b6a27dcefb18f46eff18
Author: yuzelin <[email protected]>
AuthorDate: Thu Dec 25 10:39:22 2025 +0800
[core] Support enable and disable tag time expiration (#6884)
---
docs/layouts/shortcodes/generated/core_configuration.html | 6 ++++++
.../src/main/java/org/apache/paimon/CoreOptions.java | 10 ++++++++++
.../main/java/org/apache/paimon/tag/TagAutoManager.java | 15 +++++++++++----
.../paimon/flink/procedure/ExpireTagsProcedure.java | 6 ++++++
.../procedure/ProcedurePositionalArgumentsITCase.java | 9 +++++++--
.../paimon/flink/procedure/ExpireTagsProcedure.java | 6 ++++++
.../apache/paimon/flink/action/ExpireTagsActionTest.java | 6 +++++-
.../paimon/flink/procedure/ExpireTagsProcedureITCase.java | 5 ++++-
.../paimon/spark/procedure/ExpireTagsProcedure.java | 6 ++++++
.../paimon/spark/procedure/ExpireTagsProcedureTest.scala | 2 ++
10 files changed, 63 insertions(+), 8 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 027d798c06..fc7a0c3839 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1362,6 +1362,12 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td><p>Enum</p></td>
<td>The date format for tag periods.<br /><br />Possible
values:<ul><li>"with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd
HH'</li><li>"without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd
HH'</li><li>"without_dashes_and_spaces": Dates and hours without dashes and
spaces, e.g., 'yyyyMMddHH'</li></ul></td>
</tr>
+ <tr>
+ <td><h5>tag.time-expire-enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to enable tag expiration by retained time.</td>
+ </tr>
<tr>
<td><h5>target-file-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d5bd5789e8..20aa5182d6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1635,6 +1635,12 @@ public class CoreOptions implements Serializable {
"The default maximum time retained for newly
created tags. "
+ "It affects both auto-created tags and
manually created (by procedure) tags.");
+ public static final ConfigOption<Boolean> TAG_TIME_EXPIRE_ENABLED =
+ key("tag.time-expire-enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to enable tag expiration by
retained time.");
+
public static final ConfigOption<Boolean> TAG_AUTOMATIC_COMPLETION =
key("tag.automatic-completion")
.booleanType()
@@ -3053,6 +3059,10 @@ public class CoreOptions implements Serializable {
return options.get(TAG_DEFAULT_TIME_RETAINED);
}
+ public boolean tagTimeExpireEnabled() {
+ return options.get(TAG_TIME_EXPIRE_ENABLED);
+ }
+
public boolean tagAutomaticCompletion() {
return options.get(TAG_AUTOMATIC_COMPLETION);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
index 817c20af46..31ca385d1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
@@ -24,15 +24,18 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import javax.annotation.Nullable;
+
import java.util.List;
/** A manager to create and expire tags. */
public class TagAutoManager {
- private final TagAutoCreation tagAutoCreation;
- private final TagTimeExpire tagTimeExpire;
+ @Nullable private final TagAutoCreation tagAutoCreation;
+ @Nullable private final TagTimeExpire tagTimeExpire;
- private TagAutoManager(TagAutoCreation tagAutoCreation, TagTimeExpire
tagTimeExpire) {
+ private TagAutoManager(
+ @Nullable TagAutoCreation tagAutoCreation, @Nullable TagTimeExpire
tagTimeExpire) {
this.tagAutoCreation = tagAutoCreation;
this.tagTimeExpire = tagTimeExpire;
}
@@ -60,13 +63,17 @@ public class TagAutoManager {
? null
: TagAutoCreation.create(
options, snapshotManager, tagManager,
tagDeletion, callbacks),
- TagTimeExpire.create(snapshotManager, tagManager, tagDeletion,
callbacks));
+ options.tagTimeExpireEnabled()
+ ? TagTimeExpire.create(snapshotManager, tagManager,
tagDeletion, callbacks)
+ : null);
}
+ @Nullable
public TagAutoCreation getTagAutoCreation() {
return tagAutoCreation;
}
+ @Nullable
public TagTimeExpire getTagTimeExpire() {
return tagTimeExpire;
}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index d2b649bd0f..037c4bb71d 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
@@ -26,6 +27,7 @@ import org.apache.paimon.utils.DateTimeUtils;
import org.apache.flink.table.procedure.ProcedureContext;
import java.time.LocalDateTime;
+import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
@@ -42,6 +44,10 @@ public class ExpireTagsProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String tableId,
String olderThanStr)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+ fileStoreTable =
+ fileStoreTable.copy(
+ Collections.singletonMap(
+ CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(),
"true"));
TagTimeExpire tagTimeExpire =
fileStoreTable.store().newTagAutoManager(fileStoreTable).getTagTimeExpire();
if (olderThanStr != null) {
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 2f32c326a7..5dac8cd200 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -499,14 +500,18 @@ public class ProcedurePositionalArgumentsITCase extends
CatalogITCaseBase {
@Test
public void testExpireTags() throws Exception {
+ boolean tagTimeExpireEnabled =
ThreadLocalRandom.current().nextBoolean();
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
- + " 'bucket' = '1'"
- + ")");
+ + " 'bucket' = '1',"
+ + " 'tag.time-expire-enabled' = '%s'"
+ + ")",
+ tagTimeExpireEnabled);
+
FileStoreTable table = paimonTable("T");
for (int i = 1; i <= 3; i++) {
sql("INSERT INTO T VALUES ('" + i + "', '" + i + "')");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index 3fdb36a18b..2c03cf31d1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
@@ -32,6 +33,7 @@ import org.apache.flink.types.Row;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
+import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
@@ -52,6 +54,10 @@ public class ExpireTagsProcedure extends ProcedureBase {
ProcedureContext procedureContext, String tableId, @Nullable
String olderThanStr)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+ fileStoreTable =
+ fileStoreTable.copy(
+ Collections.singletonMap(
+ CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(),
"true"));
TagTimeExpire tagTimeExpire =
fileStoreTable.store().newTagAutoManager(fileStoreTable).getTagTimeExpire();
if (olderThanStr != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
index 73a6b10f48..b07cf50816 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.file.Path;
import java.time.LocalDateTime;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
@@ -47,10 +48,13 @@ public class ExpireTagsActionTest extends ActionITCaseBase {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testExpireTags(boolean startFlinkJob) throws Exception {
+ boolean tagTimeExpireEnabled =
ThreadLocalRandom.current().nextBoolean();
bEnv.executeSql(
"CREATE TABLE T (id STRING, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
- + " WITH ('bucket'='1', 'write-only'='true')");
+ + " WITH ('bucket'='1', 'write-only'='true',
'tag.time-expire-enabled' = '"
+ + tagTimeExpireEnabled
+ + "')");
expireTags(startFlinkJob);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
index 90e5bc6702..e44769e648 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -37,10 +38,12 @@ public class ExpireTagsProcedureITCase extends
CatalogITCaseBase {
@Test
public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws
Exception {
+ boolean tagTimeExpireEnabled =
ThreadLocalRandom.current().nextBoolean();
sql(
"CREATE TABLE T (id STRING, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
- + " WITH ('bucket'='1', 'write-only'='true')");
+ + " WITH ('bucket'='1', 'write-only'='true',
'tag.time-expire-enabled'='%s')",
+ tagTimeExpireEnabled);
FileStoreTable table = paimonTable("T");
SnapshotManager snapshotManager = table.snapshotManager();
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
index 17c56b7077..f8e685cf2e 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DateTimeUtils;
@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.time.LocalDateTime;
+import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
@@ -73,6 +75,10 @@ public class ExpireTagsProcedure extends BaseProcedure {
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
+ fileStoreTable =
+ fileStoreTable.copy(
+ Collections.singletonMap(
+
CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
TagTimeExpire tagTimeExpire =
fileStoreTable
.store()
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
index 1ac9709c87..d1e1d3f52d 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
@@ -28,9 +28,11 @@ import org.assertj.core.api.Assertions.assertThat
class ExpireTagsProcedureTest extends PaimonSparkTestBase {
test("Paimon procedure: expire tags that reached its timeRetained") {
+ val tagTimeExpireEnabled = scala.util.Random.nextBoolean()
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
+ |TBLPROPERTIES ('tag.time-expire-enabled' =
'$tagTimeExpireEnabled')
|""".stripMargin)
val table = loadTable("T")