This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aaab6b7d [core] Update tag metadata for the same snapshot of the same tag (#3563)
2aaab6b7d is described below

commit 2aaab6b7d33fa5a704f84d9e317b066c73370657
Author: askwang <[email protected]>
AuthorDate: Tue Jun 25 15:49:00 2024 +0800

    [core] Update tag metadata for the same snapshot of the same tag (#3563)
---
 .../java/org/apache/paimon/utils/TagManager.java   | 39 +++++++++------
 .../CreateAndDeleteTagProcedureTest.scala          | 57 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 15 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 23675bf9c..b814bd7bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -94,32 +94,41 @@ public class TagManager {
             List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
 
-        // skip create tag for the same snapshot of the same name.
+        // When timeRetained is not defined, please do not write the 
tagCreatorTime field,
+        // as this will cause older versions (<= 0.7) of readers to be unable 
to read this
+        // tag.
+        // When timeRetained is defined, it is fine, because timeRetained is 
the new
+        // feature.
+        String content =
+                timeRetained != null
+                        ? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, 
LocalDateTime.now())
+                                .toJson()
+                        : snapshot.toJson();
+        Path tagPath = tagPath(tagName);
+
+        // update tag metadata info for the same snapshot of the same tag name.
         if (tagExists(tagName)) {
             Snapshot tagged = taggedSnapshot(tagName);
             Preconditions.checkArgument(
                     tagged.id() == snapshot.id(), "Tag name '%s' already 
exists.", tagName);
+            try {
+                fileIO.overwriteFileUtf8(tagPath, content);
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Tag already exists. Failed to update tag 
metadata info for tag '%s' (path %s).",
+                                tagName, tagPath),
+                        e);
+            }
         } else {
-            Path newTagPath = tagPath(tagName);
             try {
-                // When timeRetained is not defined, please do not write the 
tagCreatorTime field,
-                // as this will cause older versions (<= 0.7) of readers to be 
unable to read this
-                // tag.
-                // When timeRetained is defined, it is fine, because 
timeRetained is the new
-                // feature.
-                fileIO.writeFileUtf8(
-                        newTagPath,
-                        timeRetained != null
-                                ? Tag.fromSnapshotAndTagTtl(
-                                                snapshot, timeRetained, 
LocalDateTime.now())
-                                        .toJson()
-                                : snapshot.toJson());
+                fileIO.writeFileUtf8(tagPath, content);
             } catch (IOException e) {
                 throw new RuntimeException(
                         String.format(
                                 "Exception occurs when committing tag '%s' 
(path %s). "
                                         + "Cannot clean up because we can't 
determine the success.",
-                                tagName, newTagPath),
+                                tagName, tagPath),
                         e);
             }
         }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 3c9531a8a..4351abd3f 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -98,4 +98,61 @@ class CreateAndDeleteTagProcedureTest extends 
PaimonSparkTestBase with StreamTes
       }
     }
   }
+
+  test("Paimon Procedure: create same tag with same snapshot") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // primary-key table written via a streaming `foreachBatch` sink; tag metadata is checked below
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1: the snapshot that 'test_tag' will point to in both create_tag calls
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.create_tag(" +
+                  "table => 'test.T', tag => 'test_tag', snapshot => 1)"),
+              Row(true) :: Nil) // first creation of the tag, without time_retained
+            checkAnswer(
+              spark.sql(
+                "SELECT count(time_retained) FROM paimon.test.`T$tags` where 
tag_name = 'test_tag'"),
+              Row(0) :: Nil) // no time_retained recorded for the tag yet
+
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.create_tag(" +
+                  "table => 'test.T', tag => 'test_tag', time_retained => '5 
d', snapshot => 1)"),
+              Row(true) :: Nil) // same tag name + same snapshot: call succeeds instead of failing
+            checkAnswer(
+              spark.sql(
+                "SELECT count(time_retained) FROM paimon.test.`T$tags` where 
tag_name = 'test_tag'"),
+              Row(1) :: Nil) // tag metadata was rewritten and now carries time_retained
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }

Reply via email to