This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e60ba2b704 [core] Call Iceberg callback on replace_tag procedure
(#5666)
e60ba2b704 is described below
commit e60ba2b704f295aa9f322a48f75ce80a8a595737
Author: Nick Del Nano <[email protected]>
AuthorDate: Tue Jun 3 06:26:02 2025 -0700
[core] Call Iceberg callback on replace_tag procedure (#5666)
---
.../paimon/table/AbstractFileStoreTable.java | 14 +++-
.../java/org/apache/paimon/utils/TagManager.java | 12 +++-
.../flink/iceberg/Flink116IcebergITCase.java | 5 ++
.../flink/iceberg/Flink117IcebergITCase.java | 5 ++
.../flink/iceberg/FlinkIcebergITCaseBase.java | 77 ++++++++++++++++++++++
5 files changed, 109 insertions(+), 4 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d7b0af322d..6494521915 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -587,9 +587,19 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
SnapshotNotExistException.checkNotNull(
latestSnapshot, "Cannot replace tag because latest
snapshot doesn't exist.");
- tagManager().replaceTag(latestSnapshot, tagName, timeRetained);
+ tagManager()
+ .replaceTag(
+ latestSnapshot,
+ tagName,
+ timeRetained,
+ store().createTagCallbacks(this));
} else {
- tagManager().replaceTag(findSnapshot(fromSnapshotId), tagName,
timeRetained);
+ tagManager()
+ .replaceTag(
+ findSnapshot(fromSnapshotId),
+ tagName,
+ timeRetained,
+ store().createTagCallbacks(this));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 132ea8bb9c..a4491f2fa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergCommitCallback;
import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
@@ -112,10 +113,17 @@ public class TagManager {
}
/** Replace a tag from given snapshot and save it in the storage. */
- public void replaceTag(Snapshot snapshot, String tagName, Duration
timeRetained) {
+ public void replaceTag(
+ Snapshot snapshot, String tagName, Duration timeRetained,
List<TagCallback> callbacks) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name
shouldn't be blank.");
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
- createOrReplaceTag(snapshot, tagName, timeRetained, null);
+ createOrReplaceTag(
+ snapshot,
+ tagName,
+ timeRetained,
+ callbacks.stream()
+ .filter(callback -> callback instanceof
IcebergCommitCallback)
+ .collect(Collectors.toList()));
}
private void createOrReplaceTag(
diff --git
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
index dc4810a902..c60df74aa1 100644
---
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
+++
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
@@ -36,4 +36,9 @@ public class Flink116IcebergITCase extends
FlinkIcebergITCaseBase {
public void testDeleteTags(String format) throws Exception {
// Flink 1.16 does not support delete_tag procedure so we skip this
test.
}
+
+ @Override
+ public void testReplaceTags(String format) throws Exception {
+ // Flink 1.16 does not support replace_tag procedure so we skip this
test.
+ }
}
diff --git
a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
index 7a6f0271ad..4acf8cab6b 100644
---
a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
+++
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
@@ -29,4 +29,9 @@ public class Flink117IcebergITCase extends
FlinkIcebergITCaseBase {
public void testDeleteTags(String format) throws Exception {
// Flink 1.17 does not support delete_tag procedure so we skip this
test.
}
+
+ @Override
+ public void testReplaceTags(String format) throws Exception {
+ // Flink 1.17 does not support replace_tag procedure so we skip this
test.
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
index 14ad801607..15cd142815 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
@@ -623,6 +623,83 @@ public abstract class FlinkIcebergITCaseBase extends
AbstractTestBase {
.containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L));
}
+ @ParameterizedTest
+ @ValueSource(strings = {"orc", "parquet", "avro"})
+ public void testReplaceTags(String format) throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
tableEnvironmentBuilder().batchMode().parallelism(2).build();
+ tEnv.executeSql(
+ "CREATE CATALOG paimon WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "CREATE TABLE paimon.`default`.T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v1 INT,\n"
+ + " v2 STRING,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+ // make sure all changes are visible in iceberg
metadata
+ + " 'full-compaction.delta-commits' = '1',\n"
+ + " 'file.format' = '"
+ + format
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 100, 'apple'), "
+ + "(1, 11, 110, 'banana'), "
+ + "(2, 20, 200, 'cat'), "
+ + "(2, 21, 210, 'dog')")
+ .await();
+
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 101, 'red'), "
+ + "(1, 12, 121, 'green'), "
+ + "(2, 20, 201, 'blue'), "
+ + "(2, 22, 221, 'yellow')")
+ .await();
+
+ tEnv.executeSql(
+ "CREATE CATALOG iceberg WITH (\n"
+ + " 'type' = 'iceberg',\n"
+ + " 'catalog-type' = 'hadoop',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "/iceberg',\n"
+ + " 'cache-enabled' = 'false'\n"
+ + ")");
+
+ tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
+
+ // Replace tag
+ tEnv.executeSql("CALL paimon.sys.replace_tag('default.T', 'tag1', 4,
'1d')");
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT name, type, snapshot_id FROM
iceberg.`default`.T$refs")))
+ .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 4L));
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT v1, k, v2, pt FROM
iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(101, 10, "red", 1),
+ Row.of(110, 11, "banana", 1),
+ Row.of(121, 12, "green", 1),
+ Row.of(201, 20, "blue", 2),
+ Row.of(210, 21, "dog", 2),
+ Row.of(221, 22, "yellow", 2));
+ }
+
private List<Row> collect(TableResult result) throws Exception {
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {