This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 65e9ba80ddb6f96c454a2038ed8702693bcc620f
Author: yuzelin <[email protected]>
AuthorDate: Fri Jan 3 15:29:25 2025 +0800

    [core] Refactor TagManager to remove unnecessary tag existence check (#4820)
---
 .../paimon/utils/SupplierWithIOException.java      | 29 ++++++++
 .../apache/paimon/operation/OrphanFilesClean.java  | 12 +--
 .../paimon/table/AbstractFileStoreTable.java       |  8 +-
 .../snapshot/IncrementalTagStartingScanner.java    | 33 +++-----
 .../snapshot/StaticFromTagStartingScanner.java     |  2 +-
 .../table/source/snapshot/TimeTravelUtil.java      |  2 +-
 .../org/apache/paimon/table/system/TagsTable.java  | 25 +++----
 .../org/apache/paimon/tag/TagAutoCreation.java     |  5 +-
 .../org/apache/paimon/utils/BranchManager.java     |  4 +-
 .../java/org/apache/paimon/utils/TagManager.java   | 87 ++++++++++++----------
 .../paimon/operation/ExpireSnapshotsTest.java      |  3 +-
 .../apache/paimon/operation/FileDeletionTest.java  | 10 +--
 .../paimon/table/FileStoreTableTestBase.java       | 17 ++---
 .../paimon/table/IndexFileExpireTableTest.java     |  6 +-
 .../org/apache/paimon/tag/TagAutoManagerTest.java  |  6 +-
 .../org/apache/paimon/utils/TagManagerTest.java    | 11 ++-
 .../org/apache/paimon/utils/TraceableFileIO.java   | 32 ++------
 .../ProcedurePositionalArgumentsITCase.java        |  2 +-
 .../apache/paimon/flink/clone/PickFilesUtil.java   | 11 +--
 .../sink/AutoTagForSavepointCommitterOperator.java |  5 +-
 .../flink/sink/BatchWriteGeneratorTagOperator.java |  9 +--
 .../action/CreateTagFromWatermarkActionITTest.java | 19 +++--
 .../paimon/flink/action/ExpireTagsActionTest.java  |  2 +-
 .../paimon/flink/action/ReplaceTagActionTest.java  | 12 +--
 .../CreateTagFromTimestampProcedureITCase.java     |  2 +-
 .../CreateTagFromWatermarkProcedureITCase.java     |  2 +-
 .../flink/procedure/ExpireTagsProcedureITCase.java |  4 +-
 .../flink/procedure/ReplaceTagProcedureITCase.java |  2 +-
 .../sink/BatchWriteGeneratorTagOperatorTest.java   |  3 +-
 .../CreateTagFromTimestampProcedureTest.scala      |  2 +-
 .../spark/procedure/ExpireTagsProcedureTest.scala  |  4 +-
 31 files changed, 189 insertions(+), 182 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java
new file mode 100644
index 0000000000..f5c79817e4
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+/** A {@link Supplier} with {@link IOException}. */
+@FunctionalInterface
+public interface SupplierWithIOException<T> {
+
+    T get() throws IOException;
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index fc2e1200f0..c2b9be4c27 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -37,6 +37,7 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableConsumer;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.SupplierWithIOException;
 import org.apache.paimon.utils.TagManager;
 
 import org.slf4j.Logger;
@@ -322,13 +323,13 @@ public abstract class OrphanFilesClean implements 
Serializable {
      * {@link FileNotFoundException}, return default value. Finally, if retry 
times reaches the
      * limits, rethrow the IOException.
      */
-    protected static <T> T retryReadingFiles(ReaderWithIOException<T> reader, 
T defaultValue)
+    protected static <T> T retryReadingFiles(SupplierWithIOException<T> 
reader, T defaultValue)
             throws IOException {
         int retryNumber = 0;
         IOException caught = null;
         while (retryNumber++ < READ_FILE_RETRY_NUM) {
             try {
-                return reader.read();
+                return reader.get();
             } catch (FileNotFoundException e) {
                 return defaultValue;
             } catch (IOException e) {
@@ -349,13 +350,6 @@ public abstract class OrphanFilesClean implements 
Serializable {
         return status.getModificationTime() < olderThanMillis;
     }
 
-    /** A helper functional interface for method {@link #retryReadingFiles}. */
-    @FunctionalInterface
-    protected interface ReaderWithIOException<T> {
-
-        T read() throws IOException;
-    }
-
     public static SerializableConsumer<Path> createFileCleaner(
             Catalog catalog, @Nullable Boolean dryRun) {
         SerializableConsumer<Path> fileCleaner;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7e008698c4..935469a819 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -541,7 +541,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     private Optional<TableSchema> travelToTag(String tagName, Options options) 
{
-        return travelToSnapshot(tagManager().taggedSnapshot(tagName), options);
+        return 
travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options);
     }
 
     private Optional<TableSchema> travelToSnapshot(long snapshotId, Options 
options) {
@@ -633,7 +633,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     private void createTag(String tagName, Snapshot fromSnapshot, @Nullable 
Duration timeRetained) {
-        tagManager().createTag(fromSnapshot, tagName, timeRetained, 
store().createTagCallbacks());
+        tagManager()
+                .createTag(
+                        fromSnapshot, tagName, timeRetained, 
store().createTagCallbacks(), false);
     }
 
     @Override
@@ -689,7 +691,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         TagManager tagManager = tagManager();
         checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' 
doesn't exist.", tagName);
 
-        Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
+        Snapshot taggedSnapshot = 
tagManager.getOrThrow(tagName).trimToSnapshot();
         rollbackHelper().cleanLargerThan(taggedSnapshot);
 
         try {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 2cdf5bff9d..e08ac9f44c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -25,36 +25,27 @@ import org.apache.paimon.utils.TagManager;
 /** {@link StartingScanner} for incremental changes by tag. */
 public class IncrementalTagStartingScanner extends AbstractStartingScanner {
 
-    private final String start;
-    private final String end;
+    private final Snapshot start;
+    private final Snapshot end;
 
     public IncrementalTagStartingScanner(
-            SnapshotManager snapshotManager, String start, String end) {
+            SnapshotManager snapshotManager, String startTagName, String 
endTagName) {
         super(snapshotManager);
-        this.start = start;
-        this.end = end;
         TagManager tagManager =
                 new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
-        if (startingSnapshot != null) {
-            this.startingSnapshotId = startingSnapshot.id();
-        }
-    }
-
-    @Override
-    public Result scan(SnapshotReader reader) {
-        TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        Snapshot tag1 = tagManager.taggedSnapshot(start);
-        Snapshot tag2 = tagManager.taggedSnapshot(end);
-
-        if (tag2.id() <= tag1.id()) {
+        start = tagManager.getOrThrow(startTagName).trimToSnapshot();
+        end = tagManager.getOrThrow(endTagName).trimToSnapshot();
+        if (end.id() <= start.id()) {
             throw new IllegalArgumentException(
                     String.format(
                             "Tag end %s with snapshot id %s should be larger 
than tag start %s with snapshot id %s",
-                            end, tag2.id(), start, tag1.id()));
+                            endTagName, end.id(), startTagName, start.id()));
         }
+        this.startingSnapshotId = start.id();
+    }
 
-        return 
StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
+    @Override
+    public Result scan(SnapshotReader reader) {
+        return 
StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index 4fa070299f..b22e17e9a0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -43,7 +43,7 @@ public class StaticFromTagStartingScanner extends 
ReadPlanStartingScanner {
     public SnapshotReader configure(SnapshotReader snapshotReader) {
         TagManager tagManager =
                 new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+        Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
         return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 4c8b41aa42..5b4ee4e58c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -122,6 +122,6 @@ public class TimeTravelUtil {
         String tagName = options.scanTagName();
         TagManager tagManager =
                 new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        return tagManager.taggedSnapshot(tagName);
+        return tagManager.getOrThrow(tagName).trimToSnapshot();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 9aafdb5983..8f28be8af2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -59,6 +59,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -227,27 +228,25 @@ public class TagsTable implements ReadonlyTable {
                         && ((LeafPredicate) predicate).literals().get(0) 
instanceof BinaryString
                         && 
predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) {
                     String equalValue = ((LeafPredicate) 
predicate).literals().get(0).toString();
-                    if (tagManager.tagExists(equalValue)) {
-                        predicateMap.put(equalValue, 
tagManager.tag(equalValue));
-                    }
+                    tagManager.get(equalValue).ifPresent(tag -> 
predicateMap.put(equalValue, tag));
                 }
 
                 if (predicate instanceof CompoundPredicate) {
                     CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
                     // optimize for IN filter
                     if ((compoundPredicate.function()) instanceof Or) {
+                        List<String> tagNames = new ArrayList<>();
                         InPredicateVisitor.extractInElements(predicate, 
TAG_NAME)
                                 .ifPresent(
-                                        leafs ->
-                                                leafs.forEach(
-                                                        leaf -> {
-                                                            String leftName = 
leaf.toString();
-                                                            if 
(tagManager.tagExists(leftName)) {
-                                                                
predicateMap.put(
-                                                                        
leftName,
-                                                                        
tagManager.tag(leftName));
-                                                            }
-                                                        }));
+                                        e ->
+                                                e.stream()
+                                                        .map(Object::toString)
+                                                        
.forEach(tagNames::add));
+                        tagNames.forEach(
+                                name ->
+                                        tagManager
+                                                .get(name)
+                                                .ifPresent(value -> 
predicateMap.put(name, value)));
                     }
                 }
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 3989786bd2..5232e89a8b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -167,9 +167,8 @@ public class TagAutoCreation {
             }
             String tagName = periodHandler.timeToTag(thisTag);
             LOG.info("The tag name is {}.", tagName);
-            if (!tagManager.tagExists(tagName)) {
-                tagManager.createTag(snapshot, tagName, defaultTimeRetained, 
callbacks);
-            }
+            // shouldn't throw exception when tag exists
+            tagManager.createTag(snapshot, tagName, defaultTimeRetained, 
callbacks, true);
             nextTag = periodHandler.nextTagTime(thisTag);
             LOG.info("The next tag time after this is {}.", nextTag);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 2ea5f542f4..7dfb30e698 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -107,9 +107,7 @@ public class BranchManager {
 
     public void createBranch(String branchName, String tagName) {
         validateBranch(branchName);
-        checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not 
exists.", tagName);
-
-        Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+        Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
 
         try {
             // Copy the corresponding tag, snapshot and schema files into the 
branch directory
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 4019395d8d..4713703bbd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -97,31 +98,34 @@ public class TagManager {
 
     /** Create a tag from given snapshot and save it in the storage. */
     public void createTag(
-            Snapshot snapshot, String tagName, Duration timeRetained, 
List<TagCallback> callbacks) {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' 
is blank.", tagName);
-        checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", 
tagName);
+            Snapshot snapshot,
+            String tagName,
+            Duration timeRetained,
+            List<TagCallback> callbacks,
+            boolean ignoreIfExists) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name 
shouldn't be blank.");
+        if (tagExists(tagName)) {
+            checkArgument(ignoreIfExists, "Tag '%s' already exists.", tagName);
+            return;
+        }
         createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
     }
 
     /** Replace a tag from given snapshot and save it in the storage. */
     public void replaceTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' 
is blank.", tagName);
-        checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", 
tagName);
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name 
shouldn't be blank.");
+        checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
         createOrReplaceTag(snapshot, tagName, timeRetained, null);
     }
 
-    public void createOrReplaceTag(
+    private void createOrReplaceTag(
             Snapshot snapshot,
             String tagName,
             @Nullable Duration timeRetained,
             @Nullable List<TagCallback> callbacks) {
-        // When timeRetained is not defined, please do not write the 
tagCreatorTime field,
-        // as this will cause older versions (<= 0.7) of readers to be unable 
to read this
-        // tag.
-        // When timeRetained is defined, it is fine, because timeRetained is 
the new
-        // feature.
+        // When timeRetained is not defined, please do not write the 
tagCreatorTime field, as this
+        // will cause older versions (<= 0.7) of readers to be unable to read 
this tag.
+        // When timeRetained is defined, it is fine, because timeRetained is 
the new feature.
         String content =
                 timeRetained != null
                         ? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, 
LocalDateTime.now())
@@ -152,17 +156,17 @@ public class TagManager {
     }
 
     public void renameTag(String tagName, String targetTagName) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(tagName),
+                "Original tag name shouldn't be blank.");
+        checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(targetTagName),
+                "New tag name shouldn't be blank.");
+        checkArgument(!tagExists(targetTagName), "Tag '%s' already exists.", 
tagName);
+
         try {
-            if (!tagExists(tagName)) {
-                throw new RuntimeException(
-                        String.format("The specified tag name [%s] does not 
exist.", tagName));
-            }
-            if (tagExists(targetTagName)) {
-                throw new RuntimeException(
-                        String.format(
-                                "The specified target tag name [%s] existed, 
please set a  non-existent tag name.",
-                                targetTagName));
-            }
             fileIO.rename(tagPath(tagName), tagPath(targetTagName));
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -172,7 +176,7 @@ public class TagManager {
     /** Make sure the tagNames are ALL tags of one snapshot. */
     public void deleteAllTagsOfOneSnapshot(
             List<String> tagNames, TagDeletion tagDeletion, SnapshotManager 
snapshotManager) {
-        Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0));
+        Snapshot taggedSnapshot = getOrThrow(tagNames.get(0)).trimToSnapshot();
         List<Snapshot> taggedSnapshots;
 
         // skip file deletion if snapshot exists
@@ -188,19 +192,20 @@ public class TagManager {
         doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
     }
 
+    /** Ignore errors if the tag doesn't exist. */
     public void deleteTag(
             String tagName,
             TagDeletion tagDeletion,
             SnapshotManager snapshotManager,
             List<TagCallback> callbacks) {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' 
is blank.", tagName);
-        if (!tagExists(tagName)) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name 
shouldn't be blank.");
+        Optional<Tag> tag = get(tagName);
+        if (!tag.isPresent()) {
             LOG.warn("Tag '{}' doesn't exist.", tagName);
             return;
         }
 
-        Snapshot taggedSnapshot = taggedSnapshot(tagName);
+        Snapshot taggedSnapshot = tag.get().trimToSnapshot();
         List<Snapshot> taggedSnapshots;
 
         // skip file deletion if snapshot exists
@@ -303,10 +308,21 @@ public class TagManager {
         }
     }
 
-    /** Get the tagged snapshot by name. */
-    public Snapshot taggedSnapshot(String tagName) {
-        checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
-        return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot();
+    /** Return the tag or Optional.empty() if the tag file not found. */
+    public Optional<Tag> get(String tagName) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name 
shouldn't be blank.");
+        try {
+            return Optional.of(Tag.tryFromPath(fileIO, tagPath(tagName)));
+        } catch (FileNotFoundException e) {
+            return Optional.empty();
+        }
+    }
+
+    /** Return the tag or throw exception indicating the tag not found. */
+    public Tag getOrThrow(String tagName) {
+        return get(tagName)
+                .orElseThrow(
+                        () -> new IllegalArgumentException("Tag '" + tagName + 
"' doesn't exist."));
     }
 
     public long tagCount() {
@@ -410,12 +426,7 @@ public class TagManager {
         }
         throw new RuntimeException(
                 String.format(
-                        "Didn't find tag with snapshot id '%s'.This is 
unexpected.",
+                        "Didn't find tag with snapshot id '%s'. This is 
unexpected.",
                         taggedSnapshot.id()));
     }
-
-    /** Read tag for tagName. */
-    public Tag tag(String tagName) {
-        return Tag.fromPath(fileIO, tagPath(tagName));
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index abff820b2c..5811fc3721 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -151,7 +151,8 @@ public class ExpireSnapshotsTest {
                     snapshot,
                     "tag" + id,
                     store.options().tagDefaultTimeRetained(),
-                    Collections.emptyList());
+                    Collections.emptyList(),
+                    false);
         }
 
         // randomly expire snapshots
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 3a5ee93daa..45b53ddba4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -313,7 +313,7 @@ public class FileDeletionTest {
         // check manifests
         ManifestList manifestList = store.manifestListFactory().create();
         for (String tagName : Arrays.asList("tag1", "tag2")) {
-            Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+            Snapshot snapshot = tagManager.getOrThrow(tagName);
             List<Path> manifestFilePaths =
                     manifestList.readDataManifests(snapshot).stream()
                             .map(ManifestFileMeta::fileName)
@@ -367,7 +367,7 @@ public class FileDeletionTest {
         FileStorePathFactory pathFactory = store.pathFactory();
         assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
 
-        Snapshot tag1 = tagManager.taggedSnapshot("tag1");
+        Snapshot tag1 = tagManager.getOrThrow("tag1");
         ManifestList manifestList = store.manifestListFactory().create();
         List<Path> manifestFilePaths =
                 manifestList.readDataManifests(tag1).stream()
@@ -519,8 +519,8 @@ public class FileDeletionTest {
         assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 1));
 
         // check manifests
-        Snapshot tag1 = tagManager.taggedSnapshot("tag1");
-        Snapshot tag3 = tagManager.taggedSnapshot("tag3");
+        Snapshot tag1 = tagManager.getOrThrow("tag1");
+        Snapshot tag3 = tagManager.getOrThrow("tag3");
         List<ManifestFileMeta> existing = manifestList.readDataManifests(tag1);
         existing.addAll(manifestList.readDataManifests(tag3));
         for (ManifestFileMeta manifestFileMeta : snapshot2Data) {
@@ -805,6 +805,6 @@ public class FileDeletionTest {
     }
 
     private void createTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {
-        tagManager.createTag(snapshot, tagName, timeRetained, 
Collections.emptyList());
+        tagManager.createTag(snapshot, tagName, timeRetained, 
Collections.emptyList(), false);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 75e284a68c..ecb42d7669 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1080,7 +1080,7 @@ public abstract class FileStoreTableTestBase {
         assertThat(tagManager.tagExists("test-tag")).isTrue();
 
         // verify that test-tag is equal to snapshot 2
-        Snapshot tagged = tagManager.taggedSnapshot("test-tag");
+        Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot();
         Snapshot snapshot2 = table.snapshotManager().snapshot(2);
         assertThat(tagged.equals(snapshot2)).isTrue();
     }
@@ -1103,7 +1103,7 @@ public abstract class FileStoreTableTestBase {
             TagManager tagManager = new TagManager(new TraceableFileIO(), 
tablePath);
             assertThat(tagManager.tagExists("test-tag")).isTrue();
             // verify that test-tag is equal to snapshot 1
-            Snapshot tagged = tagManager.taggedSnapshot("test-tag");
+            Snapshot tagged = 
tagManager.getOrThrow("test-tag").trimToSnapshot();
             Snapshot snapshot1 = table.snapshotManager().snapshot(1);
             assertThat(tagged.equals(snapshot1)).isTrue();
             // snapshot 2
@@ -1116,7 +1116,7 @@ public abstract class FileStoreTableTestBase {
             // verify that tag file exist
             assertThat(tagManager.tagExists("test-tag-2")).isTrue();
             // verify that test-tag is equal to snapshot 1
-            Snapshot tag2 = tagManager.taggedSnapshot("test-tag-2");
+            Snapshot tag2 = 
tagManager.getOrThrow("test-tag-2").trimToSnapshot();
             assertThat(tag2.equals(snapshot1)).isTrue();
         }
     }
@@ -1138,9 +1138,9 @@ public abstract class FileStoreTableTestBase {
             assertThat(tagManager.tagExists("test-tag")).isTrue();
             // Create again failed if tag existed
             Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 1))
-                    .hasMessageContaining("Tag name 'test-tag' already 
exists.");
+                    .hasMessageContaining("Tag 'test-tag' already exists.");
             Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2))
-                    .hasMessageContaining("Tag name 'test-tag' already 
exists.");
+                    .hasMessageContaining("Tag 'test-tag' already exists.");
         }
     }
 
@@ -1165,7 +1165,7 @@ public abstract class FileStoreTableTestBase {
         assertThat(tagManager.tagExists("test-tag")).isTrue();
 
         // verify that test-tag is equal to snapshot 2
-        Snapshot tagged = tagManager.taggedSnapshot("test-tag");
+        Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot();
         Snapshot snapshot2 = table.snapshotManager().snapshot(2);
         assertThat(tagged.equals(snapshot2)).isTrue();
 
@@ -1220,7 +1220,7 @@ public abstract class FileStoreTableTestBase {
         assertThatThrownBy(() -> table.createBranch("branch-1", "tag1"))
                 .satisfies(
                         anyCauseMatches(
-                                IllegalArgumentException.class, "Tag name 
'tag1' not exists."));
+                                IllegalArgumentException.class, "Tag 'tag1' 
doesn't exist."));
 
         assertThatThrownBy(() -> table.createBranch("branch0", "test-tag"))
                 .satisfies(
@@ -1409,8 +1409,7 @@ public abstract class FileStoreTableTestBase {
         assertThatThrownBy(() -> table.createTag("", 1))
                 .satisfies(
                         anyCauseMatches(
-                                IllegalArgumentException.class,
-                                String.format("Tag name '%s' is blank", "")));
+                                IllegalArgumentException.class, "Tag name 
shouldn't be blank"));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 4ad634a433..3040e11175 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -107,8 +107,8 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
         assertThat(indexManifestSize()).isEqualTo(3);
 
         TagManager tagManager = new TagManager(LocalFileIO.create(), 
table.location());
-        checkIndexFiles(tagManager.taggedSnapshot("tag3"));
-        checkIndexFiles(tagManager.taggedSnapshot("tag5"));
+        checkIndexFiles(tagManager.getOrThrow("tag3"));
+        checkIndexFiles(tagManager.getOrThrow("tag5"));
     }
 
     @Test
@@ -135,7 +135,7 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
 
         TagManager tagManager = new TagManager(LocalFileIO.create(), 
table.location());
         checkIndexFiles(7);
-        checkIndexFiles(tagManager.taggedSnapshot("tag5"));
+        checkIndexFiles(tagManager.getOrThrow("tag5"));
         assertThat(indexFileSize()).isEqualTo(4);
         assertThat(indexManifestSize()).isEqualTo(2);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 1bebbe5fb5..4dfa802f81 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -392,7 +392,8 @@ public class TagAutoManagerTest extends 
PrimaryKeyTableTestBase {
                 snapshot1,
                 "non-auto-create-tag-shoule-expire",
                 Duration.ofMillis(500),
-                Collections.emptyList());
+                Collections.emptyList(),
+                false);
 
         Snapshot snapshot2 =
                 new Snapshot(
@@ -416,7 +417,8 @@ public class TagAutoManagerTest extends 
PrimaryKeyTableTestBase {
                 snapshot2,
                 "non-auto-create-tag-shoule-not-expire",
                 Duration.ofDays(1),
-                Collections.emptyList());
+                Collections.emptyList(),
+                false);
 
         // test expire old tag by time-retained
         Thread.sleep(1000);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
index 3e702b9b2c..abe9ee0b8a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java
@@ -91,9 +91,10 @@ public class TagManagerTest {
                 snapshotManager.snapshot(1),
                 "tag",
                 store.options().tagDefaultTimeRetained(),
-                Collections.emptyList());
+                Collections.emptyList(),
+                false);
         assertThat(tagManager.tagExists("tag")).isTrue();
-        Snapshot snapshot = tagManager.taggedSnapshot("tag");
+        Snapshot snapshot = tagManager.getOrThrow("tag").trimToSnapshot();
         String snapshotJson = snapshot.toJson();
         Assertions.assertTrue(
                 !snapshotJson.contains("tagCreateTime")
@@ -119,7 +120,11 @@ public class TagManagerTest {
         commitData(store, commitIdentifier++, writers);
 
         tagManager.createTag(
-                snapshotManager.snapshot(1), "tag", Duration.ofDays(1), 
Collections.emptyList());
+                snapshotManager.snapshot(1),
+                "tag",
+                Duration.ofDays(1),
+                Collections.emptyList(),
+                false);
         assertThat(tagManager.tagExists("tag")).isTrue();
         List<Pair<Tag, String>> tags = tagManager.tagObjects();
         Assertions.assertEquals(1, tags.size());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java
index eb616a9ab2..ef811bb712 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java
@@ -37,7 +37,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -62,15 +61,7 @@ public class TraceableFileIO implements FileIO {
 
     @Override
     public PositionOutputStream newOutputStream(Path f, boolean overwrite) 
throws IOException {
-        return createOutputStream(
-                f,
-                () -> {
-                    try {
-                        return originalFs.newOutputStream(f, overwrite);
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                });
+        return createOutputStream(f, () -> originalFs.newOutputStream(f, 
overwrite));
     }
 
     @Override
@@ -85,30 +76,22 @@ public class TraceableFileIO implements FileIO {
 
     @Override
     public SeekableInputStream newInputStream(Path f) throws IOException {
-        return createInputStream(
-                f,
-                () -> {
-                    try {
-                        return originalFs.newInputStream(f);
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                });
+        return createInputStream(f, () -> originalFs.newInputStream(f));
     }
 
     private PositionOutputStream createOutputStream(
-            Path f, Supplier<PositionOutputStream> streamOpener) throws 
IOException {
+            Path f, SupplierWithIOException<PositionOutputStream> 
streamOpener) throws IOException {
 
-        final Supplier<OutStream> wrappedStreamOpener =
+        final SupplierWithIOException<OutStream> wrappedStreamOpener =
                 () -> new OutStream(ThreadUtils.currentStackString(), f, 
streamOpener.get(), this);
 
         return createStream(wrappedStreamOpener, OPEN_OUTPUT_STREAMS);
     }
 
     private SeekableInputStream createInputStream(
-            Path f, Supplier<SeekableInputStream> streamOpener) throws 
IOException {
+            Path f, SupplierWithIOException<SeekableInputStream> streamOpener) 
throws IOException {
 
-        final Supplier<InStream> wrappedStreamOpener =
+        final SupplierWithIOException<InStream> wrappedStreamOpener =
                 () -> new InStream(ThreadUtils.currentStackString(), f, 
streamOpener.get(), this);
 
         return createStream(wrappedStreamOpener, OPEN_INPUT_STREAMS);
@@ -144,7 +127,8 @@ public class TraceableFileIO implements FileIO {
         return originalFs.exists(f);
     }
 
-    private <T> T createStream(final Supplier<T> streamOpener, final 
HashSet<T> openStreams)
+    private <T> T createStream(
+            final SupplierWithIOException<T> streamOpener, final HashSet<T> 
openStreams)
             throws IOException {
         // open the stream outside the lock.
         final T out = streamOpener.get();
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index f79d6fb716..e4d1738667 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -540,7 +540,7 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
         
assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2L);
 
         Assertions.assertThatThrownBy(() -> sql("CALL 
sys.replace_tag('default.T', 'test_tag')"))
-                .hasMessageContaining("Tag name 'test_tag' does not exist.");
+                .hasMessageContaining("Tag 'test_tag' doesn't exist.");
 
         sql("CALL sys.create_tag('default.T', 'test_tag')");
         assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
index 9de974d047..c36a6cd186 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -31,6 +31,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.SupplierWithIOException;
 
 import javax.annotation.Nullable;
 
@@ -176,12 +177,12 @@ public class PickFilesUtil {
     }
 
     @Nullable
-    private static <T> T retryReadingFiles(ReaderWithIOException<T> reader) 
throws IOException {
+    private static <T> T retryReadingFiles(SupplierWithIOException<T> reader) 
throws IOException {
         int retryNumber = 0;
         IOException caught = null;
         while (retryNumber++ < READ_FILE_RETRY_NUM) {
             try {
-                return reader.read();
+                return reader.get();
             } catch (FileNotFoundException e) {
                 return null;
             } catch (IOException e) {
@@ -197,10 +198,4 @@ public class PickFilesUtil {
 
         throw caught;
     }
-
-    /** A helper functional interface for method {@link #retryReadingFiles}. */
-    @FunctionalInterface
-    interface ReaderWithIOException<T> {
-        T read() throws IOException;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index 0822f04612..66d9781207 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -164,9 +164,8 @@ public class AutoTagForSavepointCommitterOperator<CommitT, 
GlobalCommitT>
                         commitOperator.getCommitUser(), identifiers);
         for (Snapshot snapshot : snapshotForTags) {
             String tagName = SAVEPOINT_TAG_PREFIX + 
snapshot.commitIdentifier();
-            if (!tagManager.tagExists(tagName)) {
-                tagManager.createTag(snapshot, tagName, tagTimeRetained, 
callbacks);
-            }
+            // shouldn't throw exception when tag exists
+            tagManager.createTag(snapshot, tagName, tagTimeRetained, 
callbacks, true);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index 1cbcc4b226..160765716e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -106,16 +106,15 @@ public class BatchWriteGeneratorTagOperator<CommitT, 
GlobalCommitT>
                         + 
localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
         try {
             // If the tag already exists, delete the tag
-            if (tagManager.tagExists(tagName)) {
-                tagManager.deleteTag(
-                        tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
-            }
+            tagManager.deleteTag(
+                    tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
             // Create a new tag
             tagManager.createTag(
                     snapshot,
                     tagName,
                     table.coreOptions().tagDefaultTimeRetained(),
-                    table.store().createTagCallbacks());
+                    table.store().createTagCallbacks(),
+                    false);
             // Expire the tag
             expireTag();
         } catch (Exception e) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java
index 1198eb47f4..ee42d2e177 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java
@@ -80,8 +80,8 @@ public class CreateTagFromWatermarkActionITTest extends 
ActionITCaseBase {
                         Long.toString(watermark2 - 1))
                 .run();
         assertThat(table.tagManager().tagExists("tag2")).isTrue();
-        
assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(watermark2);
-        
assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()).isEqualTo(commitTime2);
+        
assertThat(table.tagManager().getOrThrow("tag2").watermark()).isEqualTo(watermark2);
+        
assertThat(table.tagManager().getOrThrow("tag2").timeMillis()).isEqualTo(commitTime2);
 
         createAction(
                         CreateTagFromWatermarkAction.class,
@@ -98,8 +98,8 @@ public class CreateTagFromWatermarkActionITTest extends 
ActionITCaseBase {
                         Long.toString(watermark2 + 1))
                 .run();
         assertThat(table.tagManager().tagExists("tag3")).isTrue();
-        
assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3);
-        
assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3);
+        
assertThat(table.tagManager().getOrThrow("tag3").watermark()).isEqualTo(watermark3);
+        
assertThat(table.tagManager().getOrThrow("tag3").timeMillis()).isEqualTo(commitTime3);
     }
 
     @Test
@@ -131,7 +131,7 @@ public class CreateTagFromWatermarkActionITTest extends 
ActionITCaseBase {
 
         assertThat(table.snapshotManager().snapshotExists(1)).isFalse();
 
-        Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1");
+        Snapshot tagSnapshot1 = table.tagManager().getOrThrow("tag1");
 
         long tagsCommitTime = tagSnapshot1.timeMillis();
         long tagsWatermark = tagSnapshot1.watermark();
@@ -158,9 +158,8 @@ public class CreateTagFromWatermarkActionITTest extends 
ActionITCaseBase {
                         Long.toString(tagsWatermark - 1))
                 .run();
         assertThat(table.tagManager().tagExists("tag2")).isTrue();
-        
assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark);
-        assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis())
-                .isEqualTo(tagsCommitTime);
+        
assertThat(table.tagManager().getOrThrow("tag2").watermark()).isEqualTo(tagsWatermark);
+        
assertThat(table.tagManager().getOrThrow("tag2").timeMillis()).isEqualTo(tagsCommitTime);
 
         createAction(
                         CreateTagFromWatermarkAction.class,
@@ -177,7 +176,7 @@ public class CreateTagFromWatermarkActionITTest extends 
ActionITCaseBase {
                         Long.toString(watermark2 - 1))
                 .run();
         assertThat(table.tagManager().tagExists("tag3")).isTrue();
-        
assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2);
-        
assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2);
+        
assertThat(table.tagManager().getOrThrow("tag3").watermark()).isEqualTo(watermark2);
+        
assertThat(table.tagManager().getOrThrow("tag3").timeMillis()).isEqualTo(commitTime2);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
index 7e204ca884..23f2f0261d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
@@ -92,7 +92,7 @@ public class ExpireTagsActionTest extends ActionITCaseBase {
         assertThat(table.tagManager().tagExists("tag-5")).isFalse();
 
         // tag-3 as the base older_than time
-        LocalDateTime olderThanTime = 
table.tagManager().tag("tag-3").getTagCreateTime();
+        LocalDateTime olderThanTime = 
table.tagManager().getOrThrow("tag-3").getTagCreateTime();
         java.sql.Timestamp timestamp =
                 new 
java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond());
         createAction(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
index 00b43b9e11..8b14afdd7d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
@@ -55,12 +55,12 @@ public class ReplaceTagActionTest extends ActionITCaseBase {
                         () ->
                                 bEnv.executeSql(
                                         "CALL sys.replace_tag(`table` => 
'default.T', tag => 'test_tag')"))
-                .hasMessageContaining("Tag name 'test_tag' does not exist.");
+                .hasMessageContaining("Tag 'test_tag' doesn't exist.");
 
         bEnv.executeSql("CALL sys.create_tag(`table` => 'default.T', tag => 
'test_tag')");
         assertThat(tagManager.tagExists("test_tag")).isEqualTo(true);
-        
assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(2);
-        
assertThat(tagManager.tag("test_tag").getTagTimeRetained()).isEqualTo(null);
+        assertThat(tagManager.getOrThrow("test_tag").id()).isEqualTo(2);
+        
assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained()).isEqualTo(null);
 
         // replace tag with new time_retained
         createAction(
@@ -77,7 +77,7 @@ public class ReplaceTagActionTest extends ActionITCaseBase {
                         "--time_retained",
                         "1 d")
                 .run();
-        
assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(24);
+        
assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained().toHours()).isEqualTo(24);
 
         // replace tag with new snapshot and time_retained
         createAction(
@@ -96,7 +96,7 @@ public class ReplaceTagActionTest extends ActionITCaseBase {
                         "--time_retained",
                         "2 d")
                 .run();
-        
assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(1);
-        
assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(48);
+        assertThat(tagManager.getOrThrow("test_tag").id()).isEqualTo(1);
+        
assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained().toHours()).isEqualTo(48);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java
index 9e7aeb5ded..c2d2bcc10f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java
@@ -133,7 +133,7 @@ public class CreateTagFromTimestampProcedureITCase extends 
CatalogITCaseBase {
 
         FileStoreTable table = paimonTable("T");
         long earliestCommitTime = 
table.snapshotManager().earliestSnapshot().timeMillis();
-        long tagSnapshotCommitTime = 
table.tagManager().taggedSnapshot("tag1").timeMillis();
+        long tagSnapshotCommitTime = 
table.tagManager().getOrThrow("tag1").timeMillis();
 
         assertThat(tagSnapshotCommitTime < earliestCommitTime).isTrue();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java
index 8e659e75c8..9255aa9563 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java
@@ -147,7 +147,7 @@ public class CreateTagFromWatermarkProcedureITCase extends 
CatalogITCaseBase {
 
         assertThat(table.snapshotManager().snapshotExists(1)).isFalse();
 
-        Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1");
+        Snapshot tagSnapshot1 = table.tagManager().getOrThrow("tag1");
 
         long tagsCommitTime = tagSnapshot1.timeMillis();
         long tagsWatermark = tagSnapshot1.watermark();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
index 4a89531b22..90e5bc6702 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
@@ -102,7 +102,7 @@ public class ExpireTagsProcedureITCase extends 
CatalogITCaseBase {
 
         // tag-2 as the base older_than time.
         // tag-1 expired by its file creation time.
-        LocalDateTime olderThanTime1 = 
table.tagManager().tag("tag-2").getTagCreateTime();
+        LocalDateTime olderThanTime1 = 
table.tagManager().getOrThrow("tag-2").getTagCreateTime();
         java.sql.Timestamp timestamp1 =
                 new java.sql.Timestamp(
                         
Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond());
@@ -119,7 +119,7 @@ public class ExpireTagsProcedureITCase extends 
CatalogITCaseBase {
 
         // tag-4 as the base older_than time.
         // tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained.
-        LocalDateTime olderThanTime2 = 
table.tagManager().tag("tag-4").getTagCreateTime();
+        LocalDateTime olderThanTime2 = 
table.tagManager().getOrThrow("tag-4").getTagCreateTime();
         java.sql.Timestamp timestamp2 =
                 new java.sql.Timestamp(
                         
Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
index 8a4eb791a6..4ee4287b7f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
@@ -44,7 +44,7 @@ public class ReplaceTagProcedureITCase extends 
CatalogITCaseBase {
                         () ->
                                 sql(
                                         "CALL sys.replace_tag(`table` => 
'default.T', tag => 'test_tag')"))
-                .hasMessageContaining("Tag name 'test_tag' does not exist.");
+                .hasMessageContaining("Tag 'test_tag' doesn't exist.");
 
         sql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')");
         assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index 68162832ea..10292b3a3f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -97,7 +97,8 @@ public class BatchWriteGeneratorTagOperatorTest extends 
CommitterOperatorTest {
         // Get tagName from tagManager.
         String tagName = tagManager.allTagNames().get(0);
         // The tag is consistent with the latest snapshot
-        
assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot());
+        assertThat(tagManager.getOrThrow(tagName).trimToSnapshot())
+                .isEqualTo(snapshotManager.latestSnapshot());
 
         // test tag expiration
         table.createTag("many-tags-test1");
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
index 1da05843dc..301b769288 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
@@ -143,7 +143,7 @@ class CreateTagFromTimestampProcedureTest extends 
PaimonSparkTestBase with Strea
 
             val table = loadTable("T")
             val latestCommitTime = 
table.snapshotManager.latestSnapshot().timeMillis
-            val tagsCommitTime = 
table.tagManager().taggedSnapshot("test_tag").timeMillis
+            val tagsCommitTime = 
table.tagManager().getOrThrow("test_tag").timeMillis
             assert(latestCommitTime > tagsCommitTime)
 
             // make snapshot 1 expire.
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
index 65c0f2b9a2..1ac9709c87 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
@@ -97,7 +97,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase {
 
     // tag-2 as the base older_than time.
     // tag-1 expired by its file creation time.
-    val olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime
+    val olderThanTime1 = 
table.tagManager().getOrThrow("tag-2").getTagCreateTime
     val timestamp1 =
       new 
java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond)
     checkAnswer(
@@ -112,7 +112,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase {
 
     // tag-4 as the base older_than time.
     // tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained.
-    val olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime
+    val olderThanTime2 = 
table.tagManager().getOrThrow("tag-4").getTagCreateTime
     val timestamp2 =
       new 
java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond)
     checkAnswer(

Reply via email to