This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e60ba2b704 [core] Call Iceberg callback on replace_tag procedure (#5666)
e60ba2b704 is described below

commit e60ba2b704f295aa9f322a48f75ce80a8a595737
Author: Nick Del Nano <[email protected]>
AuthorDate: Tue Jun 3 06:26:02 2025 -0700

    [core] Call Iceberg callback on replace_tag procedure (#5666)
---
 .../paimon/table/AbstractFileStoreTable.java       | 14 +++-
 .../java/org/apache/paimon/utils/TagManager.java   | 12 +++-
 .../flink/iceberg/Flink116IcebergITCase.java       |  5 ++
 .../flink/iceberg/Flink117IcebergITCase.java       |  5 ++
 .../flink/iceberg/FlinkIcebergITCaseBase.java      | 77 ++++++++++++++++++++++
 5 files changed, 109 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d7b0af322d..6494521915 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -587,9 +587,19 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
             Snapshot latestSnapshot = snapshotManager().latestSnapshot();
             SnapshotNotExistException.checkNotNull(
                     latestSnapshot, "Cannot replace tag because latest snapshot doesn't exist.");
-            tagManager().replaceTag(latestSnapshot, tagName, timeRetained);
+            tagManager()
+                    .replaceTag(
+                            latestSnapshot,
+                            tagName,
+                            timeRetained,
+                            store().createTagCallbacks(this));
         } else {
-            tagManager().replaceTag(findSnapshot(fromSnapshotId), tagName, timeRetained);
+            tagManager()
+                    .replaceTag(
+                            findSnapshot(fromSnapshotId),
+                            tagName,
+                            timeRetained,
+                            store().createTagCallbacks(this));
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 132ea8bb9c..a4491f2fa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergCommitCallback;
 import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
@@ -112,10 +113,17 @@ public class TagManager {
     }
 
     /** Replace a tag from given snapshot and save it in the storage. */
-    public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) {
+    public void replaceTag(
+            Snapshot snapshot, String tagName, Duration timeRetained, List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
         checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
-        createOrReplaceTag(snapshot, tagName, timeRetained, null);
+        createOrReplaceTag(
+                snapshot,
+                tagName,
+                timeRetained,
+                callbacks.stream()
+                        .filter(callback -> callback instanceof IcebergCommitCallback)
+                        .collect(Collectors.toList()));
     }
 
     private void createOrReplaceTag(
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
index dc4810a902..c60df74aa1 100644
--- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
@@ -36,4 +36,9 @@ public class Flink116IcebergITCase extends FlinkIcebergITCaseBase {
     public void testDeleteTags(String format) throws Exception {
         // Flink 1.16 does not support delete_tag procedure so we skip this test.
     }
+
+    @Override
+    public void testReplaceTags(String format) throws Exception {
+        // Flink 1.16 does not support replace_tag procedure so we skip this test.
+    }
 }
diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
index 7a6f0271ad..4acf8cab6b 100644
--- a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
+++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
@@ -29,4 +29,9 @@ public class Flink117IcebergITCase extends FlinkIcebergITCaseBase {
     public void testDeleteTags(String format) throws Exception {
         // Flink 1.17 does not support delete_tag procedure so we skip this test.
     }
+
+    @Override
+    public void testReplaceTags(String format) throws Exception {
+        // Flink 1.17 does not support replace_tag procedure so we skip this test.
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
index 14ad801607..15cd142815 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
@@ -623,6 +623,83 @@ public abstract class FlinkIcebergITCaseBase extends AbstractTestBase {
                 .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L));
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"orc", "parquet", "avro"})
+    public void testReplaceTags(String format) throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();
+        tEnv.executeSql(
+                "CREATE CATALOG paimon WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                "CREATE TABLE paimon.`default`.T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v1 INT,\n"
+                        + "  v2 STRING,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+                        // make sure all changes are visible in iceberg metadata
+                        + "  'full-compaction.delta-commits' = '1',\n"
+                        + "  'file.format' = '"
+                        + format
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 100, 'apple'), "
+                                + "(1, 11, 110, 'banana'), "
+                                + "(2, 20, 200, 'cat'), "
+                                + "(2, 21, 210, 'dog')")
+                .await();
+
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 101, 'red'), "
+                                + "(1, 12, 121, 'green'), "
+                                + "(2, 20, 201, 'blue'), "
+                                + "(2, 22, 221, 'yellow')")
+                .await();
+
+        tEnv.executeSql(
+                "CREATE CATALOG iceberg WITH (\n"
+                        + "  'type' = 'iceberg',\n"
+                        + "  'catalog-type' = 'hadoop',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "/iceberg',\n"
+                        + "  'cache-enabled' = 'false'\n"
+                        + ")");
+
+        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
+
+        // Replace tag
+        tEnv.executeSql("CALL paimon.sys.replace_tag('default.T', 'tag1', 4, '1d')");
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT name, type, snapshot_id FROM iceberg.`default`.T$refs")))
+                .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 4L));
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(101, 10, "red", 1),
+                        Row.of(110, 11, "banana", 1),
+                        Row.of(121, 12, "green", 1),
+                        Row.of(201, 20, "blue", 2),
+                        Row.of(210, 21, "dog", 2),
+                        Row.of(221, 22, "yellow", 2));
+    }
+
     private List<Row> collect(TableResult result) throws Exception {
         List<Row> rows = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {

Reply via email to