This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2aaab6b7d [core] Update tag metadata for the same snapshot of the same tag (#3563)
2aaab6b7d is described below
commit 2aaab6b7d33fa5a704f84d9e317b066c73370657
Author: askwang <[email protected]>
AuthorDate: Tue Jun 25 15:49:00 2024 +0800
[core] Update tag metadata for the same snapshot of the same tag (#3563)
---
.../java/org/apache/paimon/utils/TagManager.java | 39 +++++++++------
.../CreateAndDeleteTagProcedureTest.scala | 57 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 15 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 23675bf9c..b814bd7bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -94,32 +94,41 @@ public class TagManager {
List<TagCallback> callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
- // skip create tag for the same snapshot of the same name.
+ // When timeRetained is not defined, please do not write the tagCreatorTime field,
+ // as this will cause older versions (<= 0.7) of readers to be unable to read this
+ // tag.
+ // When timeRetained is defined, it is fine, because timeRetained is the new
+ // feature.
+ String content =
+ timeRetained != null
+ ? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now())
+ .toJson()
+ : snapshot.toJson();
+ Path tagPath = tagPath(tagName);
+
+ // update tag metadata for the same snapshot of the same tag name.
if (tagExists(tagName)) {
Snapshot tagged = taggedSnapshot(tagName);
Preconditions.checkArgument(
tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName);
+ try {
+ fileIO.overwriteFileUtf8(tagPath, content);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Tag already exists. Failed to update tag metadata info for tag '%s' (path %s).",
+ tagName, tagPath),
+ e);
+ }
} else {
- Path newTagPath = tagPath(tagName);
try {
- // When timeRetained is not defined, please do not write the tagCreatorTime field,
- // as this will cause older versions (<= 0.7) of readers to be unable to read this
- // tag.
- // When timeRetained is defined, it is fine, because timeRetained is the new
- // feature.
- fileIO.writeFileUtf8(
- newTagPath,
- timeRetained != null
- ? Tag.fromSnapshotAndTagTtl(
- snapshot, timeRetained, LocalDateTime.now())
- .toJson()
- : snapshot.toJson());
+ fileIO.writeFileUtf8(tagPath, content);
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when committing tag '%s' (path %s). "
+ "Cannot clean up because we can't determine the success.",
- tagName, newTagPath),
+ tagName, tagPath),
e);
}
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 3c9531a8a..4351abd3f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -98,4 +98,61 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
}
}
}
+
+ test("Paimon Procedure: create same tag with same snapshot") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ // define a change-log table and test `forEachBatch` api
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ // snapshot-1
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Nil)
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.create_tag(" +
+ "table => 'test.T', tag => 'test_tag', snapshot => 1)"),
+ Row(true) :: Nil)
+ checkAnswer(
+ spark.sql(
+ "SELECT count(time_retained) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"),
+ Row(0) :: Nil)
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.create_tag(" +
+ "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 1)"),
+ Row(true) :: Nil)
+ checkAnswer(
+ spark.sql(
+ "SELECT count(time_retained) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"),
+ Row(1) :: Nil)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}