This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e2acdc217 [spark] Replace create existing tag semantic with 
replace_tag (#4346)
e2acdc217 is described below

commit e2acdc2172f230f22d899c5c16873a1bf5da9041
Author: askwang <135721692+askw...@users.noreply.github.com>
AuthorDate: Thu Oct 31 10:38:07 2024 +0800

    [spark] Replace create existing tag semantic with replace_tag (#4346)
---
 docs/content/flink/procedures.md                   |  26 ++++++
 docs/content/spark/procedures.md                   |  13 +++
 .../paimon/table/AbstractFileStoreTable.java       |  13 +++
 .../paimon/table/DelegatedFileStoreTable.java      |   5 +
 .../java/org/apache/paimon/table/FormatTable.java  |   5 +
 .../org/apache/paimon/table/ReadonlyTable.java     |   8 ++
 .../main/java/org/apache/paimon/table/Table.java   |   4 +
 .../java/org/apache/paimon/utils/TagManager.java   |  43 +++++----
 .../paimon/table/FileStoreTableTestBase.java       |   5 +-
 ...e.java => CreateOrReplaceTagBaseProcedure.java} |  34 ++-----
 .../paimon/flink/procedure/CreateTagProcedure.java |  59 +-----------
 .../flink/procedure/ReplaceTagProcedure.java       |  39 ++++++++
 .../ProcedurePositionalArgumentsITCase.java        |  30 ++++++
 ...y.java => CreateOrReplaceTagActionFactory.java} |  41 +++------
 .../flink/action/CreateTagActionFactory.java       |  50 +++-------
 .../paimon/flink/action/ReplaceTagAction.java      |  51 +++++++++++
 .../flink/action/ReplaceTagActionFactory.java      |  64 +++++++++++++
 ...e.java => CreateOrReplaceTagBaseProcedure.java} |  26 ++----
 .../paimon/flink/procedure/CreateTagProcedure.java |  49 +---------
 .../flink/procedure/ReplaceTagProcedure.java       |  39 ++++++++
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../paimon/flink/action/ReplaceTagActionTest.java  | 102 +++++++++++++++++++++
 .../paimon/flink/procedure/ProcedureTest.java      |   3 +-
 .../flink/procedure/ReplaceTagProcedureITCase.java |  67 ++++++++++++++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 ...e.java => CreateOrReplaceTagBaseProcedure.java} |  28 ++----
 .../paimon/spark/procedure/CreateTagProcedure.java |  63 ++-----------
 .../spark/procedure/ReplaceTagProcedure.java       |  52 +++++++++++
 .../CreateAndDeleteTagProcedureTest.scala          |  16 ++--
 .../spark/procedure/ReplaceTagProcedureTest.scala  |  59 ++++++++++++
 30 files changed, 685 insertions(+), 313 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 52460e27e..e489f48dd 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -222,6 +222,32 @@ All available procedures are listed below.
          CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
       </td>
    </tr>
+   <tr>
+      <td>replace_tag</td>
+      <td>
+         -- Use named argument<br/>
+         -- replace tag with new time retained <br/>
+         CALL [catalog.]sys.replace_tag(`table` => 'identifier', tag => 
'tagName', time_retained => 'timeRetained') <br/>
+         -- replace tag with new snapshot id and time retained <br/>
+         CALL [catalog.]sys.replace_tag(`table` => 'identifier', snapshot_id 
=> 'snapshotId') <br/><br/>
+         -- Use indexed argument<br/>
+         -- replace tag with new snapshot id and time retained <br/>
+         CALL [catalog.]sys.replace_tag('identifier', 'tagName', 'snapshotId', 
'timeRetained') <br/>
+      </td>
+      <td>
+         To replace an existing tag with new tag info. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>tag: name of the existed tag. Cannot be empty.</li>
+            <li>snapshot(Long):  id of the snapshot which the tag is based on, 
it is optional.</li>
+            <li>time_retained: The maximum time retained for the existing tag, 
it is optional.</li>
+      </td>
+      <td>
+         -- for Flink 1.18<br/>
+         CALL sys.replace_tag('default.T', 'my_tag', 5, '1 d')<br/><br/>
+         -- for Flink 1.19 and later<br/>
+         CALL sys.replace_tag(`table` => 'default.T', tag => 'my_tag', 
snapshot_id => 5, time_retained => '1 d')<br/><br/>
+      </td>
+   </tr>
    <tr>
       <td>expire_tags</td>
       <td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 8c5226921..5a660cb1f 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -121,6 +121,19 @@ This section introduce all available spark procedures 
about paimon.
          CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', 
target_tag_name => 'tag2')
       </td>
     </tr>
+    <tr>
+      <td>replace_tag</td>
+      <td>
+         Replace an existing tag with new tag info. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>tag: name of the existed tag. Cannot be empty.</li>
+            <li>snapshot(Long):  id of the snapshot which the tag is based on, 
it is optional.</li>
+            <li>time_retained: The maximum time retained for the existing tag, 
it is optional.</li>
+      </td>
+      <td>
+         CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', 
snapshot => 10, time_retained => '1 d')
+      </td>
+    </tr>
     <tr>
       <td>delete_tag</td>
       <td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b33cf6922..af0c3d71e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -576,6 +576,19 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         tagManager().renameTag(tagName, targetTagName);
     }
 
+    @Override
+    public void replaceTag(
+            String tagName, @Nullable Long fromSnapshotId, @Nullable Duration 
timeRetained) {
+        if (fromSnapshotId == null) {
+            Snapshot latestSnapshot = snapshotManager().latestSnapshot();
+            SnapshotNotExistException.checkNotNull(
+                    latestSnapshot, "Cannot replace tag because latest 
snapshot doesn't exist.");
+            tagManager().replaceTag(latestSnapshot, tagName, timeRetained);
+        } else {
+            tagManager().replaceTag(findSnapshot(fromSnapshotId), tagName, 
timeRetained);
+        }
+    }
+
     @Override
     public void deleteTag(String tagName) {
         tagManager()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 5d6331aa4..f6f3930ba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -187,6 +187,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         wrapped.renameTag(tagName, targetTagName);
     }
 
+    @Override
+    public void replaceTag(String tagName, Long fromSnapshotId, Duration 
timeRetained) {
+        wrapped.replaceTag(tagName, fromSnapshotId, timeRetained);
+    }
+
     @Override
     public void deleteTag(String tagName) {
         wrapped.deleteTag(tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 3224131d4..a53ba545c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -271,6 +271,11 @@ public interface FormatTable extends Table {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    default void replaceTag(String tagName, Long fromSnapshotId, Duration 
timeRetained) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     default void deleteTag(String tagName) {
         throw new UnsupportedOperationException();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 4ae593b55..fe5ebbfcd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -197,6 +197,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void replaceTag(String tagName, Long fromSnapshotId, Duration 
timeRetained) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support replaceTag.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void deleteTag(String tagName) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 11dc135a6..db6848f5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -121,6 +121,10 @@ public interface Table extends Serializable {
     @Experimental
     void renameTag(String tagName, String targetTagName);
 
+    /** Replace a tag with new snapshot id and new time retained. */
+    @Experimental
+    void replaceTag(String tagName, Long fromSnapshotId, Duration 
timeRetained);
+
     /** Delete a tag by name. */
     @Experimental
     void deleteTag(String tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index c3a674bc5..65963aafd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -97,13 +97,26 @@ public class TagManager {
 
     /** Create a tag from given snapshot and save it in the storage. */
     public void createTag(
-            Snapshot snapshot,
-            String tagName,
-            @Nullable Duration timeRetained,
-            List<TagCallback> callbacks) {
+            Snapshot snapshot, String tagName, Duration timeRetained, 
List<TagCallback> callbacks) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' 
is blank.", tagName);
+        checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", 
tagName);
+        createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
+    }
+
+    /** Replace a tag from given snapshot and save it in the storage. */
+    public void replaceTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' 
is blank.", tagName);
+        checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", 
tagName);
+        createOrReplaceTag(snapshot, tagName, timeRetained, null);
+    }
 
+    public void createOrReplaceTag(
+            Snapshot snapshot,
+            String tagName,
+            @Nullable Duration timeRetained,
+            @Nullable List<TagCallback> callbacks) {
         // When timeRetained is not defined, please do not write the 
tagCreatorTime field,
         // as this will cause older versions (<= 0.7) of readers to be unable 
to read this
         // tag.
@@ -117,15 +130,7 @@ public class TagManager {
         Path tagPath = tagPath(tagName);
 
         try {
-            if (tagExists(tagName)) {
-                Snapshot tagged = taggedSnapshot(tagName);
-                Preconditions.checkArgument(
-                        tagged.id() == snapshot.id(), "Tag name '%s' already 
exists.", tagName);
-                // update tag metadata into for the same snapshot of the same 
tag name.
-                fileIO.overwriteFileUtf8(tagPath, content);
-            } else {
-                fileIO.writeFile(tagPath, content, false);
-            }
+            fileIO.overwriteFileUtf8(tagPath, content);
         } catch (IOException e) {
             throw new RuntimeException(
                     String.format(
@@ -135,11 +140,13 @@ public class TagManager {
                     e);
         }
 
-        try {
-            callbacks.forEach(callback -> callback.notifyCreation(tagName));
-        } finally {
-            for (TagCallback tagCallback : callbacks) {
-                IOUtils.closeQuietly(tagCallback);
+        if (callbacks != null) {
+            try {
+                callbacks.forEach(callback -> 
callback.notifyCreation(tagName));
+            } finally {
+                for (TagCallback tagCallback : callbacks) {
+                    IOUtils.closeQuietly(tagCallback);
+                }
             }
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f8b15c155..f6343bfe4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1136,8 +1136,9 @@ public abstract class FileStoreTableTestBase {
             table.createTag("test-tag", 1);
             // verify that tag file exist
             assertThat(tagManager.tagExists("test-tag")).isTrue();
-            // Create again
-            table.createTag("test-tag", 1);
+            // Create again failed if tag existed
+            Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 1))
+                    .hasMessageContaining("Tag name 'test-tag' already 
exists.");
             Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2))
                     .hasMessageContaining("Tag name 'test-tag' already 
exists.");
         }
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
similarity index 83%
copy from 
paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
copy to 
paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
index 1a7b03ef6..2b7dadc05 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
@@ -29,16 +29,13 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 
-/**
- * Create tag procedure. Usage:
- *
- * <pre><code>
- *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
- * </code></pre>
- */
-public class CreateTagProcedure extends ProcedureBase {
+/** A base procedure to create or replace a tag. */
+public abstract class CreateOrReplaceTagBaseProcedure extends ProcedureBase {
 
-    public static final String IDENTIFIER = "create_tag";
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, tagName, null, null);
+    }
 
     public String[] call(
             ProcedureContext procedureContext, String tableId, String tagName, 
long snapshotId)
@@ -46,11 +43,6 @@ public class CreateTagProcedure extends ProcedureBase {
         return innerCall(tableId, tagName, snapshotId, null);
     }
 
-    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagName)
-            throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, null, null);
-    }
-
     public String[] call(
             ProcedureContext procedureContext,
             String tableId,
@@ -74,14 +66,13 @@ public class CreateTagProcedure extends ProcedureBase {
             @Nullable String timeRetained)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
-        if (snapshotId == null) {
-            table.createTag(tagName, toDuration(timeRetained));
-        } else {
-            table.createTag(tagName, snapshotId, toDuration(timeRetained));
-        }
+        createOrReplaceTag(table, tagName, snapshotId, 
toDuration(timeRetained));
         return new String[] {"Success"};
     }
 
+    abstract void createOrReplaceTag(
+            Table table, String tagName, Long snapshotId, Duration 
timeRetained);
+
     @Nullable
     private static Duration toDuration(@Nullable String s) {
         if (s == null) {
@@ -90,9 +81,4 @@ public class CreateTagProcedure extends ProcedureBase {
 
         return TimeUtils.parseDuration(s);
     }
-
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 1a7b03ef6..b1af1c939 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -18,14 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.TimeUtils;
-
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import javax.annotation.Nullable;
 
 import java.time.Duration;
 
@@ -36,59 +29,17 @@ import java.time.Duration;
  *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
  * </code></pre>
  */
-public class CreateTagProcedure extends ProcedureBase {
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
 
     public static final String IDENTIFIER = "create_tag";
 
-    public String[] call(
-            ProcedureContext procedureContext, String tableId, String tagName, 
long snapshotId)
-            throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, snapshotId, null);
-    }
-
-    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagName)
-            throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, null, null);
-    }
-
-    public String[] call(
-            ProcedureContext procedureContext,
-            String tableId,
-            String tagName,
-            long snapshotId,
-            String timeRetained)
-            throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, snapshotId, timeRetained);
-    }
-
-    public String[] call(
-            ProcedureContext procedureContext, String tableId, String tagName, 
String timeRetained)
-            throws Catalog.TableNotExistException {
-        return innerCall(tableId, tagName, null, timeRetained);
-    }
-
-    private String[] innerCall(
-            String tableId,
-            String tagName,
-            @Nullable Long snapshotId,
-            @Nullable String timeRetained)
-            throws Catalog.TableNotExistException {
-        Table table = catalog.getTable(Identifier.fromString(tableId));
+    @Override
+    void createOrReplaceTag(Table table, String tagName, Long snapshotId, 
Duration timeRetained) {
         if (snapshotId == null) {
-            table.createTag(tagName, toDuration(timeRetained));
+            table.createTag(tagName, timeRetained);
         } else {
-            table.createTag(tagName, snapshotId, toDuration(timeRetained));
+            table.createTag(tagName, snapshotId, timeRetained);
         }
-        return new String[] {"Success"};
-    }
-
-    @Nullable
-    private static Duration toDuration(@Nullable String s) {
-        if (s == null) {
-            return null;
-        }
-
-        return TimeUtils.parseDuration(s);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
new file mode 100644
index 000000000..6ed6ecc0e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.table.Table;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+    private static final String IDENTIFIER = "replace_tag";
+
+    @Override
+    void createOrReplaceTag(Table table, String tagName, Long snapshotId, 
Duration timeRetained) {
+        table.replaceTag(tagName, snapshotId, timeRetained);
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 3eb1bf3c4..f2385e66d 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -24,6 +24,7 @@ import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 
@@ -518,4 +519,33 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
         assertThat(sql("CALL sys.expire_tags('default.T')"))
                 .containsExactlyInAnyOrder(Row.of("tag-3"));
     }
+
+    @Test
+    public void testReplaceTags() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " NAME STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket' = '1'"
+                        + ")");
+        sql("INSERT INTO T VALUES (1, 'a')");
+        sql("INSERT INTO T VALUES (2, 'b')");
+        
assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2L);
+
+        Assertions.assertThatThrownBy(() -> sql("CALL 
sys.replace_tag('default.T', 'test_tag')"))
+                .hasMessageContaining("Tag name 'test_tag' does not exist.");
+
+        sql("CALL sys.create_tag('default.T', 'test_tag')");
+        assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
+                .containsExactly(Row.of("test_tag", 2L, null));
+
+        sql("CALL sys.replace_tag('default.T', 'test_tag', 1)");
+        assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
+                .containsExactly(Row.of("test_tag", 1L, null));
+
+        sql("CALL sys.replace_tag('default.T', 'test_tag', 2, '1 d')");
+        assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
+                .containsExactly(Row.of("test_tag", 2L, "PT24H"));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java
similarity index 63%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java
index 7769fa1d7..fecb6895b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java
@@ -26,20 +26,13 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 
-/** Factory to create {@link CreateTagAction}. */
-public class CreateTagActionFactory implements ActionFactory {
-
-    public static final String IDENTIFIER = "create_tag";
+/** Factory to create {@link ReplaceTagAction} or {@link ReplaceTagAction}. */
+public abstract class CreateOrReplaceTagActionFactory implements ActionFactory 
{
 
     private static final String TAG_NAME = "tag_name";
     private static final String SNAPSHOT = "snapshot";
     private static final String TIME_RETAINED = "time_retained";
 
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
-
     @Override
     public Optional<Action> create(MultipleParameterToolAdapter params) {
         checkRequiredArgument(params, TAG_NAME);
@@ -58,27 +51,15 @@ public class CreateTagActionFactory implements 
ActionFactory {
             timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED));
         }
 
-        CreateTagAction action =
-                new CreateTagAction(
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        catalogConfig,
-                        tagName,
-                        snapshot,
-                        timeRetained);
-        return Optional.of(action);
+        return Optional.of(
+                createOrReplaceTagAction(
+                        tablePath, catalogConfig, tagName, snapshot, 
timeRetained));
     }
 
-    @Override
-    public void printHelp() {
-        System.out.println("Action \"create_tag\" creates a tag from given 
snapshot.");
-        System.out.println();
-
-        System.out.println("Syntax:");
-        System.out.println(
-                "  create_tag --warehouse <warehouse_path> --database 
<database_name> "
-                        + "--table <table_name> --tag_name <tag_name> 
[--snapshot <snapshot_id>]");
-        System.out.println();
-    }
+    abstract Action createOrReplaceTagAction(
+            Tuple3<String, String, String> tablePath,
+            Map<String, String> catalogConfig,
+            String tagName,
+            Long snapshot,
+            Duration timeRetained);
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
index 7769fa1d7..c52594312 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
@@ -18,56 +18,36 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.utils.TimeUtils;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 
 import java.time.Duration;
 import java.util.Map;
-import java.util.Optional;
 
 /** Factory to create {@link CreateTagAction}. */
-public class CreateTagActionFactory implements ActionFactory {
+public class CreateTagActionFactory extends CreateOrReplaceTagActionFactory {
 
     public static final String IDENTIFIER = "create_tag";
 
-    private static final String TAG_NAME = "tag_name";
-    private static final String SNAPSHOT = "snapshot";
-    private static final String TIME_RETAINED = "time_retained";
-
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterToolAdapter params) {
-        checkRequiredArgument(params, TAG_NAME);
-
-        Tuple3<String, String, String> tablePath = getTablePath(params);
-        Map<String, String> catalogConfig = optionalConfigMap(params, 
CATALOG_CONF);
-        String tagName = params.get(TAG_NAME);
-
-        Long snapshot = null;
-        if (params.has(SNAPSHOT)) {
-            snapshot = Long.parseLong(params.get(SNAPSHOT));
-        }
-
-        Duration timeRetained = null;
-        if (params.has(TIME_RETAINED)) {
-            timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED));
-        }
-
-        CreateTagAction action =
-                new CreateTagAction(
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        catalogConfig,
-                        tagName,
-                        snapshot,
-                        timeRetained);
-        return Optional.of(action);
+    Action createOrReplaceTagAction(
+            Tuple3<String, String, String> tablePath,
+            Map<String, String> catalogConfig,
+            String tagName,
+            Long snapshot,
+            Duration timeRetained) {
+        return new CreateTagAction(
+                tablePath.f0,
+                tablePath.f1,
+                tablePath.f2,
+                catalogConfig,
+                tagName,
+                snapshot,
+                timeRetained);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java
new file mode 100644
index 000000000..09a85fe8a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** Replace tag action for Flink. */
+public class ReplaceTagAction extends TableActionBase {
+
+    private final String tagName;
+    private final @Nullable Long snapshotId;
+    private final @Nullable Duration timeRetained;
+
+    public ReplaceTagAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable Duration timeRetained) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+        this.tagName = tagName;
+        this.timeRetained = timeRetained;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public void run() throws Exception {
+        table.replaceTag(tagName, snapshotId, timeRetained);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java
new file mode 100644
index 000000000..a734e9cfb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** Factory to create {@link ReplaceTagAction}. */
+public class ReplaceTagActionFactory extends CreateOrReplaceTagActionFactory {
+
+    public static final String IDENTIFIER = "replace_tag";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    Action createOrReplaceTagAction(
+            Tuple3<String, String, String> tablePath,
+            Map<String, String> catalogConfig,
+            String tagName,
+            Long snapshot,
+            Duration timeRetained) {
+        return new ReplaceTagAction(
+                tablePath.f0,
+                tablePath.f1,
+                tablePath.f2,
+                catalogConfig,
+                tagName,
+                snapshot,
+                timeRetained);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"replace_tag\" to replace an existing tag 
with new tag info.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  replace_tag --warehouse <warehouse_path> --database 
<database_name> "
+                        + "--table <table_name> --tag_name <tag_name> 
[--snapshot <snapshot_id>] [--time_retained <time_retained>]");
+        System.out.println();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
similarity index 81%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
index 3fb51c8d9..dba9d4663 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
@@ -32,16 +32,8 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 
-/**
- * Create tag procedure. Usage:
- *
- * <pre><code>
- *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
- * </code></pre>
- */
-public class CreateTagProcedure extends ProcedureBase {
-
-    public static final String IDENTIFIER = "create_tag";
+/** A base procedure to create or replace a tag. */
+public abstract class CreateOrReplaceTagBaseProcedure extends ProcedureBase {
 
     @ProcedureHint(
             argument = {
@@ -64,14 +56,13 @@ public class CreateTagProcedure extends ProcedureBase {
             @Nullable String timeRetained)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
-        if (snapshotId == null) {
-            table.createTag(tagName, toDuration(timeRetained));
-        } else {
-            table.createTag(tagName, snapshotId, toDuration(timeRetained));
-        }
+        createOrReplaceTag(table, tagName, snapshotId, 
toDuration(timeRetained));
         return new String[] {"Success"};
     }
 
+    abstract void createOrReplaceTag(
+            Table table, String tagName, Long snapshotId, Duration 
timeRetained);
+
     @Nullable
     private static Duration toDuration(@Nullable String s) {
         if (s == null) {
@@ -80,9 +71,4 @@ public class CreateTagProcedure extends ProcedureBase {
 
         return TimeUtils.parseDuration(s);
     }
-
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 3fb51c8d9..b1af1c939 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -18,17 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.TimeUtils;
-
-import org.apache.flink.table.annotation.ArgumentHint;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.ProcedureHint;
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import javax.annotation.Nullable;
 
 import java.time.Duration;
 
@@ -39,46 +29,17 @@ import java.time.Duration;
  *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
  * </code></pre>
  */
-public class CreateTagProcedure extends ProcedureBase {
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
 
     public static final String IDENTIFIER = "create_tag";
 
-    @ProcedureHint(
-            argument = {
-                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
-                @ArgumentHint(name = "tag", type = @DataTypeHint("STRING")),
-                @ArgumentHint(
-                        name = "snapshot_id",
-                        type = @DataTypeHint("BIGINT"),
-                        isOptional = true),
-                @ArgumentHint(
-                        name = "time_retained",
-                        type = @DataTypeHint("STRING"),
-                        isOptional = true)
-            })
-    public String[] call(
-            ProcedureContext procedureContext,
-            String tableId,
-            String tagName,
-            @Nullable Long snapshotId,
-            @Nullable String timeRetained)
-            throws Catalog.TableNotExistException {
-        Table table = catalog.getTable(Identifier.fromString(tableId));
+    @Override
+    void createOrReplaceTag(Table table, String tagName, Long snapshotId, 
Duration timeRetained) {
         if (snapshotId == null) {
-            table.createTag(tagName, toDuration(timeRetained));
+            table.createTag(tagName, timeRetained);
         } else {
-            table.createTag(tagName, snapshotId, toDuration(timeRetained));
+            table.createTag(tagName, snapshotId, timeRetained);
         }
-        return new String[] {"Success"};
-    }
-
-    @Nullable
-    private static Duration toDuration(@Nullable String s) {
-        if (s == null) {
-            return null;
-        }
-
-        return TimeUtils.parseDuration(s);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
new file mode 100644
index 000000000..6ed6ecc0e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.table.Table;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+    private static final String IDENTIFIER = "replace_tag";
+
+    @Override
+    void createOrReplaceTag(Table table, String tagName, Long snapshotId, 
Duration timeRetained) {
+        table.replaceTag(tagName, snapshotId, timeRetained);
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 2cf57201d..2f40278d6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -26,6 +26,7 @@ 
org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory
 org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory
 org.apache.paimon.flink.action.DeleteTagActionFactory
 org.apache.paimon.flink.action.ExpireTagsActionFactory
+org.apache.paimon.flink.action.ReplaceTagActionFactory
 org.apache.paimon.flink.action.ResetConsumerActionFactory
 org.apache.paimon.flink.action.MigrateTableActionFactory
 org.apache.paimon.flink.action.MigrateFileActionFactory
@@ -51,6 +52,7 @@ 
org.apache.paimon.flink.procedure.CreateTagFromTimestampProcedure
 org.apache.paimon.flink.procedure.CreateTagFromWatermarkProcedure
 org.apache.paimon.flink.procedure.DeleteTagProcedure
 org.apache.paimon.flink.procedure.ExpireTagsProcedure
+org.apache.paimon.flink.procedure.ReplaceTagProcedure
 org.apache.paimon.flink.procedure.CreateBranchProcedure
 org.apache.paimon.flink.procedure.DeleteBranchProcedure
 org.apache.paimon.flink.procedure.DropPartitionProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
new file mode 100644
index 000000000..00b43b9e1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.TagManager;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link ReplaceTagAction}. */
+public class ReplaceTagActionTest extends ActionITCaseBase {
+
+    @BeforeEach
+    public void setUp() {
+        init(warehouse);
+    }
+
+    @Test
+    public void testReplaceTag() throws Exception {
+        bEnv.executeSql(
+                "CREATE TABLE T (id INT, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1')");
+
+        FileStoreTable table = getFileStoreTable("T");
+        TagManager tagManager = table.tagManager();
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 'a')").await();
+        bEnv.executeSql("INSERT INTO T VALUES (2, 'b')").await();
+        assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2);
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                bEnv.executeSql(
+                                        "CALL sys.replace_tag(`table` => 
'default.T', tag => 'test_tag')"))
+                .hasMessageContaining("Tag name 'test_tag' does not exist.");
+
+        bEnv.executeSql("CALL sys.create_tag(`table` => 'default.T', tag => 
'test_tag')");
+        assertThat(tagManager.tagExists("test_tag")).isEqualTo(true);
+        
assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(2);
+        
assertThat(tagManager.tag("test_tag").getTagTimeRetained()).isEqualTo(null);
+
+        // replace tag with new time_retained
+        createAction(
+                        ReplaceTagAction.class,
+                        "replace_tag",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        "T",
+                        "--tag_name",
+                        "test_tag",
+                        "--time_retained",
+                        "1 d")
+                .run();
+        
assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(24);
+
+        // replace tag with new snapshot and time_retained
+        createAction(
+                        ReplaceTagAction.class,
+                        "replace_tag",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        "T",
+                        "--tag_name",
+                        "test_tag",
+                        "--snapshot",
+                        "1",
+                        "--time_retained",
+                        "2 d")
+                .run();
+        
assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(1);
+        
assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(48);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
index c24d4105a..74e3aeeac 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
@@ -98,7 +98,8 @@ public class ProcedureTest {
     }
 
     private Method getMethodFromName(Class<?> clazz, String methodName) {
-        Method[] methods = clazz.getDeclaredMethods();
+        // get all methods of current and parent class
+        Method[] methods = clazz.getMethods();
 
         for (Method method : methods) {
             if (method.getName().equals(methodName)) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
new file mode 100644
index 000000000..8a4eb791a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link ReplaceTagProcedure}. */
+public class ReplaceTagProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws 
Exception {
+        sql(
+                "CREATE TABLE T (id INT, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1')");
+
+        sql("INSERT INTO T VALUES (1, 'a')");
+        sql("INSERT INTO T VALUES (2, 'b')");
+        
assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2);
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CALL sys.replace_tag(`table` => 
'default.T', tag => 'test_tag')"))
+                .hasMessageContaining("Tag name 'test_tag' does not exist.");
+
+        sql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')");
+        assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
+                .containsExactly(Row.of("test_tag", 2L, null));
+
+        // replace tag with new time_retained
+        sql(
+                "CALL sys.replace_tag(`table` => 'default.T', tag => 
'test_tag',"
+                        + " time_retained => '1 d')");
+        assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
+                .containsExactly(Row.of("test_tag", 2L, "PT24H"));
+
+        // replace tag with new snapshot and time_retained
+        sql(
+                "CALL sys.replace_tag(`table` => 'default.T', tag => 
'test_tag',"
+                        + " snapshot => 1, time_retained => '2 d')");
+        assertThat(sql("select tag_name,snapshot_id,time_retained from 
`T$tags`"))
+                .containsExactly(Row.of("test_tag", 2L, "PT48H"));
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index dee0c38d4..36c6ff897 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -38,6 +38,7 @@ import org.apache.paimon.spark.procedure.ProcedureBuilder;
 import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
 import org.apache.paimon.spark.procedure.RenameTagProcedure;
 import org.apache.paimon.spark.procedure.RepairProcedure;
+import org.apache.paimon.spark.procedure.ReplaceTagProcedure;
 import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
 import org.apache.paimon.spark.procedure.RollbackProcedure;
 
@@ -64,6 +65,7 @@ public class SparkProcedures {
                 ImmutableMap.builder();
         procedureBuilders.put("rollback", RollbackProcedure::builder);
         procedureBuilders.put("create_tag", CreateTagProcedure::builder);
+        procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
         procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
         procedureBuilders.put(
                 "create_tag_from_timestamp", 
CreateTagFromTimestampProcedure::builder);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java
similarity index 79%
copy from 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
copy to 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java
index b3f863c5e..ed264140b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -33,8 +34,8 @@ import java.time.Duration;
 import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
-/** A procedure to create a tag. */
-public class CreateTagProcedure extends BaseProcedure {
+/** A base procedure to create or replace a tag. */
+public abstract class CreateOrReplaceTagBaseProcedure extends BaseProcedure {
 
     private static final ProcedureParameter[] PARAMETERS =
             new ProcedureParameter[] {
@@ -50,7 +51,7 @@ public class CreateTagProcedure extends BaseProcedure {
                         new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
                     });
 
-    protected CreateTagProcedure(TableCatalog tableCatalog) {
+    protected CreateOrReplaceTagBaseProcedure(TableCatalog tableCatalog) {
         super(tableCatalog);
     }
 
@@ -75,27 +76,12 @@ public class CreateTagProcedure extends BaseProcedure {
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
-                    if (snapshot == null) {
-                        table.createTag(tag, timeRetained);
-                    } else {
-                        table.createTag(tag, snapshot, timeRetained);
-                    }
+                    createOrReplaceTag(table, tag, snapshot, timeRetained);
                     InternalRow outputRow = newInternalRow(true);
                     return new InternalRow[] {outputRow};
                 });
     }
 
-    public static ProcedureBuilder builder() {
-        return new BaseProcedure.Builder<CreateTagProcedure>() {
-            @Override
-            public CreateTagProcedure doBuild() {
-                return new CreateTagProcedure(tableCatalog());
-            }
-        };
-    }
-
-    @Override
-    public String description() {
-        return "CreateTagProcedure";
-    }
+    abstract void createOrReplaceTag(
+            Table table, String tagName, Long snapshotId, Duration 
timeRetained);
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
index b3f863c5e..157743f98 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
@@ -18,71 +18,26 @@
 
 package org.apache.paimon.spark.procedure;
 
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
 
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 
 import java.time.Duration;
 
-import static org.apache.spark.sql.types.DataTypes.LongType;
-import static org.apache.spark.sql.types.DataTypes.StringType;
-
 /** A procedure to create a tag. */
-public class CreateTagProcedure extends BaseProcedure {
-
-    private static final ProcedureParameter[] PARAMETERS =
-            new ProcedureParameter[] {
-                ProcedureParameter.required("table", StringType),
-                ProcedureParameter.required("tag", StringType),
-                ProcedureParameter.optional("snapshot", LongType),
-                ProcedureParameter.optional("time_retained", StringType)
-            };
-
-    private static final StructType OUTPUT_TYPE =
-            new StructType(
-                    new StructField[] {
-                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
-                    });
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
 
-    protected CreateTagProcedure(TableCatalog tableCatalog) {
+    private CreateTagProcedure(TableCatalog tableCatalog) {
         super(tableCatalog);
     }
 
     @Override
-    public ProcedureParameter[] parameters() {
-        return PARAMETERS;
-    }
-
-    @Override
-    public StructType outputType() {
-        return OUTPUT_TYPE;
-    }
-
-    @Override
-    public InternalRow[] call(InternalRow args) {
-        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-        String tag = args.getString(1);
-        Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
-        Duration timeRetained =
-                args.isNullAt(3) ? null : 
TimeUtils.parseDuration(args.getString(3));
-
-        return modifyPaimonTable(
-                tableIdent,
-                table -> {
-                    if (snapshot == null) {
-                        table.createTag(tag, timeRetained);
-                    } else {
-                        table.createTag(tag, snapshot, timeRetained);
-                    }
-                    InternalRow outputRow = newInternalRow(true);
-                    return new InternalRow[] {outputRow};
-                });
+    void createOrReplaceTag(Table table, String tagName, Long snapshotId, 
Duration timeRetained) {
+        if (snapshotId == null) {
+            table.createTag(tagName, timeRetained);
+        } else {
+            table.createTag(tagName, snapshotId, timeRetained);
+        }
     }
 
     public static ProcedureBuilder builder() {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java
new file mode 100644
index 000000000..205fca5ee
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+    private ReplaceTagProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    void createOrReplaceTag(Table table, String tagName, Long snapshotId, 
Duration timeRetained) {
+        table.replaceTag(tagName, snapshotId, timeRetained);
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<ReplaceTagProcedure>() {
+            @Override
+            public ReplaceTagProcedure doBuild() {
+                return new ReplaceTagProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "ReplaceTagProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 6400cb88c..4a4c7ae21 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -172,19 +172,15 @@ class CreateAndDeleteTagProcedureTest extends 
PaimonSparkTestBase with StreamTes
                   "table => 'test.T', tag => 'test_tag', snapshot => 1)"),
               Row(true) :: Nil)
             checkAnswer(
-              spark.sql(
-                "SELECT count(time_retained) FROM paimon.test.`T$tags` where 
tag_name = 'test_tag'"),
-              Row(0) :: Nil)
+              spark.sql("SELECT count(*) FROM paimon.test.`T$tags` where 
tag_name = 'test_tag'"),
+              Row(1) :: Nil)
 
-            checkAnswer(
+            // throw exception "Tag test_tag already exists"
+            assertThrows[IllegalArgumentException] {
               spark.sql(
                 "CALL paimon.sys.create_tag(" +
-                  "table => 'test.T', tag => 'test_tag', time_retained => '5 
d', snapshot => 1)"),
-              Row(true) :: Nil)
-            checkAnswer(
-              spark.sql(
-                "SELECT count(time_retained) FROM paimon.test.`T$tags` where 
tag_name = 'test_tag'"),
-              Row(1) :: Nil)
+                  "table => 'test.T', tag => 'test_tag', time_retained => '5 
d', snapshot => 1)")
+            }
           } finally {
             stream.stop()
           }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala
new file mode 100644
index 000000000..5a9280887
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class ReplaceTagProcedureTest extends PaimonSparkTestBase {
+  test("Paimon Procedure: replace tag to update tag meta") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                 |""".stripMargin)
+    spark.sql("insert into T values(1, 'a')")
+    spark.sql("insert into T values(2, 'b')")
+    assertResult(2)(loadTable("T").snapshotManager().snapshotCount())
+
+    // throw exception "Tag test_tag does not exist"
+    assertThrows[IllegalArgumentException] {
+      spark.sql("CALL paimon.sys.replace_tag(table => 'test.T', tag => 
'test_tag')")
+    }
+
+    spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 
'test_tag')")
+    checkAnswer(
+      spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+      Row("test_tag", 2, null) :: Nil)
+
+    // replace tag with new time_retained
+    spark.sql(
+      "CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag', 
time_retained => '1 d')")
+    checkAnswer(
+      spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+      Row("test_tag", 2, "PT24H") :: Nil)
+
+    // replace tag with new snapshot and time_retained
+    spark.sql(
+      "CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag', 
snapshot => 1, time_retained => '2 d')")
+    checkAnswer(
+      spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+      Row("test_tag", 1, "PT48H") :: Nil)
+  }
+}

Reply via email to