This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 0bd955cb30c015d918e886f2fa61eb70ae697da8
Author: Jingsong <[email protected]>
AuthorDate: Tue Apr 16 10:30:47 2024 +0800

    [core] Adjust codes TTL for tag
    
    This closes #3159
---
 docs/content/maintenance/manage-tags.md            |   6 +-
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   3 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |   1 -
 .../src/main/java/org/apache/paimon/FileStore.java |   1 -
 .../src/main/java/org/apache/paimon/Snapshot.java  |   8 +-
 .../paimon/table/AbstractFileStoreTable.java       |  14 +-
 .../org/apache/paimon/table/ReadonlyTable.java     |  11 +-
 .../main/java/org/apache/paimon/table/Table.java   |   6 +-
 .../apache/paimon/table/sink/TableCommitImpl.java  |   8 +-
 .../org/apache/paimon/table/system/TagsTable.java  |  10 +-
 .../src/main/java/org/apache/paimon/tag/Tag.java   | 165 +--------------------
 .../org/apache/paimon/tag/TagAutoCreation.java     |  47 +++++-
 .../java/org/apache/paimon/tag/TagAutoExpire.java  | 140 -----------------
 .../java/org/apache/paimon/tag/TagAutoManager.java |  15 +-
 .../java/org/apache/paimon/tag/TagTimeExpire.java  |  82 ++++++++++
 .../org/apache/paimon/utils/SnapshotManager.java   |   5 +-
 .../java/org/apache/paimon/utils/TagManager.java   |  71 ++++-----
 .../apache/paimon/table/system/TagsTableTest.java  |  42 +++---
 .../test/java/org/apache/paimon/tag/TagTest.java   |  79 ----------
 .../org/apache/paimon/utils/TagManagerTest.java    |   9 +-
 .../paimon/flink/action/CreateTagAction.java       |  10 +-
 .../flink/action/CreateTagActionFactory.java       |   4 +-
 .../paimon/flink/procedure/CreateTagProcedure.java |  41 ++++-
 .../paimon/flink/action/BranchActionITCase.java    |   2 +-
 .../paimon/flink/action/TagActionITCase.java       |   6 +-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  12 +-
 .../paimon/spark/procedure/CreateTagProcedure.java |  10 +-
 28 files changed, 289 insertions(+), 521 deletions(-)

diff --git a/docs/content/maintenance/manage-tags.md 
b/docs/content/maintenance/manage-tags.md
index 26b657ebb..75a449665 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -108,8 +108,8 @@ You can create a tag with given name and snapshot ID.
     --database <database-name> \ 
     --table <table-name> \
     --tag_name <tag-name> \
-    --time_retained <time-retained> \
     [--snapshot <snapshot_id>] \
+    [--time_retained <time-retained>] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
 ```
 
@@ -127,7 +127,7 @@ public class CreateTag {
     public static void main(String[] args) {
         Table table = ...;
         table.createTag("my-tag", 1);
-        table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1);
+        table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12));
     }
 }
 ```
@@ -142,7 +142,7 @@ CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
 
 To create a tag with retained 1 day, run the following sql:
 ```sql
-CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2);
+CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d');
 ```
 
 To create a tag based on the latest snapshot id, run the following sql:
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 2e7c4ae17..48822af1b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -682,7 +682,7 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
             <td><h5>tag.default-time-retained</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
-            <td>The maximum default time retained for all tags.</td>
+            <td>The default maximum time retained for newly created tags.</td>
         </tr>
         <tr>
             <td><h5>tag.num-retained-max</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index ac5a4e022..2440bdcc6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1024,7 +1024,7 @@ public class CoreOptions implements Serializable {
             key("tag.default-time-retained")
                     .durationType()
                     .noDefaultValue()
-                    .withDescription("The maximum default time retained for all tags.");
+                    .withDescription("The default maximum time retained for newly created tags.");
 
     public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT 
=
             key("snapshot.watermark-idle-timeout")
@@ -1655,6 +1655,7 @@ public class CoreOptions implements Serializable {
         return options.get(TAG_PERIOD_FORMATTER);
     }
 
+    @Nullable
     public Integer tagNumRetainedMax() {
         return options.get(TAG_NUM_RETAINED_MAX);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index b7830f0ac..07a73d3de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -248,7 +248,6 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
     }
 
     @Override
-    @Nullable
     public TagAutoManager newTagCreationManager() {
         return TagAutoManager.create(
                 options,
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1d9e247d4..870feffde 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -92,7 +92,6 @@ public interface FileStore<T> extends Serializable {
     @Nullable
     PartitionExpire newPartitionExpire(String commitUser);
 
-    @Nullable
     TagAutoManager newTagCreationManager();
 
     ServiceManager newServiceManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 0c42c4e43..fefc41058 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -42,7 +42,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 
 /**
  * This file is the entrance to all data committed at some specific time point.
@@ -447,12 +446,13 @@ public class Snapshot {
         }
     }
 
-    public static Optional<Snapshot> safelyFromPath(FileIO fileIO, Path path) throws IOException {
+    @Nullable
+    public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException {
         try {
             String json = fileIO.readFileUtf8(path);
-            return Optional.of(Snapshot.fromJson(json));
+            return Snapshot.fromJson(json);
         } catch (FileNotFoundException e) {
-            return Optional.empty();
+            return null;
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 871bba4af..1ca321d56 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -488,29 +488,29 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Override
     public void createTag(String tagName, long fromSnapshotId) {
         createTag(
-                tagName, coreOptions().tagDefaultTimeRetained(), createTagInternal(fromSnapshotId));
+                tagName, createTagInternal(fromSnapshotId), coreOptions().tagDefaultTimeRetained());
     }
 
     @Override
-    public void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) {
-        createTag(tagName, timeRetained, createTagInternal(fromSnapshotId));
+    public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
+        createTag(tagName, createTagInternal(fromSnapshotId), timeRetained);
     }
 
     @Override
     public void createTag(String tagName) {
         Snapshot latestSnapshot = snapshotManager().latestSnapshot();
         checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
-        createTag(tagName, coreOptions().tagDefaultTimeRetained(), latestSnapshot);
+        createTag(tagName, latestSnapshot, coreOptions().tagDefaultTimeRetained());
     }
 
     @Override
-    public void createTag(String tagName, @Nullable Duration timeRetained) {
+    public void createTag(String tagName, Duration timeRetained) {
         Snapshot latestSnapshot = snapshotManager().latestSnapshot();
         checkNotNull(latestSnapshot, "Cannot create tag because latest 
snapshot doesn't exist.");
-        createTag(tagName, timeRetained, latestSnapshot);
+        createTag(tagName, latestSnapshot, timeRetained);
     }
 
-    private void createTag(String tagName, @Nullable Duration timeRetained, Snapshot fromSnapshot) {
+    private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) {
         tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index be823156c..be4976f10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.table;
 
-import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.InnerTableCommit;
@@ -26,8 +25,6 @@ import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 
-import javax.annotation.Nullable;
-
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -113,8 +110,8 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
-    @Experimental
-    default void createTag(String tagName, @Nullable Duration timeRetained, 
long fromSnapshotId) {
+    @Override
+    default void createTag(String tagName, long fromSnapshotId, Duration 
timeRetained) {
         throw new UnsupportedOperationException(
                 String.format(
                         "Readonly Table %s does not support createTag.",
@@ -129,8 +126,8 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
-    @Experimental
-    default void createTag(String tagName, @Nullable Duration timeRetained) {
+    @Override
+    default void createTag(String tagName, Duration timeRetained) {
         throw new UnsupportedOperationException(
                 String.format(
                         "Readonly Table %s does not support createTag.",
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 3d46d25a1..3650b773c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -26,8 +26,6 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.List;
@@ -80,14 +78,14 @@ public interface Table extends Serializable {
     void createTag(String tagName, long fromSnapshotId);
 
     @Experimental
-    void createTag(String tagName, @Nullable Duration timeRetained, long 
fromSnapshotId);
+    void createTag(String tagName, long fromSnapshotId, Duration timeRetained);
 
     /** Create a tag from latest snapshot. */
     @Experimental
     void createTag(String tagName);
 
     @Experimental
-    void createTag(String tagName, @Nullable Duration timeRetained);
+    void createTag(String tagName, Duration timeRetained);
 
     /** Delete a tag by name. */
     @Experimental
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 9d471ee57..eab7a0c6d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -134,11 +134,9 @@ public class TableCommitImpl implements InnerTableCommit {
         if (this.forceCreatingSnapshot) {
             return true;
         }
-        if (tagAutoManager != null) {
-            return tagAutoManager.getTagAutoCreation() != null
-                    && 
tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
-        }
-        return false;
+        return tagAutoManager != null
+                && tagAutoManager.getTagAutoCreation() != null
+                && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index e51ebcb0d..21c9cf495 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.SerializationUtils;
 import org.apache.paimon.utils.TagManager;
@@ -61,7 +62,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.SortedMap;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -208,12 +208,10 @@ public class TagsTable implements ReadonlyTable {
             Options options = new Options();
             options.set(CoreOptions.PATH, location.toUri().toString());
             FileStoreTable table = FileStoreTableFactory.create(fileIO, 
options);
-            SortedMap<Tag, List<String>> tags = 
table.tagManager().tagsWithTimeRetained();
+            List<Pair<Tag, String>> tags = table.tagManager().tagObjects();
             Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
-            for (Map.Entry<Tag, List<String>> tag : tags.entrySet()) {
-                for (String tagName : tag.getValue()) {
-                    nameToSnapshot.put(tagName, tag.getKey());
-                }
+            for (Pair<Tag, String> tag : tags) {
+                nameToSnapshot.put(tag.getValue(), tag.getKey());
             }
             Map<String, List<String>> tagBranches = new HashMap<>();
             table.branchManager()
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
index 58dbf430b..938bb417d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -34,16 +34,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 
 /** Snapshot with tagCreateTime and tagTimeRetained. */
 public class Tag extends Snapshot {
 
-    public static final Comparator<Tag> TAG_COMPARATOR = new TagComparator();
-
     private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime";
     private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained";
 
@@ -110,6 +106,7 @@ public class Tag extends Snapshot {
         return tagTimeRetained;
     }
 
+    @Override
     public String toJson() {
         return JsonSerdeUtil.toJson(this);
     }
@@ -127,12 +124,13 @@ public class Tag extends Snapshot {
         }
     }
 
-    public static Optional<Tag> safelyFromTagPath(FileIO fileIO, Path path) throws IOException {
+    @Nullable
+    public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException {
         try {
             String json = fileIO.readFileUtf8(path);
-            return Optional.of(Tag.fromJson(json));
+            return Tag.fromJson(json);
         } catch (FileNotFoundException e) {
-            return Optional.empty();
+            return null;
         }
     }
 
@@ -160,7 +158,7 @@ public class Tag extends Snapshot {
                 tagTimeRetained);
     }
 
-    public Snapshot toSnapshot() {
+    public Snapshot trimToSnapshot() {
         return new Snapshot(
                 version,
                 id,
@@ -201,155 +199,4 @@ public class Tag extends Snapshot {
         return Objects.equals(tagCreateTime, that.tagCreateTime)
                 && Objects.equals(tagTimeRetained, that.tagTimeRetained);
     }
-
-    private static class TagComparator implements Comparator<Tag> {
-        @Override
-        public int compare(Tag tag1, Tag tag2) {
-            int comparisonResult = 0;
-
-            // Compare id
-            comparisonResult = Long.compare(tag1.id, tag2.id);
-            if (comparisonResult != 0) {
-                return comparisonResult;
-            }
-
-            // Compare tagCreateTime
-            if (tag1.tagCreateTime != null && tag2.tagCreateTime != null) {
-                comparisonResult = 
tag1.tagCreateTime.compareTo(tag2.tagCreateTime);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare tagTimeRetained
-            if (tag1.tagTimeRetained != null && tag2.tagTimeRetained != null) {
-                comparisonResult = 
tag1.tagTimeRetained.compareTo(tag2.tagTimeRetained);
-            }
-
-            // Compare version
-            if (tag1.version != null && tag2.version != null) {
-                comparisonResult = Integer.compare(tag1.version, tag2.version);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare schemaId
-            comparisonResult = Long.compare(tag1.schemaId, tag2.schemaId);
-            if (comparisonResult != 0) {
-                return comparisonResult;
-            }
-
-            // Compare baseManifestList
-            if (tag1.baseManifestList != null && tag2.baseManifestList != 
null) {
-                comparisonResult = 
tag1.baseManifestList.compareTo(tag2.baseManifestList);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare deltaManifestList
-            if (tag1.deltaManifestList != null && tag2.deltaManifestList != 
null) {
-                comparisonResult = 
tag1.deltaManifestList.compareTo(tag2.deltaManifestList);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare changelogManifestList
-            if (tag1.changelogManifestList != null && 
tag2.changelogManifestList != null) {
-                comparisonResult = 
tag1.changelogManifestList.compareTo(tag2.changelogManifestList);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare indexManifest
-            if (tag1.indexManifest != null && tag2.indexManifest != null) {
-                comparisonResult = 
tag1.indexManifest.compareTo(tag2.indexManifest);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare commitUser
-            if (tag1.commitUser != null && tag2.commitUser != null) {
-                comparisonResult = tag1.commitUser.compareTo(tag2.commitUser);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare commitIdentifier
-            comparisonResult = Long.compare(tag1.commitIdentifier, 
tag2.commitIdentifier);
-            if (comparisonResult != 0) {
-                return comparisonResult;
-            }
-
-            // Compare commitKind
-            if (tag1.commitKind != null && tag2.commitKind != null) {
-                comparisonResult = tag1.commitKind.compareTo(tag2.commitKind);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare timeMillis
-            comparisonResult = Long.compare(tag1.timeMillis, tag2.timeMillis);
-            if (comparisonResult != 0) {
-                return comparisonResult;
-            }
-
-            // Compare logOffsets
-            if (tag1.logOffsets != null && tag2.logOffsets != null) {
-                comparisonResult = Integer.compare(tag1.logOffsets.size(), 
tag2.logOffsets.size());
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare totalRecordCount
-            if (tag1.totalRecordCount != null && tag2.totalRecordCount != 
null) {
-                comparisonResult = Long.compare(tag1.totalRecordCount, 
tag2.totalRecordCount);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare deltaRecordCount
-            if (tag1.deltaRecordCount != null && tag2.deltaRecordCount != 
null) {
-                comparisonResult = Long.compare(tag1.deltaRecordCount, 
tag2.deltaRecordCount);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare changelogRecordCount
-            if (tag1.changelogRecordCount != null && tag2.changelogRecordCount 
!= null) {
-                comparisonResult =
-                        Long.compare(tag1.changelogRecordCount, 
tag2.changelogRecordCount);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare watermark
-            if (tag1.watermark != null && tag2.watermark != null) {
-                comparisonResult = Long.compare(tag1.watermark, 
tag2.watermark);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            // Compare statistics
-            if (tag1.statistics != null && tag2.statistics != null) {
-                comparisonResult = tag1.statistics.compareTo(tag2.statistics);
-                if (comparisonResult != 0) {
-                    return comparisonResult;
-                }
-            }
-
-            return comparisonResult;
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 41e1e8d16..409ceb3dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -20,12 +20,16 @@ package org.apache.paimon.tag;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
 import org.apache.paimon.tag.TagTimeExtractor.WatermarkExtractor;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.time.Duration;
@@ -43,12 +47,16 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
 /** A manager to create tags automatically. */
 public class TagAutoCreation {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TagAutoCreation.class);
+
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
+    private final TagDeletion tagDeletion;
     private final TagTimeExtractor timeExtractor;
     private final TagPeriodHandler periodHandler;
     private final Duration delay;
-    private final Duration timeRetained;
+    @Nullable private final Integer numRetainedMax;
+    @Nullable private final Duration defaultTimeRetained;
     private final List<TagCallback> callbacks;
     private final Duration idlenessTimeout;
 
@@ -58,18 +66,22 @@ public class TagAutoCreation {
     private TagAutoCreation(
             SnapshotManager snapshotManager,
             TagManager tagManager,
+            TagDeletion tagDeletion,
             TagTimeExtractor timeExtractor,
             TagPeriodHandler periodHandler,
             Duration delay,
-            @Nullable Duration timeRetained,
+            @Nullable Integer numRetainedMax,
+            @Nullable Duration defaultTimeRetained,
             Duration idlenessTimeout,
             List<TagCallback> callbacks) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
+        this.tagDeletion = tagDeletion;
         this.timeExtractor = timeExtractor;
         this.periodHandler = periodHandler;
         this.delay = delay;
-        this.timeRetained = timeRetained;
+        this.numRetainedMax = numRetainedMax;
+        this.defaultTimeRetained = defaultTimeRetained;
         this.callbacks = callbacks;
         this.idlenessTimeout = idlenessTimeout;
 
@@ -140,8 +152,32 @@ public class TagAutoCreation {
                 || isAfterOrEqual(time.minus(delay), 
periodHandler.nextTagTime(nextTag))) {
             LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
             String tagName = periodHandler.timeToTag(thisTag);
-            tagManager.createTag(snapshot, tagName, timeRetained, callbacks);
+            tagManager.createTag(snapshot, tagName, defaultTimeRetained, 
callbacks);
             nextTag = periodHandler.nextTagTime(thisTag);
+
+            if (numRetainedMax != null) {
+                // only handle auto-created tags here
+                SortedMap<Snapshot, List<String>> tags = 
tagManager.tags(periodHandler::isAutoTag);
+                if (tags.size() > numRetainedMax) {
+                    int toDelete = tags.size() - numRetainedMax;
+                    int i = 0;
+                    for (List<String> tag : tags.values()) {
+                        LOG.info(
+                                "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.",
+                                tagName,
+                                numRetainedMax);
+                        tagManager.deleteTag(
+                                checkAndGetOneAutoTag(tag),
+                                tagDeletion,
+                                snapshotManager,
+                                callbacks);
+                        i++;
+                        if (i == toDelete) {
+                            break;
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -163,6 +199,7 @@ public class TagAutoCreation {
             CoreOptions options,
             SnapshotManager snapshotManager,
             TagManager tagManager,
+            TagDeletion tagDeletion,
             List<TagCallback> callbacks) {
         TagTimeExtractor extractor = 
TagTimeExtractor.createForAutoTag(options);
         if (extractor == null) {
@@ -171,9 +208,11 @@ public class TagAutoCreation {
         return new TagAutoCreation(
                 snapshotManager,
                 tagManager,
+                tagDeletion,
                 extractor,
                 TagPeriodHandler.create(options),
                 options.tagCreationDelay(),
+                options.tagNumRetainedMax(),
                 options.tagDefaultTimeRetained(),
                 options.snapshotWatermarkIdleTimeout(),
                 callbacks);
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java
deleted file mode 100644
index e31a926b5..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.tag;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.TagDeletion;
-import org.apache.paimon.table.sink.TagCallback;
-import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TagManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-
-/** A manager to expire tags. */
-public class TagAutoExpire {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TagAutoExpire.class);
-
-    private final SnapshotManager snapshotManager;
-    private final TagManager tagManager;
-    private final TagDeletion tagDeletion;
-    private final TagPeriodHandler periodHandler;
-    private final Integer numRetainedMax;
-    private final List<TagCallback> callbacks;
-
-    private TagAutoExpire(
-            SnapshotManager snapshotManager,
-            TagManager tagManager,
-            TagDeletion tagDeletion,
-            TagPeriodHandler periodHandler,
-            Duration delay,
-            Integer numRetainedMax,
-            List<TagCallback> callbacks) {
-        this.snapshotManager = snapshotManager;
-        this.tagManager = tagManager;
-        this.tagDeletion = tagDeletion;
-        this.periodHandler = periodHandler;
-        this.numRetainedMax = numRetainedMax;
-        this.callbacks = callbacks;
-        this.periodHandler.validateDelay(delay);
-    }
-
-    public void run() {
-        Set<String> deleteTags = new HashSet<>();
-        deleteTags.addAll(getExpireTagsByNumRetainedMax());
-        deleteTags.addAll(getExpireTagsByTimeRetained());
-        deleteTags.forEach(
-                tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager, callbacks));
-    }
-
-    private Set<String> getExpireTagsByNumRetainedMax() {
-        Set<String> deleteTags = new HashSet<>();
-        if (numRetainedMax != null) {
-            // only handle auto-created tags here
-            SortedMap<Snapshot, List<String>> tags = tagManager.tags(periodHandler::isAutoTag);
-            if (tags.size() > numRetainedMax) {
-                int toDelete = tags.size() - numRetainedMax;
-                int i = 0;
-                for (List<String> tag : tags.values()) {
-                    String tagName = TagAutoCreation.checkAndGetOneAutoTag(tag);
-                    LOG.info(
-                            "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.",
-                            tagName,
-                            numRetainedMax);
-                    deleteTags.add(tagName);
-                    i++;
-                    if (i == toDelete) {
-                        break;
-                    }
-                }
-            }
-        }
-        return deleteTags;
-    }
-
-    private Set<String> getExpireTagsByTimeRetained() {
-        // handle auto-created and non-auto-created-tags here
-        Set<String> deleteTags = new HashSet<>();
-        SortedMap<Tag, List<String>> tags = tagManager.tagsWithTimeRetained();
-        for (Map.Entry<Tag, List<String>> entry : tags.entrySet()) {
-            Tag tag = entry.getKey();
-            LocalDateTime createTime = tag.getTagCreateTime();
-            Duration timeRetained = tag.getTagTimeRetained();
-            if (createTime == null || timeRetained == null) {
-                continue;
-            }
-            if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
-                for (String tagName : entry.getValue()) {
-                    LOG.info(
-                            "Delete tag {}, because its existence time has reached its timeRetained of {}.",
-                            tagName,
-                            timeRetained);
-                    deleteTags.add(tagName);
-                }
-            }
-        }
-        return deleteTags;
-    }
-
-    public static TagAutoExpire create(
-            CoreOptions options,
-            SnapshotManager snapshotManager,
-            TagManager tagManager,
-            TagDeletion tagDeletion,
-            List<TagCallback> callbacks) {
-        return new TagAutoExpire(
-                snapshotManager,
-                tagManager,
-                tagDeletion,
-                TagPeriodHandler.create(options),
-                options.tagCreationDelay(),
-                options.tagNumRetainedMax(),
-                callbacks);
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
index a99155947..387a3e746 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
@@ -30,19 +30,19 @@ import java.util.List;
 public class TagAutoManager {
 
     private final TagAutoCreation tagAutoCreation;
-    private final TagAutoExpire tagAutoExpire;
+    private final TagTimeExpire tagTimeExpire;
 
-    private TagAutoManager(TagAutoCreation tagAutoCreation, TagAutoExpire tagAutoExpire) {
+    private TagAutoManager(TagAutoCreation tagAutoCreation, TagTimeExpire tagTimeExpire) {
         this.tagAutoCreation = tagAutoCreation;
-        this.tagAutoExpire = tagAutoExpire;
+        this.tagTimeExpire = tagTimeExpire;
     }
 
     public void run() {
         if (tagAutoCreation != null) {
             tagAutoCreation.run();
         }
-        if (tagAutoExpire != null) {
-            tagAutoExpire.run();
+        if (tagTimeExpire != null) {
+            tagTimeExpire.run();
         }
     }
 
@@ -57,8 +57,9 @@ public class TagAutoManager {
         return new TagAutoManager(
                 extractor == null
                         ? null
-                        : TagAutoCreation.create(options, snapshotManager, tagManager, callbacks),
-                TagAutoExpire.create(options, snapshotManager, tagManager, tagDeletion, callbacks));
+                        : TagAutoCreation.create(
+                                options, snapshotManager, tagManager, tagDeletion, callbacks),
+                TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks));
     }
 
     public TagAutoCreation getTagAutoCreation() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java
new file mode 100644
index 000000000..d4797c0cb
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/** A manager to expire tags by time. */
+public class TagTimeExpire {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TagTimeExpire.class);
+
+    private final SnapshotManager snapshotManager;
+    private final TagManager tagManager;
+    private final TagDeletion tagDeletion;
+    private final List<TagCallback> callbacks;
+
+    private TagTimeExpire(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            TagDeletion tagDeletion,
+            List<TagCallback> callbacks) {
+        this.snapshotManager = snapshotManager;
+        this.tagManager = tagManager;
+        this.tagDeletion = tagDeletion;
+        this.callbacks = callbacks;
+    }
+
+    public void run() {
+        List<Pair<Tag, String>> tags = tagManager.tagObjects();
+        for (Pair<Tag, String> pair : tags) {
+            Tag tag = pair.getLeft();
+            String tagName = pair.getRight();
+            LocalDateTime createTime = tag.getTagCreateTime();
+            Duration timeRetained = tag.getTagTimeRetained();
+            if (createTime == null || timeRetained == null) {
+                continue;
+            }
+            if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
+                LOG.info(
+                        "Delete tag {}, because its existence time has reached its timeRetained of {}.",
+                        tagName,
+                        timeRetained);
+                tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
+            }
+        }
+    }
+
+    public static TagTimeExpire create(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            TagDeletion tagDeletion,
+            List<TagCallback> callbacks) {
+        return new TagTimeExpire(snapshotManager, tagManager, tagDeletion, callbacks);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 174b4233c..dbbc8fffd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -394,7 +394,10 @@ public class SnapshotManager implements Serializable {
 
         List<Snapshot> snapshots = new ArrayList<>();
         for (Path path : paths) {
-            Snapshot.safelyFromPath(fileIO, path).ifPresent(snapshots::add);
+            Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
+            if (snapshot != null) {
+                snapshots.add(snapshot);
+            }
         }
 
         return snapshots;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index ce6f02020..8b7818fed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -242,7 +242,8 @@ public class TagManager {
     /** Get the tagged snapshot by name. */
     public Snapshot taggedSnapshot(String tagName) {
         checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
-        return Tag.fromPath(fileIO, tagPath(tagName)).toSnapshot();
+        // Trim to snapshot to avoid equals and compare snapshot.
+        return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot();
     }
 
     public long tagCount() {
@@ -258,41 +259,25 @@ public class TagManager {
         return new ArrayList<>(tags().keySet());
     }
 
-    /** Get all tag sorted by Tag. */
-    public SortedMap<Tag, List<String>> tagsWithTimeRetained() {
-        return tagsWithFilter(tagName -> true);
-    }
-
     /** Get all tagged snapshots with tag names sorted by snapshot id. */
     public SortedMap<Snapshot, List<String>> tags() {
         return tags(tagName -> true);
     }
 
-    public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
-        SortedMap<Snapshot, List<String>> sortedTagMap =
-                new TreeMap<>(Comparator.comparingLong(Snapshot::id));
-        SortedMap<Tag, List<String>> tags = tagsWithFilter(filter);
-        tags.forEach(
-                (key, value) ->
-                        sortedTagMap
-                                .computeIfAbsent(key.toSnapshot(), tagNames -> new ArrayList<>())
-                                .addAll(value));
-        return sortedTagMap;
-    }
-
     /**
-     * Retrieves a sorted map of tags filtered based on a provided predicate. The predicate
-     * determines which tag names should be included in the result. Only tags with tag names that
-     * pass the predicate test are included.
+     * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate
+     * determines which tag names should be included in the result. Only snapshots with tag names
+     * that pass the predicate test are included.
      *
-     * @param filter A Predicate that tests each tag name. Tags with tag names that fail the test
-     *     are excluded from the result.
-     * @return A sorted map of filtered tags keyed by Tag.TAG_COMPARATOR, each associated with its
-     *     tag name.
-     * @throws RuntimeException if an IOException occurs during retrieval of tags.
+     * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the
+     *     test are excluded from the result.
+     * @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag
+     *     name.
+     * @throws RuntimeException if an IOException occurs during retrieval of snapshots.
      */
-    public SortedMap<Tag, List<String>> tagsWithFilter(Predicate<String> filter) {
-        TreeMap<Tag, List<String>> tags = new TreeMap<>(Tag.TAG_COMPARATOR);
+    public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
+        TreeMap<Snapshot, List<String>> tags =
+                new TreeMap<>(Comparator.comparingLong(Snapshot::id));
         try {
             List<Path> paths =
                     listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
@@ -307,11 +292,10 @@ public class TagManager {
                 }
                 // If the tag file is not found, it might be deleted by
                 // other processes, so just skip this tag
-                Tag.safelyFromTagPath(fileIO, path)
-                        .ifPresent(
-                                tag ->
-                                        tags.computeIfAbsent(tag, s -> new ArrayList<>())
-                                                .add(tagName));
+                Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
+                if (snapshot != null) {
+                    tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName);
+                }
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -319,6 +303,27 @@ public class TagManager {
         return tags;
     }
 
+    /** Get all {@link Tag}s. */
+    public List<Pair<Tag, String>> tagObjects() {
+        try {
+            List<Path> paths =
+                    listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
+                            .map(FileStatus::getPath)
+                            .collect(Collectors.toList());
+            List<Pair<Tag, String>> tags = new ArrayList<>();
+            for (Path path : paths) {
+                String tagName = path.getName().substring(TAG_PREFIX.length());
+                Tag tag = Tag.safelyFromPath(fileIO, path);
+                if (tag != null) {
+                    tags.add(Pair.of(tag, tagName));
+                }
+            }
+            return tags;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public List<String> sortTagsOfOneSnapshot(List<String> tagNames) {
         return tagNames.stream()
                 .map(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index b0d357853..4b381a601 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -42,7 +43,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -120,28 +120,26 @@ class TagsTableTest extends TableTestBase {
     private List<InternalRow> getExceptedResult(
             Function<String, List<String>> tagBranchesFunction) {
         List<InternalRow> internalRows = new ArrayList<>();
-        for (Map.Entry<Tag, List<String>> snapshot : tagManager.tagsWithTimeRetained().entrySet()) {
+        for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {
             Tag tag = snapshot.getKey();
-            for (String tagName : snapshot.getValue()) {
-                internalRows.add(
-                        GenericRow.of(
-                                BinaryString.fromString(tagName),
-                                tag.id(),
-                                tag.schemaId(),
-                                Timestamp.fromLocalDateTime(
-                                        DateTimeUtils.toLocalDateTime(tag.timeMillis())),
-                                tag.totalRecordCount(),
-                                BinaryString.fromString(
-                                        tagBranchesFunction.apply(tagName).toString()),
-                                Timestamp.fromLocalDateTime(
-                                        tag.getTagCreateTime() == null
-                                                ? LocalDateTime.MIN
-                                                : tag.getTagCreateTime()),
-                                BinaryString.fromString(
-                                        tag.getTagTimeRetained() == null
-                                                ? ""
-                                                : tag.getTagTimeRetained().toString())));
-            }
+            String tagName = snapshot.getValue();
+            internalRows.add(
+                    GenericRow.of(
+                            BinaryString.fromString(tagName),
+                            tag.id(),
+                            tag.schemaId(),
+                            Timestamp.fromLocalDateTime(
+                                    DateTimeUtils.toLocalDateTime(tag.timeMillis())),
+                            tag.totalRecordCount(),
+                            BinaryString.fromString(tagBranchesFunction.apply(tagName).toString()),
+                            Timestamp.fromLocalDateTime(
+                                    tag.getTagCreateTime() == null
+                                            ? LocalDateTime.MIN
+                                            : tag.getTagCreateTime()),
+                            BinaryString.fromString(
+                                    tag.getTagTimeRetained() == null
+                                            ? ""
+                                            : tag.getTagTimeRetained().toString())));
         }
         return internalRows;
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
index 922055983..3198366dd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
@@ -25,8 +25,6 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
 
 /** Test for {@link Tag}. */
 public class TagTest {
@@ -101,81 +99,4 @@ public class TagTest {
         Tag newTag = Tag.fromJson(tagJson);
         Assert.assertEquals(tag, newTag);
     }
-
-    @Test
-    public void testTagComparator() {
-        Tag tag1 =
-                new Tag(
-                        3,
-                        2L,
-                        0,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        8,
-                        Snapshot.CommitKind.APPEND,
-                        1000,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        LocalDateTime.now(),
-                        Duration.ofSeconds(10));
-
-        Tag tag2 =
-                new Tag(
-                        3,
-                        1L,
-                        0,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        8,
-                        Snapshot.CommitKind.APPEND,
-                        1000,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        LocalDateTime.now(),
-                        Duration.ofSeconds(10));
-
-        Tag tag3 =
-                new Tag(
-                        3,
-                        0L,
-                        0,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        8,
-                        Snapshot.CommitKind.APPEND,
-                        1000,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        LocalDateTime.now(),
-                        Duration.ofSeconds(10));
-        List<Tag> tags = new ArrayList<>();
-        tags.add(tag1);
-        tags.add(tag2);
-        tags.add(tag3);
-        tags.sort(Tag.TAG_COMPARATOR);
-        Assert.assertEquals(0, tags.get(0).id());
-        Assert.assertEquals(1, tags.get(1).id());
-        Assert.assertEquals(2, tags.get(2).id());
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
index 12f38931d..3e702b9b2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
@@ -46,7 +46,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
@@ -122,14 +121,14 @@ public class TagManagerTest {
         tagManager.createTag(
                 snapshotManager.snapshot(1), "tag", Duration.ofDays(1), Collections.emptyList());
         assertThat(tagManager.tagExists("tag")).isTrue();
-        SortedMap<Tag, List<String>> tagsWithTimeRetained = tagManager.tagsWithTimeRetained();
-        Assertions.assertEquals(1, tagsWithTimeRetained.size());
-        Tag tag = tagsWithTimeRetained.firstKey();
+        List<Pair<Tag, String>> tags = tagManager.tagObjects();
+        Assertions.assertEquals(1, tags.size());
+        Tag tag = tags.get(0).getKey();
         String tagJson = tag.toJson();
         Assertions.assertTrue(
                 tagJson.contains("tagCreateTime") && tagJson.contains("tagTimeRetained"));
         Assertions.assertEquals(tag, Tag.fromJson(tagJson));
-        assertThat(tagsWithTimeRetained.get(tag)).contains("tag");
+        assertThat(tags.get(0).getValue()).contains("tag");
     }
 
     private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode mode, int buckets)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
index f1455d377..cfc9b558b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
@@ -27,8 +27,8 @@ import java.util.Map;
 public class CreateTagAction extends TableActionBase {
 
     private final String tagName;
-    private final Duration timeRetained;
-    private final Long snapshotId;
+    private final @Nullable Long snapshotId;
+    private final @Nullable Duration timeRetained;
 
     public CreateTagAction(
             String warehouse,
@@ -36,8 +36,8 @@ public class CreateTagAction extends TableActionBase {
             String tableName,
             Map<String, String> catalogConfig,
             String tagName,
-            @Nullable Duration timeRetained,
-            Long snapshotId) {
+            @Nullable Long snapshotId,
+            @Nullable Duration timeRetained) {
         super(warehouse, databaseName, tableName, catalogConfig);
         this.tagName = tagName;
         this.timeRetained = timeRetained;
@@ -49,7 +49,7 @@ public class CreateTagAction extends TableActionBase {
         if (snapshotId == null) {
             table.createTag(tagName, timeRetained);
         } else {
-            table.createTag(tagName, timeRetained, snapshotId);
+            table.createTag(tagName, snapshotId, timeRetained);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
index 6cca76cb1..7769fa1d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
@@ -65,8 +65,8 @@ public class CreateTagActionFactory implements ActionFactory {
                         tablePath.f2,
                         catalogConfig,
                         tagName,
-                        timeRetained,
-                        snapshot);
+                        snapshot,
+                        timeRetained);
         return Optional.of(action);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 3bdc2e07e..1a7b03ef6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -27,45 +27,70 @@ import org.apache.flink.table.procedure.ProcedureContext;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
+
 /**
  * Create tag procedure. Usage:
  *
  * <pre><code>
- *  CALL sys.create_tag('tableId', 'tagName', 'timeRetained', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
  * </code></pre>
  */
 public class CreateTagProcedure extends ProcedureBase {
 
     public static final String IDENTIFIER = "create_tag";
 
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, tagName, snapshotId, null);
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, tagName, null, null);
+    }
+
     public String[] call(
             ProcedureContext procedureContext,
             String tableId,
             String tagName,
-            String timeRetained,
-            long snapshotId)
+            long snapshotId,
+            String timeRetained)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, timeRetained, snapshotId);
+        return innerCall(tableId, tagName, snapshotId, timeRetained);
     }
 
     public String[] call(
             ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, timeRetained, null);
+        return innerCall(tableId, tagName, null, timeRetained);
     }
 
     private String[] innerCall(
-            String tableId, String tagName, String timeRetained, @Nullable Long snapshotId)
+            String tableId,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable String timeRetained)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         if (snapshotId == null) {
-            table.createTag(tagName, TimeUtils.parseDuration(timeRetained));
+            table.createTag(tagName, toDuration(timeRetained));
         } else {
-            table.createTag(tagName, TimeUtils.parseDuration(timeRetained), snapshotId);
+            table.createTag(tagName, snapshotId, toDuration(timeRetained));
         }
         return new String[] {"Success"};
     }
 
+    @Nullable
+    private static Duration toDuration(@Nullable String s) {
+        if (s == null) {
+            return null;
+        }
+
+        return TimeUtils.parseDuration(s);
+    }
+
     @Override
     public String identifier() {
         return IDENTIFIER;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index eb6f70f65..7a6361657 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -65,7 +65,7 @@ class BranchActionITCase extends ActionITCaseBase {
         TagManager tagManager = new TagManager(table.fileIO(), table.location());
         callProcedure(
                 String.format(
-                        "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName));
+                        "CALL sys.create_tag('%s.%s', 'tag2', 2, '5 d')", database, tableName));
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         BranchManager branchManager = table.branchManager();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index a650ad31b..1fda583d2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -83,8 +83,7 @@ public class TagActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format(
-                            "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName));
+                    String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
@@ -154,8 +153,7 @@ public class TagActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format(
-                            "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName));
+                    String.format("CALL sys.create_tag('%s.%s', 'tag2',  2)", database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isTrue();
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 88878c2c7..4d9753bab 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -913,7 +913,7 @@ public abstract class HiveCatalogITCaseBase {
                         "    'metastore.tag-to-partition' = 'dt'",
                         ")"));
         tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
 
         assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
                 .containsExactlyInAnyOrder("dt=2023-10-16");
@@ -927,7 +927,7 @@ public abstract class HiveCatalogITCaseBase {
         // another tag
 
         tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
 
         assertThat(hiveShell.executeQuery("SELECT * FROM t"))
                 .containsExactlyInAnyOrder(
@@ -951,9 +951,9 @@ public abstract class HiveCatalogITCaseBase {
                         + "    'metastore.tag-to-partition' = 'dt'\n"
                         + ")");
         tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
         tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
 
         assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
                 .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17");
@@ -979,7 +979,7 @@ public abstract class HiveCatalogITCaseBase {
                         "    'metastore.tag-to-partition' = 'dt'",
                         ")"));
         tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
 
         assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
                 .containsExactlyInAnyOrder("dt=2023-10-16");
@@ -991,7 +991,7 @@ public abstract class HiveCatalogITCaseBase {
                 .containsExactlyInAnyOrder("1\t10\t2023-10-16", "2\t20\t2023-10-16");
 
         tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
-        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)");
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
 
         tEnv.executeSql("ALTER TABLE t ADD z INT");
         tEnv.executeSql("INSERT INTO t VALUES (3, 30, 5), (4, 40, 6)").await();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
index 38d181bac..b3f863c5e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
@@ -40,8 +40,8 @@ public class CreateTagProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.required("tag", StringType),
-                ProcedureParameter.optional("time_retained", StringType),
-                ProcedureParameter.optional("snapshot", LongType)
+                ProcedureParameter.optional("snapshot", LongType),
+                ProcedureParameter.optional("time_retained", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -68,9 +68,9 @@ public class CreateTagProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String tag = args.getString(1);
+        Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
         Duration timeRetained =
-                args.isNullAt(2) ? null : TimeUtils.parseDuration(args.getString(2));
-        Long snapshot = args.isNullAt(3) ? null : args.getLong(3);
+                args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3));
 
         return modifyPaimonTable(
                 tableIdent,
@@ -78,7 +78,7 @@ public class CreateTagProcedure extends BaseProcedure {
                     if (snapshot == null) {
                         table.createTag(tag, timeRetained);
                     } else {
-                        table.createTag(tag, timeRetained, snapshot);
+                        table.createTag(tag, snapshot, timeRetained);
                     }
                     InternalRow outputRow = newInternalRow(true);
                     return new InternalRow[] {outputRow};

Reply via email to