This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit eb4086beccc283b93a39c214baf39db7b6086df6
Author: wangwj <[email protected]>
AuthorDate: Mon Apr 15 16:39:33 2024 +0800

    [core] Introduce TTL for tag
---
 docs/content/maintenance/manage-tags.md            |   7 +
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  10 +
 .../java/org/apache/paimon/AbstractFileStore.java  |   6 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   4 +-
 .../src/main/java/org/apache/paimon/Snapshot.java  |  34 +-
 .../paimon/operation/FileStoreCommitImpl.java      |   2 +-
 .../paimon/table/AbstractFileStoreTable.java       |  30 +-
 .../org/apache/paimon/table/ReadonlyTable.java     |  20 ++
 .../main/java/org/apache/paimon/table/Table.java   |   9 +
 .../apache/paimon/table/sink/TableCommitImpl.java  |  22 +-
 .../org/apache/paimon/table/system/TagsTable.java  |  39 ++-
 .../src/main/java/org/apache/paimon/tag/Tag.java   | 355 +++++++++++++++++++++
 .../org/apache/paimon/tag/TagAutoCreation.java     |  40 +--
 .../java/org/apache/paimon/tag/TagAutoExpire.java  | 140 ++++++++
 .../java/org/apache/paimon/tag/TagAutoManager.java |  67 ++++
 .../org/apache/paimon/utils/JsonSerdeUtil.java     |   2 +
 .../java/org/apache/paimon/utils/TagManager.java   |  67 ++--
 .../paimon/operation/ExpireSnapshotsTest.java      |   6 +-
 .../apache/paimon/operation/FileDeletionTest.java  |  19 +-
 .../apache/paimon/table/system/TagsTableTest.java  |  26 +-
 ...toCreationTest.java => TagAutoManagerTest.java} | 103 +++++-
 .../test/java/org/apache/paimon/tag/TagTest.java   | 181 +++++++++++
 .../org/apache/paimon/utils/TagManagerTest.java    | 198 ++++++++++++
 .../paimon/flink/action/CreateTagAction.java       |  10 +-
 .../flink/action/CreateTagActionFactory.java       |  17 +-
 .../paimon/flink/procedure/CreateTagProcedure.java |  23 +-
 .../sink/AutoTagForSavepointCommitterOperator.java |   9 +-
 .../flink/sink/BatchWriteGeneratorTagOperator.java |   6 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   3 +-
 .../paimon/flink/action/BranchActionITCase.java    |   4 +-
 .../paimon/flink/action/TagActionITCase.java       |   6 +-
 .../AutoTagForSavepointCommitterOperatorTest.java  |   6 +-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  12 +-
 .../paimon/spark/procedure/CreateTagProcedure.java |  13 +-
 .../CreateAndDeleteTagProcedureTest.scala          |   3 +-
 36 files changed, 1353 insertions(+), 152 deletions(-)

diff --git a/docs/content/maintenance/manage-tags.md 
b/docs/content/maintenance/manage-tags.md
index fc499f607..26b657ebb 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID.
     --database <database-name> \ 
     --table <table-name> \
     --tag_name <tag-name> \
+    --time_retained <time-retained> \
     [--snapshot <snapshot_id>] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]]
 ```
@@ -126,6 +127,7 @@ public class CreateTag {
     public static void main(String[] args) {
         Table table = ...;
         table.createTag("my-tag", 1);
+        table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1);
     }
 }
 ```
@@ -138,6 +140,11 @@ Run the following sql:
 CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
 ```
 
+To create a tag that is retained for 1 day, run the following sql:
+```sql
+CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', 
snapshot => 2);
+```
+
 To create a tag based on the latest snapshot id, run the following sql:
 ```sql
 CALL create_tag(table => 'test.t', tag => 'test_tag');
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 901a3b66b..2e7c4ae17 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -678,6 +678,12 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td><p>Enum</p></td>
             <td>What frequency is used to generate tags.<br /><br />Possible 
values:<ul><li>"daily": Generate a tag every day.</li><li>"hourly": Generate a 
tag every hour.</li><li>"two-hours": Generate a tag every two 
hours.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>tag.default-time-retained</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>The maximum default time retained for all tags.</td>
+        </tr>
         <tr>
             <td><h5>tag.num-retained-max</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 327e6f231..ac5a4e022 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1020,6 +1020,12 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("The maximum number of tags to retain.");
 
+    public static final ConfigOption<Duration> TAG_DEFAULT_TIME_RETAINED =
+            key("tag.default-time-retained")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("The maximum default time retained for 
all tags.");
+
     public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT 
=
             key("snapshot.watermark-idle-timeout")
                     .durationType()
@@ -1653,6 +1659,10 @@ public class CoreOptions implements Serializable {
         return options.get(TAG_NUM_RETAINED_MAX);
     }
 
+    public Duration tagDefaultTimeRetained() {
+        return options.get(TAG_DEFAULT_TIME_RETAINED);
+    }
+
     public Duration snapshotWatermarkIdleTimeout() {
         return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 87cc4e65c..b7830f0ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -41,7 +41,7 @@ import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.TagCallback;
-import org.apache.paimon.tag.TagAutoCreation;
+import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
@@ -249,8 +249,8 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Override
     @Nullable
-    public TagAutoCreation newTagCreationManager() {
-        return TagAutoCreation.create(
+    public TagAutoManager newTagCreationManager() {
+        return TagAutoManager.create(
                 options,
                 snapshotManager(),
                 newTagManager(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 6731121c5..1d9e247d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -33,7 +33,7 @@ import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.TagCallback;
-import org.apache.paimon.tag.TagAutoCreation;
+import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
@@ -93,7 +93,7 @@ public interface FileStore<T> extends Serializable {
     PartitionExpire newPartitionExpire(String commitUser);
 
     @Nullable
-    TagAutoCreation newTagCreationManager();
+    TagAutoManager newTagCreationManager();
 
     ServiceManager newServiceManager();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 0ac23ecf0..0c42c4e43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -95,37 +95,37 @@ public class Snapshot {
     // null for paimon <= 0.2
     @JsonProperty(FIELD_VERSION)
     @Nullable
-    private final Integer version;
+    protected final Integer version;
 
     @JsonProperty(FIELD_ID)
-    private final long id;
+    protected final long id;
 
     @JsonProperty(FIELD_SCHEMA_ID)
-    private final long schemaId;
+    protected final long schemaId;
 
     // a manifest list recording all changes from the previous snapshots
     @JsonProperty(FIELD_BASE_MANIFEST_LIST)
-    private final String baseManifestList;
+    protected final String baseManifestList;
 
     // a manifest list recording all new changes occurred in this snapshot
     // for faster expire and streaming reads
     @JsonProperty(FIELD_DELTA_MANIFEST_LIST)
-    private final String deltaManifestList;
+    protected final String deltaManifestList;
 
     // a manifest list recording all changelog produced in this snapshot
     // null if no changelog is produced, or for paimon <= 0.2
     @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
     @Nullable
-    private final String changelogManifestList;
+    protected final String changelogManifestList;
 
     // a manifest recording all index files of this table
     // null if no index file
     @JsonProperty(FIELD_INDEX_MANIFEST)
     @JsonInclude(JsonInclude.Include.NON_NULL)
-    private final String indexManifest;
+    protected final String indexManifest;
 
     @JsonProperty(FIELD_COMMIT_USER)
-    private final String commitUser;
+    protected final String commitUser;
 
     // Mainly for snapshot deduplication.
     //
@@ -135,37 +135,37 @@ public class Snapshot {
     // If snapshot A has a smaller commitIdentifier than snapshot B, then 
snapshot A must be
     // committed before snapshot B, and thus snapshot A must contain older 
records than snapshot B.
     @JsonProperty(FIELD_COMMIT_IDENTIFIER)
-    private final long commitIdentifier;
+    protected final long commitIdentifier;
 
     @JsonProperty(FIELD_COMMIT_KIND)
-    private final CommitKind commitKind;
+    protected final CommitKind commitKind;
 
     @JsonProperty(FIELD_TIME_MILLIS)
-    private final long timeMillis;
+    protected final long timeMillis;
 
     @JsonProperty(FIELD_LOG_OFFSETS)
     @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
-    private final Map<Integer, Long> logOffsets;
+    protected final Map<Integer, Long> logOffsets;
 
     // record count of all changes occurred in this snapshot
     // null for paimon <= 0.3
     @JsonProperty(FIELD_TOTAL_RECORD_COUNT)
     @Nullable
-    private final Long totalRecordCount;
+    protected final Long totalRecordCount;
 
     // record count of all new changes occurred in this snapshot
     // null for paimon <= 0.3
     @JsonProperty(FIELD_DELTA_RECORD_COUNT)
     @Nullable
-    private final Long deltaRecordCount;
+    protected final Long deltaRecordCount;
 
     // record count of all changelog produced in this snapshot
     // null for paimon <= 0.3
     @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT)
     @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
-    private final Long changelogRecordCount;
+    protected final Long changelogRecordCount;
 
     // watermark for input records
     // null for paimon <= 0.3
@@ -174,14 +174,14 @@ public class Snapshot {
     @JsonProperty(FIELD_WATERMARK)
     @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
-    private final Long watermark;
+    protected final Long watermark;
 
     // stats file name for statistics of this table
     // null if no stats file
     @JsonInclude(JsonInclude.Include.NON_NULL)
     @JsonProperty(FIELD_STATISTICS)
     @Nullable
-    private final String statistics;
+    protected final String statistics;
 
     public Snapshot(
             long id,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 83508adf0..e389a471c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1231,7 +1231,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return latestSnapshot -> false;
     }
 
-    static ConflictCheck mustConflictCheck() {
+    public static ConflictCheck mustConflictCheck() {
         return latestSnapshot -> true;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 70e33ddb3..871bba4af 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -60,6 +60,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -461,8 +462,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
     }
 
-    @Override
-    public void createTag(String tagName, long fromSnapshotId) {
+    public Snapshot createTagInternal(long fromSnapshotId) {
         SnapshotManager snapshotManager = snapshotManager();
         Snapshot snapshot = null;
         if (snapshotManager.snapshotExists(fromSnapshotId)) {
@@ -482,18 +482,36 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 snapshot != null,
                 "Cannot create tag because given snapshot #%s doesn't exist.",
                 fromSnapshotId);
-        createTag(tagName, snapshot);
+        return snapshot;
+    }
+
+    @Override
+    public void createTag(String tagName, long fromSnapshotId) {
+        createTag(
+                tagName, coreOptions().tagDefaultTimeRetained(), 
createTagInternal(fromSnapshotId));
+    }
+
+    @Override
+    public void createTag(String tagName, @Nullable Duration timeRetained, 
long fromSnapshotId) {
+        createTag(tagName, timeRetained, createTagInternal(fromSnapshotId));
     }
 
     @Override
     public void createTag(String tagName) {
         Snapshot latestSnapshot = snapshotManager().latestSnapshot();
         checkNotNull(latestSnapshot, "Cannot create tag because latest 
snapshot doesn't exist.");
-        createTag(tagName, latestSnapshot);
+        createTag(tagName, coreOptions().tagDefaultTimeRetained(), 
latestSnapshot);
+    }
+
+    @Override
+    public void createTag(String tagName, @Nullable Duration timeRetained) {
+        Snapshot latestSnapshot = snapshotManager().latestSnapshot();
+        checkNotNull(latestSnapshot, "Cannot create tag because latest 
snapshot doesn't exist.");
+        createTag(tagName, timeRetained, latestSnapshot);
     }
 
-    private void createTag(String tagName, Snapshot fromSnapshot) {
-        tagManager().createTag(fromSnapshot, tagName, 
store().createTagCallbacks());
+    private void createTag(String tagName, @Nullable Duration timeRetained, 
Snapshot fromSnapshot) {
+        tagManager().createTag(fromSnapshot, tagName, timeRetained, 
store().createTagCallbacks());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index d07035f26..be823156c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table;
 
+import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.InnerTableCommit;
@@ -25,6 +26,9 @@ import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -109,6 +113,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Experimental
+    default void createTag(String tagName, @Nullable Duration timeRetained, 
long fromSnapshotId) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support createTag.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void createTag(String tagName) {
         throw new UnsupportedOperationException(
@@ -117,6 +129,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Experimental
+    default void createTag(String tagName, @Nullable Duration timeRetained) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support createTag.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void deleteTag(String tagName) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 3ed6a1990..3d46d25a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -26,7 +26,10 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -76,10 +79,16 @@ public interface Table extends Serializable {
     @Experimental
     void createTag(String tagName, long fromSnapshotId);
 
+    @Experimental
+    void createTag(String tagName, @Nullable Duration timeRetained, long 
fromSnapshotId);
+
     /** Create a tag from latest snapshot. */
     @Experimental
     void createTag(String tagName);
 
+    @Experimental
+    void createTag(String tagName, @Nullable Duration timeRetained);
+
     /** Delete a tag by name. */
     @Experimental
     void deleteTag(String tagName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c63511ac4..9d471ee57 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -31,7 +31,7 @@ import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
-import org.apache.paimon.tag.TagAutoCreation;
+import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.IOUtils;
@@ -76,7 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {
     private final List<CommitCallback> commitCallbacks;
     @Nullable private final Runnable expireSnapshots;
     @Nullable private final PartitionExpire partitionExpire;
-    @Nullable private final TagAutoCreation tagAutoCreation;
+    @Nullable private final TagAutoManager tagAutoManager;
     private final Lock lock;
 
     @Nullable private final Duration consumerExpireTime;
@@ -96,7 +96,7 @@ public class TableCommitImpl implements InnerTableCommit {
             List<CommitCallback> commitCallbacks,
             @Nullable Runnable expireSnapshots,
             @Nullable PartitionExpire partitionExpire,
-            @Nullable TagAutoCreation tagAutoCreation,
+            @Nullable TagAutoManager tagAutoManager,
             Lock lock,
             @Nullable Duration consumerExpireTime,
             ConsumerManager consumerManager,
@@ -112,7 +112,7 @@ public class TableCommitImpl implements InnerTableCommit {
         this.commitCallbacks = commitCallbacks;
         this.expireSnapshots = expireSnapshots;
         this.partitionExpire = partitionExpire;
-        this.tagAutoCreation = tagAutoCreation;
+        this.tagAutoManager = tagAutoManager;
         this.lock = lock;
 
         this.consumerExpireTime = consumerExpireTime;
@@ -131,8 +131,14 @@ public class TableCommitImpl implements InnerTableCommit {
     }
 
     public boolean forceCreatingSnapshot() {
-        return this.forceCreatingSnapshot
-                || (tagAutoCreation != null && 
tagAutoCreation.forceCreatingSnapshot());
+        if (this.forceCreatingSnapshot) {
+            return true;
+        }
+        if (tagAutoManager != null) {
+            return tagAutoManager.getTagAutoCreation() != null
+                    && 
tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
+        }
+        return false;
     }
 
     @Override
@@ -349,8 +355,8 @@ public class TableCommitImpl implements InnerTableCommit {
             partitionExpire.expire(partitionExpireIdentifier);
         }
 
-        if (tagAutoCreation != null) {
-            tagAutoCreation.run();
+        if (tagAutoManager != null) {
+            tagAutoManager.run();
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index ebd8aa7af..e51ebcb0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -39,6 +38,7 @@ import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.tag.Tag;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
@@ -51,6 +51,7 @@ import org.apache.paimon.utils.TagManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -79,7 +80,10 @@ public class TagsTable implements ReadonlyTable {
                             new DataField(2, "schema_id", new 
BigIntType(false)),
                             new DataField(3, "commit_time", new 
TimestampType(false, 3)),
                             new DataField(4, "record_count", new 
BigIntType(true)),
-                            new DataField(5, "branches", 
SerializationUtils.newStringType(true))));
+                            new DataField(5, "branches", 
SerializationUtils.newStringType(true)),
+                            new DataField(6, "create_time", new 
TimestampType(false, 3)),
+                            new DataField(
+                                    7, "time_retained", 
SerializationUtils.newStringType(true))));
 
     private final FileIO fileIO;
     private final Path location;
@@ -204,9 +208,9 @@ public class TagsTable implements ReadonlyTable {
             Options options = new Options();
             options.set(CoreOptions.PATH, location.toUri().toString());
             FileStoreTable table = FileStoreTableFactory.create(fileIO, 
options);
-            SortedMap<Snapshot, List<String>> tags = table.tagManager().tags();
-            Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
-            for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
+            SortedMap<Tag, List<String>> tags = 
table.tagManager().tagsWithTimeRetained();
+            Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
+            for (Map.Entry<Tag, List<String>> tag : tags.entrySet()) {
                 for (String tagName : tag.getValue()) {
                     nameToSnapshot.put(tagName, tag.getKey());
                 }
@@ -234,17 +238,24 @@ public class TagsTable implements ReadonlyTable {
         }
 
         private InternalRow toRow(
-                Map.Entry<String, Snapshot> tag, Map<String, List<String>> 
tagBranches) {
-            Snapshot snapshot = tag.getValue();
-            List<String> branches = tagBranches.get(tag.getKey());
+                Map.Entry<String, Tag> snapshot, Map<String, List<String>> 
tagBranches) {
+            Tag tag = snapshot.getValue();
+            List<String> branches = tagBranches.get(snapshot.getKey());
             return GenericRow.of(
-                    BinaryString.fromString(tag.getKey()),
-                    snapshot.id(),
-                    snapshot.schemaId(),
+                    BinaryString.fromString(snapshot.getKey()),
+                    tag.id(),
+                    tag.schemaId(),
+                    
Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())),
+                    tag.totalRecordCount(),
+                    BinaryString.fromString(branches == null ? "[]" : 
branches.toString()),
                     Timestamp.fromLocalDateTime(
-                            
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
-                    snapshot.totalRecordCount(),
-                    BinaryString.fromString(branches == null ? "[]" : 
branches.toString()));
+                            tag.getTagCreateTime() == null
+                                    ? LocalDateTime.MIN
+                                    : tag.getTagCreateTime()),
+                    BinaryString.fromString(
+                            tag.getTagTimeRetained() == null
+                                    ? ""
+                                    : tag.getTagTimeRetained().toString()));
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
new file mode 100644
index 000000000..58dbf430b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Snapshot with tagCreateTime and tagTimeRetained. */
+public class Tag extends Snapshot {
+
+    public static final Comparator<Tag> TAG_COMPARATOR = new TagComparator();
+
+    private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime";
+    private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained";
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_TAG_CREATE_TIME)
+    @Nullable
+    private final LocalDateTime tagCreateTime;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_TAG_TIME_RETAINED)
+    @Nullable
+    private final Duration tagTimeRetained;
+
+    /**
+     * Creates a tag from the snapshot fields plus the two tag-specific fields.
+     *
+     * <p>All snapshot arguments are forwarded unchanged to the {@link Snapshot} constructor.
+     *
+     * @param tagCreateTime time at which the tag was created, or {@code null} if not recorded
+     * @param tagTimeRetained retention period of the tag, or {@code null} if none is set
+     */
+    @JsonCreator
+    public Tag(
+            @JsonProperty(FIELD_VERSION) @Nullable Integer version,
+            @JsonProperty(FIELD_ID) long id,
+            @JsonProperty(FIELD_SCHEMA_ID) long schemaId,
+            @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
+            @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
+            @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String 
changelogManifestList,
+            @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
+            @JsonProperty(FIELD_COMMIT_USER) String commitUser,
+            @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
+            @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
+            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
+            @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
+            @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long 
totalRecordCount,
+            @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long 
deltaRecordCount,
+            @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long 
changelogRecordCount,
+            @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
+            @JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
+            @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime 
tagCreateTime,
+            @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration 
tagTimeRetained) {
+        super(
+                version,
+                id,
+                schemaId,
+                baseManifestList,
+                deltaManifestList,
+                changelogManifestList,
+                indexManifest,
+                commitUser,
+                commitIdentifier,
+                commitKind,
+                timeMillis,
+                logOffsets,
+                totalRecordCount,
+                deltaRecordCount,
+                changelogRecordCount,
+                watermark,
+                statistics);
+        this.tagCreateTime = tagCreateTime;
+        this.tagTimeRetained = tagTimeRetained;
+    }
+
+    /** Returns the tag creation time, or {@code null} if it was not recorded. */
+    @JsonGetter(FIELD_TAG_CREATE_TIME)
+    public @Nullable LocalDateTime getTagCreateTime() {
+        return tagCreateTime;
+    }
+
+    /** Returns the retention period of this tag, or {@code null} if none is set. */
+    @JsonGetter(FIELD_TAG_TIME_RETAINED)
+    public @Nullable Duration getTagTimeRetained() {
+        return tagTimeRetained;
+    }
+
+    /** Serializes this tag to a JSON string via {@link JsonSerdeUtil#toJson}. */
+    public String toJson() {
+        return JsonSerdeUtil.toJson(this);
+    }
+
+    /** Parses a {@link Tag} from the given JSON string via {@link JsonSerdeUtil#fromJson}. */
+    public static Tag fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, Tag.class);
+    }
+
+    /**
+     * Reads the file at {@code path} as UTF-8 and parses its content into a {@link Tag}.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} raised while reading the file
+     */
+    public static Tag fromPath(FileIO fileIO, Path path) {
+        try {
+            return fromJson(fileIO.readFileUtf8(path));
+        } catch (IOException readFailure) {
+            throw new RuntimeException("Fails to read tag from path " + path, readFailure);
+        }
+    }
+
+    /**
+     * Like {@link #fromPath}, but reports a missing file as an empty {@link Optional} instead of
+     * failing.
+     *
+     * @throws IOException for any read failure other than {@link FileNotFoundException}
+     */
+    public static Optional<Tag> safelyFromTagPath(FileIO fileIO, Path path) throws IOException {
+        Optional<Tag> loaded;
+        try {
+            loaded = Optional.of(fromJson(fileIO.readFileUtf8(path)));
+        } catch (FileNotFoundException missing) {
+            loaded = Optional.empty();
+        }
+        return loaded;
+    }
+
+    /**
+     * Builds a tag that copies every field of {@code snapshot} and adds the tag-specific fields.
+     *
+     * <p>Note the parameter order: {@code tagTimeRetained} comes before {@code tagCreateTime}
+     * here, which is the reverse of the constructor's order.
+     *
+     * @param snapshot snapshot whose fields are copied
+     * @param tagTimeRetained retention period to store in the tag
+     * @param tagCreateTime creation time to store in the tag
+     */
+    public static Tag fromSnapshotAndTagTtl(
+            Snapshot snapshot, Duration tagTimeRetained, LocalDateTime 
tagCreateTime) {
+        return new Tag(
+                snapshot.version(),
+                snapshot.id(),
+                snapshot.schemaId(),
+                snapshot.baseManifestList(),
+                snapshot.deltaManifestList(),
+                snapshot.changelogManifestList(),
+                snapshot.indexManifest(),
+                snapshot.commitUser(),
+                snapshot.commitIdentifier(),
+                snapshot.commitKind(),
+                snapshot.timeMillis(),
+                snapshot.logOffsets(),
+                snapshot.totalRecordCount(),
+                snapshot.deltaRecordCount(),
+                snapshot.changelogRecordCount(),
+                snapshot.watermark(),
+                snapshot.statistics(),
+                tagCreateTime,
+                tagTimeRetained);
+    }
+
+    /**
+     * Returns a plain {@link Snapshot} built from this tag's snapshot fields; the tag create time
+     * and retention period are not carried over.
+     */
+    public Snapshot toSnapshot() {
+        return new Snapshot(
+                version,
+                id,
+                schemaId,
+                baseManifestList,
+                deltaManifestList,
+                changelogManifestList,
+                indexManifest,
+                commitUser,
+                commitIdentifier,
+                commitKind,
+                timeMillis,
+                logOffsets,
+                totalRecordCount,
+                deltaRecordCount,
+                changelogRecordCount,
+                watermark,
+                statistics);
+    }
+
+    @Override
+    public int hashCode() {
+        // Combine the inherited Snapshot hash with the two tag-specific fields.
+        return Objects.hash(super.hashCode(), tagCreateTime, tagTimeRetained);
+    }
+
+    /**
+     * Two tags are equal when they have the same runtime class, their inherited snapshot state is
+     * equal, and both tag-specific fields match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+            return false;
+        }
+        Tag other = (Tag) o;
+        if (!Objects.equals(tagCreateTime, other.tagCreateTime)) {
+            return false;
+        }
+        return Objects.equals(tagTimeRetained, other.tagTimeRetained);
+    }
+
+    /**
+     * Orders tags field by field: snapshot id first, then the tag-specific fields, then the
+     * remaining snapshot fields.
+     *
+     * <p>Nullable fields use a nulls-first order. Skipping a field that is set on only one side
+     * would make two tags that differ only in that field compare as equal, so a sorted map keyed
+     * by this comparator would merge them into a single entry.
+     *
+     * <p>{@code logOffsets} is compared by map size only.
+     */
+    private static class TagComparator implements Comparator<Tag> {
+        @Override
+        public int compare(Tag tag1, Tag tag2) {
+            int comparisonResult = Long.compare(tag1.id, tag2.id);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.tagCreateTime, tag2.tagCreateTime);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            // The result of this comparison used to be overwritten by the following ones.
+            comparisonResult = compareNullable(tag1.tagTimeRetained, tag2.tagTimeRetained);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.version, tag2.version);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = Long.compare(tag1.schemaId, tag2.schemaId);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.baseManifestList, tag2.baseManifestList);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.deltaManifestList, tag2.deltaManifestList);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult =
+                    compareNullable(tag1.changelogManifestList, tag2.changelogManifestList);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.indexManifest, tag2.indexManifest);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.commitUser, tag2.commitUser);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = Long.compare(tag1.commitIdentifier, tag2.commitIdentifier);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.commitKind, tag2.commitKind);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = Long.compare(tag1.timeMillis, tag2.timeMillis);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            // Only the number of log offsets takes part in the ordering, not their content.
+            Integer logOffsetCount1 = tag1.logOffsets == null ? null : tag1.logOffsets.size();
+            Integer logOffsetCount2 = tag2.logOffsets == null ? null : tag2.logOffsets.size();
+            comparisonResult = compareNullable(logOffsetCount1, logOffsetCount2);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.totalRecordCount, tag2.totalRecordCount);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.deltaRecordCount, tag2.deltaRecordCount);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult =
+                    compareNullable(tag1.changelogRecordCount, tag2.changelogRecordCount);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            comparisonResult = compareNullable(tag1.watermark, tag2.watermark);
+            if (comparisonResult != 0) {
+                return comparisonResult;
+            }
+
+            return compareNullable(tag1.statistics, tag2.statistics);
+        }
+
+        /**
+         * Compares two possibly-null values by natural order, placing {@code null} before any
+         * non-null value and treating two {@code null}s as equal.
+         */
+        private static <T extends Comparable<? super T>> int compareNullable(
+                @Nullable T left, @Nullable T right) {
+            if (left == null) {
+                return right == null ? 0 : -1;
+            }
+            if (right == null) {
+                return 1;
+            }
+            return left.compareTo(right);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 505454313..41e1e8d16 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -20,7 +20,6 @@ package org.apache.paimon.tag;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
 import org.apache.paimon.tag.TagTimeExtractor.WatermarkExtractor;
@@ -46,11 +45,10 @@ public class TagAutoCreation {
 
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
-    private final TagDeletion tagDeletion;
     private final TagTimeExtractor timeExtractor;
     private final TagPeriodHandler periodHandler;
     private final Duration delay;
-    private final Integer numRetainedMax;
+    private final Duration timeRetained;
     private final List<TagCallback> callbacks;
     private final Duration idlenessTimeout;
 
@@ -60,20 +58,18 @@ public class TagAutoCreation {
     private TagAutoCreation(
             SnapshotManager snapshotManager,
             TagManager tagManager,
-            TagDeletion tagDeletion,
             TagTimeExtractor timeExtractor,
             TagPeriodHandler periodHandler,
             Duration delay,
-            Integer numRetainedMax,
+            @Nullable Duration timeRetained,
             Duration idlenessTimeout,
             List<TagCallback> callbacks) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
-        this.tagDeletion = tagDeletion;
         this.timeExtractor = timeExtractor;
         this.periodHandler = periodHandler;
         this.delay = delay;
-        this.numRetainedMax = numRetainedMax;
+        this.timeRetained = timeRetained;
         this.callbacks = callbacks;
         this.idlenessTimeout = idlenessTimeout;
 
@@ -118,7 +114,7 @@ public class TagAutoCreation {
     public void run() {
         while (true) {
             if (snapshotManager.snapshotExists(nextSnapshot)) {
-                tryToTag(snapshotManager.snapshot(nextSnapshot));
+                tryToCreateTags(snapshotManager.snapshot(nextSnapshot));
                 nextSnapshot++;
             } else {
                 // avoid snapshot has been expired
@@ -132,7 +128,7 @@ public class TagAutoCreation {
         }
     }
 
-    private void tryToTag(Snapshot snapshot) {
+    private void tryToCreateTags(Snapshot snapshot) {
         Optional<LocalDateTime> timeOptional =
                 timeExtractor.extract(snapshot.timeMillis(), 
snapshot.watermark());
         if (!timeOptional.isPresent()) {
@@ -144,28 +140,8 @@ public class TagAutoCreation {
                 || isAfterOrEqual(time.minus(delay), 
periodHandler.nextTagTime(nextTag))) {
             LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
             String tagName = periodHandler.timeToTag(thisTag);
-            tagManager.createTag(snapshot, tagName, callbacks);
+            tagManager.createTag(snapshot, tagName, timeRetained, callbacks);
             nextTag = periodHandler.nextTagTime(thisTag);
-
-            if (numRetainedMax != null) {
-                // only handle auto-created tags here
-                SortedMap<Snapshot, List<String>> tags = 
tagManager.tags(periodHandler::isAutoTag);
-                if (tags.size() > numRetainedMax) {
-                    int toDelete = tags.size() - numRetainedMax;
-                    int i = 0;
-                    for (List<String> tag : tags.values()) {
-                        tagManager.deleteTag(
-                                checkAndGetOneAutoTag(tag),
-                                tagDeletion,
-                                snapshotManager,
-                                callbacks);
-                        i++;
-                        if (i == toDelete) {
-                            break;
-                        }
-                    }
-                }
-            }
         }
     }
 
@@ -187,7 +163,6 @@ public class TagAutoCreation {
             CoreOptions options,
             SnapshotManager snapshotManager,
             TagManager tagManager,
-            TagDeletion tagDeletion,
             List<TagCallback> callbacks) {
         TagTimeExtractor extractor = 
TagTimeExtractor.createForAutoTag(options);
         if (extractor == null) {
@@ -196,11 +171,10 @@ public class TagAutoCreation {
         return new TagAutoCreation(
                 snapshotManager,
                 tagManager,
-                tagDeletion,
                 extractor,
                 TagPeriodHandler.create(options),
                 options.tagCreationDelay(),
-                options.tagNumRetainedMax(),
+                options.tagDefaultTimeRetained(),
                 options.snapshotWatermarkIdleTimeout(),
                 callbacks);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java
new file mode 100644
index 000000000..e31a926b5
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
+/** A manager to expire tags. */
+public class TagAutoExpire {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TagAutoExpire.class);
+
+    private final SnapshotManager snapshotManager;
+    private final TagManager tagManager;
+    private final TagDeletion tagDeletion;
+    private final TagPeriodHandler periodHandler;
+    private final Integer numRetainedMax;
+    private final List<TagCallback> callbacks;
+
+    /**
+     * Private constructor; instances are obtained through {@link #create}.
+     *
+     * @param delay only passed to {@link TagPeriodHandler#validateDelay}; it is not stored
+     * @param numRetainedMax maximum number of auto-created tags to keep, or {@code null} to skip
+     *     count-based expiration
+     */
+    private TagAutoExpire(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            TagDeletion tagDeletion,
+            TagPeriodHandler periodHandler,
+            Duration delay,
+            Integer numRetainedMax,
+            List<TagCallback> callbacks) {
+        this.snapshotManager = snapshotManager;
+        this.tagManager = tagManager;
+        this.tagDeletion = tagDeletion;
+        this.periodHandler = periodHandler;
+        this.numRetainedMax = numRetainedMax;
+        this.callbacks = callbacks;
+        this.periodHandler.validateDelay(delay);
+    }
+
+    /**
+     * Deletes every tag selected by either expiration rule. The two selections are merged into
+     * one set first, so a tag picked by both rules is deleted only once.
+     */
+    public void run() {
+        Set<String> expiredTagNames = new HashSet<>();
+        expiredTagNames.addAll(getExpireTagsByNumRetainedMax());
+        expiredTagNames.addAll(getExpireTagsByTimeRetained());
+        for (String tagName : expiredTagNames) {
+            tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
+        }
+    }
+
+    /**
+     * Selects the auto-created tags that exceed {@code numRetainedMax}, taking them in the
+     * iteration order of {@link TagManager#tags(java.util.function.Predicate)}.
+     *
+     * @return names of the tags to delete; empty when no limit is set or it is not exceeded
+     */
+    private Set<String> getExpireTagsByNumRetainedMax() {
+        Set<String> deleteTags = new HashSet<>();
+        if (numRetainedMax == null) {
+            return deleteTags;
+        }
+
+        // only handle auto-created tags here
+        SortedMap<Snapshot, List<String>> autoTags = tagManager.tags(periodHandler::isAutoTag);
+        if (autoTags.size() <= numRetainedMax) {
+            return deleteTags;
+        }
+
+        int surplus = autoTags.size() - numRetainedMax;
+        int selected = 0;
+        for (List<String> tagNames : autoTags.values()) {
+            String tagName = TagAutoCreation.checkAndGetOneAutoTag(tagNames);
+            LOG.info(
+                    "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.",
+                    tagName,
+                    numRetainedMax);
+            deleteTags.add(tagName);
+            selected++;
+            if (selected == surplus) {
+                break;
+            }
+        }
+        return deleteTags;
+    }
+
+    /**
+     * Selects every tag whose create time plus retention period lies before the current time.
+     * Tags that lack a create time or a retention period are never selected.
+     *
+     * @return names of the tags to delete
+     */
+    private Set<String> getExpireTagsByTimeRetained() {
+        // handle auto-created and non-auto-created-tags here
+        Set<String> deleteTags = new HashSet<>();
+        SortedMap<Tag, List<String>> tags = tagManager.tagsWithTimeRetained();
+        // Read the clock once so that every tag is judged against the same instant.
+        LocalDateTime now = LocalDateTime.now();
+        for (Map.Entry<Tag, List<String>> entry : tags.entrySet()) {
+            Tag tag = entry.getKey();
+            LocalDateTime createTime = tag.getTagCreateTime();
+            Duration timeRetained = tag.getTagTimeRetained();
+            if (createTime == null || timeRetained == null) {
+                continue;
+            }
+            if (now.isAfter(createTime.plus(timeRetained))) {
+                for (String tagName : entry.getValue()) {
+                    LOG.info(
+                            "Delete tag {}, because its existence time has reached its timeRetained of {}.",
+                            tagName,
+                            timeRetained);
+                    deleteTags.add(tagName);
+                }
+            }
+        }
+        return deleteTags;
+    }
+
+    /**
+     * Builds a {@link TagAutoExpire} whose period handler, creation delay and maximum number of
+     * retained tags are read from {@code options}.
+     */
+    public static TagAutoExpire create(
+            CoreOptions options,
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            TagDeletion tagDeletion,
+            List<TagCallback> callbacks) {
+        TagPeriodHandler handler = TagPeriodHandler.create(options);
+        Duration creationDelay = options.tagCreationDelay();
+        Integer maxRetained = options.tagNumRetainedMax();
+        return new TagAutoExpire(
+                snapshotManager,
+                tagManager,
+                tagDeletion,
+                handler,
+                creationDelay,
+                maxRetained,
+                callbacks);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
new file mode 100644
index 000000000..a99155947
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.List;
+
+/**
+ * A manager to create and expire tags.
+ *
+ * <p>Creation is optional: it is only set up when {@link TagTimeExtractor#createForAutoTag}
+ * returns a non-null extractor for the given options.
+ */
+public class TagAutoManager {
+
+    private final TagAutoCreation tagAutoCreation;
+    private final TagAutoExpire tagAutoExpire;
+
+    private TagAutoManager(TagAutoCreation tagAutoCreation, TagAutoExpire tagAutoExpire) {
+        this.tagAutoCreation = tagAutoCreation;
+        this.tagAutoExpire = tagAutoExpire;
+    }
+
+    /** Runs tag creation first (when present), then tag expiration (when present). */
+    public void run() {
+        if (tagAutoCreation != null) {
+            tagAutoCreation.run();
+        }
+        if (tagAutoExpire != null) {
+            tagAutoExpire.run();
+        }
+    }
+
+    /** Builds a manager; the creation part is left {@code null} when no extractor is available. */
+    public static TagAutoManager create(
+            CoreOptions options,
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            TagDeletion tagDeletion,
+            List<TagCallback> callbacks) {
+        boolean creationEnabled = TagTimeExtractor.createForAutoTag(options) != null;
+
+        TagAutoCreation creation = null;
+        if (creationEnabled) {
+            creation = TagAutoCreation.create(options, snapshotManager, tagManager, callbacks);
+        }
+        TagAutoExpire expire =
+                TagAutoExpire.create(options, snapshotManager, tagManager, tagDeletion, callbacks);
+
+        return new TagAutoManager(creation, expire);
+    }
+
+    /** Returns the creation part, which is {@code null} when auto-creation was not set up. */
+    public TagAutoCreation getTagAutoCreation() {
+        return tagAutoCreation;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 676276a30..aa493dd4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -36,6 +36,7 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.Serialize
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 import javax.annotation.Nullable;
 
@@ -57,6 +58,7 @@ public class JsonSerdeUtil {
     static {
         OBJECT_MAPPER_INSTANCE = new ObjectMapper();
         OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule());
+        OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule());
     }
 
     public static <V> LinkedHashMap<String, V> parseJsonMap(String jsonString, 
Class<V> valueType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 134dea459..ce6f02020 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -26,11 +26,16 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.tag.Tag;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -75,7 +80,11 @@ public class TagManager {
     }
 
     /** Create a tag from given snapshot and save it in the storage. */
-    public void createTag(Snapshot snapshot, String tagName, List<TagCallback> 
callbacks) {
+    public void createTag(
+            Snapshot snapshot,
+            String tagName,
+            @Nullable Duration timeRetained,
+            List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
 
         // skip create tag for the same snapshot of the same name.
@@ -86,7 +95,13 @@ public class TagManager {
         } else {
             Path newTagPath = tagPath(tagName);
             try {
-                fileIO.writeFileUtf8(newTagPath, snapshot.toJson());
+                fileIO.writeFileUtf8(
+                        newTagPath,
+                        timeRetained != null
+                                ? Tag.fromSnapshotAndTagTtl(
+                                                snapshot, timeRetained, 
LocalDateTime.now())
+                                        .toJson()
+                                : snapshot.toJson());
             } catch (IOException e) {
                 throw new RuntimeException(
                         String.format(
@@ -227,7 +242,7 @@ public class TagManager {
     /** Get the tagged snapshot by name. */
     public Snapshot taggedSnapshot(String tagName) {
         checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
-        return Snapshot.fromPath(fileIO, tagPath(tagName));
+        // Parse the file as a Tag, then drop the tag-only fields to return a plain Snapshot.
+        return Tag.fromPath(fileIO, tagPath(tagName)).toSnapshot();
     }
 
     public long tagCount() {
@@ -243,25 +258,41 @@ public class TagManager {
         return new ArrayList<>(tags().keySet());
     }
 
-    /** Get all tagged snapshots with names sorted by snapshot id. */
+    /** Get all tags with their names, sorted by Tag.TAG_COMPARATOR. */
+    public SortedMap<Tag, List<String>> tagsWithTimeRetained() {
+        return tagsWithFilter(tagName -> true);
+    }
+
+    /** Get all tagged snapshots with tag names sorted by snapshot id. */
     public SortedMap<Snapshot, List<String>> tags() {
         return tags(tagName -> true);
     }
 
+    public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
+        SortedMap<Snapshot, List<String>> sortedTagMap =
+                new TreeMap<>(Comparator.comparingLong(Snapshot::id));
+        SortedMap<Tag, List<String>> tags = tagsWithFilter(filter);
+        tags.forEach(
+                (key, value) ->
+                        sortedTagMap
+                                .computeIfAbsent(key.toSnapshot(), tagNames -> 
new ArrayList<>())
+                                .addAll(value));
+        return sortedTagMap;
+    }
+
     /**
-     * Retrieves a sorted map of snapshots filtered based on a provided 
predicate. The predicate
-     * determines which tag names should be included in the result. Only 
snapshots with tag names
-     * that pass the predicate test are included.
+     * Retrieves a sorted map of tags filtered based on a provided predicate. 
The predicate
+     * determines which tag names should be included in the result. Only tags 
with tag names that
+     * pass the predicate test are included.
      *
-     * @param filter A Predicate that tests each tag name. Snapshots with tag 
names that fail the
-     *     test are excluded from the result.
-     * @return A sorted map of filtered snapshots keyed by their IDs, each 
associated with its tag
-     *     name.
-     * @throws RuntimeException if an IOException occurs during retrieval of 
snapshots.
+     * @param filter A Predicate that tests each tag name. Tags with tag names 
that fail the test
+     *     are excluded from the result.
+     * @return A map of the filtered tags ordered by Tag.TAG_COMPARATOR, each
+     *     associated with the list of its tag names.
+     * @throws RuntimeException if an IOException occurs during retrieval of 
tags.
      */
-    public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
-        TreeMap<Snapshot, List<String>> tags =
-                new TreeMap<>(Comparator.comparingLong(Snapshot::id));
+    public SortedMap<Tag, List<String>> tagsWithFilter(Predicate<String> 
filter) {
+        TreeMap<Tag, List<String>> tags = new TreeMap<>(Tag.TAG_COMPARATOR);
         try {
             List<Path> paths =
                     listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
@@ -276,10 +307,10 @@ public class TagManager {
                 }
                 // If the tag file is not found, it might be deleted by
                 // other processes, so just skip this tag
-                Snapshot.safelyFromPath(fileIO, path)
+                Tag.safelyFromTagPath(fileIO, path)
                         .ifPresent(
-                                snapshot ->
-                                        tags.computeIfAbsent(snapshot, s -> 
new ArrayList<>())
+                                tag ->
+                                        tags.computeIfAbsent(tag, s -> new 
ArrayList<>())
                                                 .add(tagName));
             }
         } catch (IOException e) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index de95819ad..626c13c01 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -140,7 +140,11 @@ public class ExpireSnapshotsTest {
         // create tags for each snapshot
         for (int id = 1; id <= latestSnapshotId; id++) {
             Snapshot snapshot = snapshotManager.snapshot(id);
-            tagManager.createTag(snapshot, "tag" + id, 
Collections.emptyList());
+            tagManager.createTag(
+                    snapshot,
+                    "tag" + id,
+                    store.options().tagDefaultTimeRetained(),
+                    Collections.emptyList());
         }
 
         // randomly expire snapshots
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 819b70d8a..a9be8d3ac 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -51,6 +51,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -262,7 +263,7 @@ public class FileDeletionTest {
 
         // step 2: commit -A (by clean bucket 0) and create tag1
         cleanBucket(store, gen.getPartition(gen.next()), 0);
-        createTag(snapshotManager.snapshot(2), "tag1");
+        createTag(snapshotManager.snapshot(2), "tag1", 
store.options().tagDefaultTimeRetained());
         assertThat(tagManager.tagExists("tag1")).isTrue();
 
         // step 3: commit C to bucket 2
@@ -273,7 +274,7 @@ public class FileDeletionTest {
 
         // step 4: commit -B (by clean bucket 1) and create tag2
         cleanBucket(store, partition, 1);
-        createTag(snapshotManager.snapshot(4), "tag2");
+        createTag(snapshotManager.snapshot(4), "tag2", 
store.options().tagDefaultTimeRetained());
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         // step 5: commit D to bucket 3
@@ -353,7 +354,7 @@ public class FileDeletionTest {
         // snapshot 3: commit -A (by clean bucket 0)
         cleanBucket(store, gen.getPartition(gen.next()), 0);
 
-        createTag(snapshotManager.snapshot(1), "tag1");
+        createTag(snapshotManager.snapshot(1), "tag1", 
store.options().tagDefaultTimeRetained());
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
 
         // check data file and manifests
@@ -410,7 +411,7 @@ public class FileDeletionTest {
                 Arrays.asList(snapshot1.baseManifestList(), 
snapshot1.deltaManifestList());
 
         // create tag1
-        createTag(snapshot1, "tag1");
+        createTag(snapshot1, "tag1", store.options().tagDefaultTimeRetained());
 
         // expire snapshot 1, 2
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -485,9 +486,9 @@ public class FileDeletionTest {
                 Arrays.asList(snapshot2.baseManifestList(), 
snapshot2.deltaManifestList());
 
         // create tags
-        createTag(snapshotManager.snapshot(1), "tag1");
-        createTag(snapshotManager.snapshot(2), "tag2");
-        createTag(snapshotManager.snapshot(4), "tag3");
+        createTag(snapshotManager.snapshot(1), "tag1", 
store.options().tagDefaultTimeRetained());
+        createTag(snapshotManager.snapshot(2), "tag2", 
store.options().tagDefaultTimeRetained());
+        createTag(snapshotManager.snapshot(4), "tag3", 
store.options().tagDefaultTimeRetained());
 
         // expire snapshot 1, 2, 3, 4
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -735,7 +736,7 @@ public class FileDeletionTest {
                         null);
     }
 
-    private void createTag(Snapshot snapshot, String tagName) {
-        tagManager.createTag(snapshot, tagName, Collections.emptyList());
+    private void createTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {
+        tagManager.createTag(snapshot, tagName, timeRetained, 
Collections.emptyList());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 51cfcd702..b0d357853 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -30,6 +29,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.tag.Tag;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.TagManager;
@@ -120,19 +120,27 @@ class TagsTableTest extends TableTestBase {
     private List<InternalRow> getExceptedResult(
             Function<String, List<String>> tagBranchesFunction) {
         List<InternalRow> internalRows = new ArrayList<>();
-        for (Map.Entry<Snapshot, List<String>> tag : 
tagManager.tags().entrySet()) {
-            Snapshot snapshot = tag.getKey();
-            for (String tagName : tag.getValue()) {
+        for (Map.Entry<Tag, List<String>> snapshot : 
tagManager.tagsWithTimeRetained().entrySet()) {
+            Tag tag = snapshot.getKey();
+            for (String tagName : snapshot.getValue()) {
                 internalRows.add(
                         GenericRow.of(
                                 BinaryString.fromString(tagName),
-                                snapshot.id(),
-                                snapshot.schemaId(),
+                                tag.id(),
+                                tag.schemaId(),
                                 Timestamp.fromLocalDateTime(
-                                        
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
-                                snapshot.totalRecordCount(),
+                                        
DateTimeUtils.toLocalDateTime(tag.timeMillis())),
+                                tag.totalRecordCount(),
                                 BinaryString.fromString(
-                                        
tagBranchesFunction.apply(tagName).toString())));
+                                        
tagBranchesFunction.apply(tagName).toString()),
+                                Timestamp.fromLocalDateTime(
+                                        tag.getTagCreateTime() == null
+                                                ? LocalDateTime.MIN
+                                                : tag.getTagCreateTime()),
+                                BinaryString.fromString(
+                                        tag.getTagTimeRetained() == null
+                                                ? ""
+                                                : 
tag.getTagTimeRetained().toString())));
             }
         }
         return internalRows;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
similarity index 78%
rename from 
paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index f76a58e6c..a065945ba 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.tag;
 import org.apache.paimon.CoreOptions.TagCreationMode;
 import org.apache.paimon.CoreOptions.TagCreationPeriod;
 import org.apache.paimon.CoreOptions.TagPeriodFormatter;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
@@ -33,6 +34,7 @@ import org.junit.jupiter.api.Test;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.Collections;
 
 import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
@@ -41,12 +43,13 @@ import static 
org.apache.paimon.CoreOptions.SNAPSHOT_WATERMARK_IDLE_TIMEOUT;
 import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION;
 import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
 import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
+import static org.apache.paimon.CoreOptions.TAG_DEFAULT_TIME_RETAINED;
 import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for tag automatic creation. */
-public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
+public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
 
     @Test
     public void testTag() throws Exception {
@@ -320,6 +323,104 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testAutoCreateTagNotExpiredByTimeRetained() throws Exception {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        options.set(TAG_DEFAULT_TIME_RETAINED, Duration.ofMillis(500));
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+
+        // commits with increasing timestamps let hourly tags be auto-created
+        commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
+        commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-18T14:00:00")));
+        commit.commit(new ManifestCommittable(2, 
utcMills("2023-07-18T15:12:00")));
+        commit.commit(new ManifestCommittable(3, 
utcMills("2023-07-18T16:00:00")));
+
+        // after the 500 ms retention, only tag "2023-07-18 18" must remain
+        Thread.sleep(1000);
+        commit.commit(new ManifestCommittable(4, 
utcMills("2023-07-18T19:00:00")));
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 18");
+
+        commit.close();
+    }
+
+    @Test
+    public void testExpireTagsByTimeRetained() throws Exception {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        options.set(TAG_DEFAULT_TIME_RETAINED, Duration.ofMillis(500));
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+
+        // commits with increasing timestamps let hourly tags be auto-created
+        commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
+        commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-18T14:00:00")));
+        commit.commit(new ManifestCommittable(2, 
utcMills("2023-07-18T15:12:00")));
+        commit.commit(new ManifestCommittable(3, 
utcMills("2023-07-18T16:00:00")));
+
+        Snapshot snapshot1 =
+                new Snapshot(
+                        4,
+                        0L,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        0L,
+                        Snapshot.CommitKind.APPEND,
+                        1000,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+        tagManager.createTag(
+                snapshot1,
+                "non-auto-create-tag-shoule-expire",
+                Duration.ofMillis(500),
+                Collections.emptyList());
+
+        Snapshot snapshot2 =
+                new Snapshot(
+                        5,
+                        0L,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        0L,
+                        Snapshot.CommitKind.APPEND,
+                        1000,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+        tagManager.createTag(
+                snapshot2,
+                "non-auto-create-tag-shoule-not-expire",
+                Duration.ofDays(1),
+                Collections.emptyList());
+
+        // after 500 ms the short-lived manual tag expires; the one-day tag is kept
+        Thread.sleep(1000);
+        commit.commit(new ManifestCommittable(6, 
utcMills("2023-07-18T19:00:00")));
+        assertThat(tagManager.allTagNames())
+                .containsOnly("2023-07-18 18", 
"non-auto-create-tag-shoule-not-expire");
+        commit.close();
+    }
+
     private long localZoneMills(String timestamp) {
         return LocalDateTime.parse(timestamp)
                 .atZone(ZoneId.systemDefault())
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
new file mode 100644
index 000000000..922055983
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Test for {@link Tag}. */
+public class TagTest {
+
+    private final Snapshot snapshot =
+            new Snapshot(
+                    0,
+                    0L,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    0L,
+                    Snapshot.CommitKind.APPEND,
+                    1000,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+
+    @Test
+    public void testFromJson() {
+        Tag tag = Tag.fromJson(snapshot.toJson());
+        Assert.assertEquals(
+                "{\n"
+                        + "  \"version\" : 3,\n"
+                        + "  \"id\" : 0,\n"
+                        + "  \"schemaId\" : 0,\n"
+                        + "  \"baseManifestList\" : null,\n"
+                        + "  \"deltaManifestList\" : null,\n"
+                        + "  \"changelogManifestList\" : null,\n"
+                        + "  \"commitUser\" : null,\n"
+                        + "  \"commitIdentifier\" : 0,\n"
+                        + "  \"commitKind\" : \"APPEND\",\n"
+                        + "  \"timeMillis\" : 1000,\n"
+                        + "  \"totalRecordCount\" : null,\n"
+                        + "  \"deltaRecordCount\" : null\n"
+                        + "}",
+                tag.toJson());
+    }
+
+    @Test
+    public void testFromSnapshotAndTagTtl() {
+        Tag tag =
+                Tag.fromSnapshotAndTagTtl(
+                        snapshot,
+                        Duration.ofSeconds(5),
+                        LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789));
+        String tagJson = tag.toJson();
+        Assert.assertEquals(
+                "{\n"
+                        + "  \"version\" : 3,\n"
+                        + "  \"id\" : 0,\n"
+                        + "  \"schemaId\" : 0,\n"
+                        + "  \"baseManifestList\" : null,\n"
+                        + "  \"deltaManifestList\" : null,\n"
+                        + "  \"changelogManifestList\" : null,\n"
+                        + "  \"commitUser\" : null,\n"
+                        + "  \"commitIdentifier\" : 0,\n"
+                        + "  \"commitKind\" : \"APPEND\",\n"
+                        + "  \"timeMillis\" : 1000,\n"
+                        + "  \"totalRecordCount\" : null,\n"
+                        + "  \"deltaRecordCount\" : null,\n"
+                        + "  \"tagCreateTime\" : [ 1969, 1, 1, 0, 0, 0, 
123456789 ],\n"
+                        + "  \"tagTimeRetained\" : 5.000000000\n"
+                        + "}",
+                tagJson);
+
+        Tag newTag = Tag.fromJson(tagJson);
+        Assert.assertEquals(tag, newTag);
+    }
+
+    @Test
+    public void testTagComparator() {
+        Tag tag1 =
+                new Tag(
+                        3,
+                        2L,
+                        0,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        8,
+                        Snapshot.CommitKind.APPEND,
+                        1000,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        LocalDateTime.now(),
+                        Duration.ofSeconds(10));
+
+        Tag tag2 =
+                new Tag(
+                        3,
+                        1L,
+                        0,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        8,
+                        Snapshot.CommitKind.APPEND,
+                        1000,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        LocalDateTime.now(),
+                        Duration.ofSeconds(10));
+
+        Tag tag3 =
+                new Tag(
+                        3,
+                        0L,
+                        0,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        8,
+                        Snapshot.CommitKind.APPEND,
+                        1000,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        LocalDateTime.now(),
+                        Duration.ofSeconds(10));
+        List<Tag> tags = new ArrayList<>();
+        tags.add(tag1);
+        tags.add(tag2);
+        tags.add(tag3);
+        tags.sort(Tag.TAG_COMPARATOR);
+        Assert.assertEquals(0, tags.get(0).id());
+        Assert.assertEquals(1, tags.get(1).id());
+        Assert.assertEquals(2, tags.get(2).id());
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
new file mode 100644
index 000000000..12f38931d
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.FileStoreTestUtils;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.tag.Tag;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
+import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for TagManager. */
+public class TagManagerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private final FileIO fileIO = new LocalFileIO();
+
+    private long commitIdentifier;
+    private String root;
+    private TagManager tagManager;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        commitIdentifier = 0L;
+        root = tempDir.toString();
+        tagManager = null;
+    }
+
+    @Test
+    public void testCreateTagWithoutTimeRetained() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
+        tagManager = new TagManager(fileIO, store.options().path());
+        SnapshotManager snapshotManager = store.snapshotManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // commit A to bucket 0 and B to bucket 1
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        for (int bucket : Arrays.asList(0, 1)) {
+            List<KeyValue> kvs = partitionedData(5, gen);
+            writeData(store, kvs, partition, bucket, writers);
+        }
+        commitData(store, commitIdentifier++, writers);
+
+        tagManager.createTag(
+                snapshotManager.snapshot(1),
+                "tag",
+                store.options().tagDefaultTimeRetained(),
+                Collections.emptyList());
+        assertThat(tagManager.tagExists("tag")).isTrue();
+        Snapshot snapshot = tagManager.taggedSnapshot("tag");
+        String snapshotJson = snapshot.toJson();
+        Assertions.assertTrue(
+                !snapshotJson.contains("tagCreateTime")
+                        && !snapshotJson.contains("tagTimeRetained"));
+        Assertions.assertEquals(snapshot, Snapshot.fromJson(snapshotJson));
+    }
+
+    @Test
+    public void testCreateTagWithTimeRetained() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
+        tagManager = new TagManager(fileIO, store.options().path());
+        SnapshotManager snapshotManager = store.snapshotManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // commit A to bucket 0 and B to bucket 1
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        for (int bucket : Arrays.asList(0, 1)) {
+            List<KeyValue> kvs = partitionedData(5, gen);
+            writeData(store, kvs, partition, bucket, writers);
+        }
+        commitData(store, commitIdentifier++, writers);
+
+        tagManager.createTag(
+                snapshotManager.snapshot(1), "tag", Duration.ofDays(1), 
Collections.emptyList());
+        assertThat(tagManager.tagExists("tag")).isTrue();
+        SortedMap<Tag, List<String>> tagsWithTimeRetained = 
tagManager.tagsWithTimeRetained();
+        Assertions.assertEquals(1, tagsWithTimeRetained.size());
+        Tag tag = tagsWithTimeRetained.firstKey();
+        String tagJson = tag.toJson();
+        Assertions.assertTrue(
+                tagJson.contains("tagCreateTime") && 
tagJson.contains("tagTimeRetained"));
+        Assertions.assertEquals(tag, Tag.fromJson(tagJson));
+        assertThat(tagsWithTimeRetained.get(tag)).contains("tag");
+    }
+
+    private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode 
mode, int buckets)
+            throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        CoreOptions.ChangelogProducer changelogProducer;
+        if (random.nextBoolean()) {
+            changelogProducer = CoreOptions.ChangelogProducer.INPUT;
+        } else {
+            changelogProducer = CoreOptions.ChangelogProducer.NONE;
+        }
+
+        RowType rowType, partitionType;
+        switch (mode) {
+            case NON_PARTITIONED:
+                rowType = TestKeyValueGenerator.NON_PARTITIONED_ROW_TYPE;
+                partitionType = 
TestKeyValueGenerator.NON_PARTITIONED_PART_TYPE;
+                break;
+            case SINGLE_PARTITIONED:
+                rowType = TestKeyValueGenerator.SINGLE_PARTITIONED_ROW_TYPE;
+                partitionType = 
TestKeyValueGenerator.SINGLE_PARTITIONED_PART_TYPE;
+                break;
+            case MULTI_PARTITIONED:
+                rowType = TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+                partitionType = TestKeyValueGenerator.DEFAULT_PART_TYPE;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported generator 
mode: " + mode);
+        }
+
+        SchemaManager schemaManager = new SchemaManager(fileIO, new 
Path(root));
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                partitionType.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(mode),
+                                Collections.emptyMap(),
+                                null));
+
+        return new TestFileStore.Builder(
+                        "avro",
+                        root,
+                        buckets,
+                        partitionType,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        rowType,
+                        
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                        DeduplicateMergeFunction.factory(),
+                        tableSchema)
+                .changelogProducer(changelogProducer)
+                .build();
+    }
+
+    private void writeData(
+            TestFileStore store,
+            List<KeyValue> kvs,
+            BinaryRow partition,
+            int bucket,
+            Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
+            throws Exception {
+        writers.computeIfAbsent(partition, p -> new HashMap<>())
+                .put(bucket, FileStoreTestUtils.writeData(store, kvs, 
partition, bucket));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
index 0e500c3e7..f1455d377 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
@@ -18,12 +18,16 @@
 
 package org.apache.paimon.flink.action;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.Map;
 
 /** Create tag action for Flink. */
 public class CreateTagAction extends TableActionBase {
 
     private final String tagName;
+    private final Duration timeRetained;
     private final Long snapshotId;
 
     public CreateTagAction(
@@ -32,18 +36,20 @@ public class CreateTagAction extends TableActionBase {
             String tableName,
             Map<String, String> catalogConfig,
             String tagName,
+            @Nullable Duration timeRetained,
             Long snapshotId) {
         super(warehouse, databaseName, tableName, catalogConfig);
         this.tagName = tagName;
+        this.timeRetained = timeRetained;
         this.snapshotId = snapshotId;
     }
 
     @Override
     public void run() throws Exception {
         if (snapshotId == null) {
-            table.createTag(tagName);
+            table.createTag(tagName, timeRetained);
         } else {
-            table.createTag(tagName, snapshotId);
+            table.createTag(tagName, timeRetained, snapshotId);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
index 01bbdba42..6cca76cb1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
@@ -18,8 +18,11 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.utils.TimeUtils;
+
 import org.apache.flink.api.java.tuple.Tuple3;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 
@@ -30,6 +33,7 @@ public class CreateTagActionFactory implements ActionFactory {
 
     private static final String TAG_NAME = "tag_name";
     private static final String SNAPSHOT = "snapshot";
+    private static final String TIME_RETAINED = "time_retained";
 
     @Override
     public String identifier() {
@@ -49,9 +53,20 @@ public class CreateTagActionFactory implements ActionFactory 
{
             snapshot = Long.parseLong(params.get(SNAPSHOT));
         }
 
+        Duration timeRetained = null;
+        if (params.has(TIME_RETAINED)) {
+            timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED));
+        }
+
         CreateTagAction action =
                 new CreateTagAction(
-                        tablePath.f0, tablePath.f1, tablePath.f2, 
catalogConfig, tagName, snapshot);
+                        tablePath.f0,
+                        tablePath.f1,
+                        tablePath.f2,
+                        catalogConfig,
+                        tagName,
+                        timeRetained,
+                        snapshot);
         return Optional.of(action);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 9999fdf11..3bdc2e07e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
@@ -30,7 +31,7 @@ import javax.annotation.Nullable;
  * Create tag procedure. Usage:
  *
  * <pre><code>
- *  CALL sys.create_tag('tableId', 'tagName', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', 'timeRetained', snapshotId)
  * </code></pre>
  */
 public class CreateTagProcedure extends ProcedureBase {
@@ -38,23 +39,29 @@ public class CreateTagProcedure extends ProcedureBase {
     public static final String IDENTIFIER = "create_tag";
 
     public String[] call(
-            ProcedureContext procedureContext, String tableId, String tagName, 
long snapshotId)
+            ProcedureContext procedureContext,
+            String tableId,
+            String tagName,
+            String timeRetained,
+            long snapshotId)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, snapshotId);
+        return innerCall(tableId, tagName, timeRetained, snapshotId);
     }
 
-    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagName)
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String tagName, 
String timeRetained)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, null);
+        return innerCall(tableId, tagName, timeRetained, null);
     }
 
-    private String[] innerCall(String tableId, String tagName, @Nullable Long 
snapshotId)
+    private String[] innerCall(
+            String tableId, String tagName, String timeRetained, @Nullable 
Long snapshotId)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         if (snapshotId == null) {
-            table.createTag(tagName);
+            table.createTag(tagName, TimeUtils.parseDuration(timeRetained));
         } else {
-            table.createTag(tagName, snapshotId);
+            table.createTag(tagName, TimeUtils.parseDuration(timeRetained), 
snapshotId);
         }
         return new String[] {"Success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index dcf2c8b00..6d27c6019 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
@@ -76,6 +77,8 @@ public class AutoTagForSavepointCommitterOperator<CommitT, 
GlobalCommitT>
 
     private final NavigableSet<Long> identifiersForTags;
 
+    private final Duration tagTimeRetained;
+
     private transient SnapshotManager snapshotManager;
 
     private transient TagManager tagManager;
@@ -91,13 +94,15 @@ public class AutoTagForSavepointCommitterOperator<CommitT, 
GlobalCommitT>
             SerializableSupplier<SnapshotManager> snapshotManagerFactory,
             SerializableSupplier<TagManager> tagManagerFactory,
             SerializableSupplier<TagDeletion> tagDeletionFactory,
-            SerializableSupplier<List<TagCallback>> callbacksSupplier) {
+            SerializableSupplier<List<TagCallback>> callbacksSupplier,
+            Duration tagTimeRetained) {
         this.commitOperator = commitOperator;
         this.tagManagerFactory = tagManagerFactory;
         this.snapshotManagerFactory = snapshotManagerFactory;
         this.tagDeletionFactory = tagDeletionFactory;
         this.callbacksSupplier = callbacksSupplier;
         this.identifiersForTags = new TreeSet<>();
+        this.tagTimeRetained = tagTimeRetained;
     }
 
     @Override
@@ -167,7 +172,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT, 
GlobalCommitT>
         for (Snapshot snapshot : snapshotForTags) {
             String tagName = SAVEPOINT_TAG_PREFIX + 
snapshot.commitIdentifier();
             if (!tagManager.tagExists(tagName)) {
-                tagManager.createTag(snapshot, tagName, callbacks);
+                tagManager.createTag(snapshot, tagName, tagTimeRetained, 
callbacks);
             }
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index 2c898831e..23202b450 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -118,7 +118,11 @@ public class BatchWriteGeneratorTagOperator<CommitT, 
GlobalCommitT>
                         tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
             }
             // Create a new tag
-            tagManager.createTag(snapshot, tagName, 
table.store().createTagCallbacks());
+            tagManager.createTag(
+                    snapshot,
+                    tagName,
+                    table.coreOptions().tagDefaultTimeRetained(),
+                    table.store().createTagCallbacks());
             // Expire the tag
             expireTag();
         } catch (Exception e) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f7ebf5afd..fa4526897 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -238,7 +238,8 @@ public abstract class FlinkSink<T> implements Serializable {
                             table::snapshotManager,
                             table::tagManager,
                             () -> table.store().newTagDeletion(),
-                            () -> table.store().createTagCallbacks());
+                            () -> table.store().createTagCallbacks(),
+                            table.coreOptions().tagDefaultTimeRetained());
         }
         if (conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.BATCH
                 && table.coreOptions().tagCreationMode() == 
TagCreationMode.BATCH) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 8d445ab95..eb6f70f65 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -36,6 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for branch management actions. */
 class BranchActionITCase extends ActionITCaseBase {
+
     @Test
     void testCreateAndDeleteBranch() throws Exception {
 
@@ -63,7 +64,8 @@ class BranchActionITCase extends ActionITCaseBase {
 
         TagManager tagManager = new TagManager(table.fileIO(), 
table.location());
         callProcedure(
-                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+                String.format(
+                        "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", 
database, tableName));
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         BranchManager branchManager = table.branchManager();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index a01849ed9..a650ad31b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -83,7 +83,8 @@ public class TagActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+                    String.format(
+                            "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", 
database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
@@ -153,7 +154,8 @@ public class TagActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+                    String.format(
+                            "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", 
database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isTrue();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index 0dbde0579..3b58c24d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -208,7 +208,8 @@ public class AutoTagForSavepointCommitterOperatorTest 
extends CommitterOperatorT
                 table::snapshotManager,
                 table::tagManager,
                 () -> table.store().newTagDeletion(),
-                () -> table.store().createTagCallbacks());
+                () -> table.store().createTagCallbacks(),
+                table.store().options().tagDefaultTimeRetained());
     }
 
     @Override
@@ -224,6 +225,7 @@ public class AutoTagForSavepointCommitterOperatorTest 
extends CommitterOperatorT
                 table::snapshotManager,
                 table::tagManager,
                 () -> table.store().newTagDeletion(),
-                () -> table.store().createTagCallbacks());
+                () -> table.store().createTagCallbacks(),
+                table.store().options().tagDefaultTimeRetained());
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 4d9753bab..88878c2c7 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -913,7 +913,7 @@ public abstract class HiveCatalogITCaseBase {
                         "    'metastore.tag-to-partition' = 'dt'",
                         ")"));
         tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 
1)");
 
         assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
                 .containsExactlyInAnyOrder("dt=2023-10-16");
@@ -927,7 +927,7 @@ public abstract class HiveCatalogITCaseBase {
         // another tag
 
         tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 
2)");
 
         assertThat(hiveShell.executeQuery("SELECT * FROM t"))
                 .containsExactlyInAnyOrder(
@@ -951,9 +951,9 @@ public abstract class HiveCatalogITCaseBase {
                         + "    'metastore.tag-to-partition' = 'dt'\n"
                         + ")");
         tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 
1)");
         tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 
2)");
 
         assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
                 .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17");
@@ -979,7 +979,7 @@ public abstract class HiveCatalogITCaseBase {
                         "    'metastore.tag-to-partition' = 'dt'",
                         ")"));
         tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 
1)");
 
         assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
                 .containsExactlyInAnyOrder("dt=2023-10-16");
@@ -991,7 +991,7 @@ public abstract class HiveCatalogITCaseBase {
                 .containsExactlyInAnyOrder("1\t10\t2023-10-16", 
"2\t20\t2023-10-16");
 
         tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 
2)");
 
         tEnv.executeSql("ALTER TABLE t ADD z INT");
         tEnv.executeSql("INSERT INTO t VALUES (3, 30, 5), (4, 40, 6)").await();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
index 7f9bdb621..38d181bac 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.utils.TimeUtils;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -26,6 +28,8 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import java.time.Duration;
+
 import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -36,6 +40,7 @@ public class CreateTagProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.required("tag", StringType),
+                ProcedureParameter.optional("time_retained", StringType),
                 ProcedureParameter.optional("snapshot", LongType)
             };
 
@@ -63,15 +68,17 @@ public class CreateTagProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         String tag = args.getString(1);
-        Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
+        Duration timeRetained =
+                args.isNullAt(2) ? null : 
TimeUtils.parseDuration(args.getString(2));
+        Long snapshot = args.isNullAt(3) ? null : args.getLong(3);
 
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
                     if (snapshot == null) {
-                        table.createTag(tag);
+                        table.createTag(tag, timeRetained);
                     } else {
-                        table.createTag(tag, snapshot);
+                        table.createTag(tag, timeRetained, snapshot);
                     }
                     InternalRow outputRow = newInternalRow(true);
                     return new InternalRow[] {outputRow};
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index e505ae110..3c9531a8a 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -70,7 +70,8 @@ class CreateAndDeleteTagProcedureTest extends 
PaimonSparkTestBase with StreamTes
             checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
             checkAnswer(
               spark.sql(
-                "CALL paimon.sys.create_tag(table => 'test.T', tag => 
'test_tag', snapshot => 2)"),
+                "CALL paimon.sys.create_tag(" +
+                  "table => 'test.T', tag => 'test_tag', time_retained => '5 
d', snapshot => 2)"),
               Row(true) :: Nil)
             checkAnswer(
               spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),

Reply via email to