This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 907525dd5 [core] support create tag based on tag (#3044)
907525dd5 is described below

commit 907525dd5e5bba57a12cdb6b6e065ed9e805beaf
Author: Aitozi <[email protected]>
AuthorDate: Tue Mar 19 17:32:25 2024 +0800

    [core] support create tag based on tag (#3044)
---
 .../paimon/table/AbstractFileStoreTable.java       | 19 ++++++-
 .../java/org/apache/paimon/utils/TagManager.java   | 28 ++++++----
 .../paimon/table/FileStoreTableTestBase.java       | 59 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 39b368982..d2c840ba9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -65,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.function.BiConsumer;
 
 import static org.apache.paimon.CoreOptions.PATH;
@@ -427,11 +428,25 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
     @Override
     public void createTag(String tagName, long fromSnapshotId) {
         SnapshotManager snapshotManager = snapshotManager();
+        Snapshot snapshot = null;
+        if (snapshotManager.snapshotExists(fromSnapshotId)) {
+            snapshot = snapshotManager.snapshot(fromSnapshotId);
+        } else {
+            SortedMap<Snapshot, List<String>> tags = tagManager().tags();
+            for (Snapshot snap : tags.keySet()) {
+                if (snap.id() == fromSnapshotId) {
+                    snapshot = snap;
+                    break;
+                } else if (snap.id() > fromSnapshotId) {
+                    break;
+                }
+            }
+        }
         checkArgument(
-                snapshotManager.snapshotExists(fromSnapshotId),
+                snapshot != null,
                 "Cannot create tag because given snapshot #%s doesn't exist.",
                 fromSnapshotId);
-        createTag(tagName, snapshotManager.snapshot(fromSnapshotId));
+        createTag(tagName, snapshot);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a29a3e151..90f690053 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -77,18 +77,24 @@ public class TagManager {
     /** Create a tag from given snapshot and save it in the storage. */
     public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
-        checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
 
-        Path newTagPath = tagPath(tagName);
-        try {
-            fileIO.writeFileUtf8(newTagPath, snapshot.toJson());
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    String.format(
-                            "Exception occurs when committing tag '%s' (path %s). "
-                                    + "Cannot clean up because we can't determine the success.",
-                            tagName, newTagPath),
-                    e);
+        // skip create tag for the same snapshot of the same name.
+        if (tagExists(tagName)) {
+            Snapshot tagged = taggedSnapshot(tagName);
+            Preconditions.checkArgument(
+                    tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName);
+        } else {
+            Path newTagPath = tagPath(tagName);
+            try {
+                fileIO.writeFileUtf8(newTagPath, snapshot.toJson());
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing tag '%s' (path %s). "
+                                        + "Cannot clean up because we can't determine the success.",
+                                tagName, newTagPath),
+                        e);
+            }
         }
 
         try {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 4cd019568..fabd61639 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -68,6 +68,7 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 import org.apache.paimon.utils.TraceableFileIO;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -915,6 +916,64 @@ public abstract class FileStoreTableTestBase {
         assertThat(tagged.equals(snapshot2)).isTrue();
     }
 
+    @Test
+    public void testCreateTagOnExpiredSnapshot() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(SNAPSHOT_NUM_RETAINED_MAX, 1);
+                            conf.set(SNAPSHOT_NUM_RETAINED_MIN, 1);
+                        });
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            // snapshot 1
+            write.write(rowData(1, 10, 100L));
+            commit.commit(0, write.prepareCommit(false, 1));
+            table.createTag("test-tag", 1);
+            // verify that tag file exist
+            TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath);
+            assertThat(tagManager.tagExists("test-tag")).isTrue();
+            // verify that test-tag is equal to snapshot 1
+            Snapshot tagged = tagManager.taggedSnapshot("test-tag");
+            Snapshot snapshot1 = table.snapshotManager().snapshot(1);
+            assertThat(tagged.equals(snapshot1)).isTrue();
+            // snapshot 2
+            write.write(rowData(2, 20, 200L));
+            commit.commit(1, write.prepareCommit(false, 2));
+            SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
+            // The snapshot 1 is expired.
+            assertThat(snapshotManager.snapshotExists(1)).isFalse();
+            table.createTag("test-tag-2", 1);
+            // verify that tag file exist
+            assertThat(tagManager.tagExists("test-tag-2")).isTrue();
+            // verify that test-tag is equal to snapshot 1
+            Snapshot tag2 = tagManager.taggedSnapshot("test-tag-2");
+            assertThat(tag2.equals(snapshot1)).isTrue();
+        }
+    }
+
+    @Test
+    public void testCreateSameTagName() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            // snapshot 1
+            write.write(rowData(1, 10, 100L));
+            commit.commit(0, write.prepareCommit(false, 1));
+            // snapshot 2
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+            TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath);
+            table.createTag("test-tag", 1);
+            // verify that tag file exist
+            assertThat(tagManager.tagExists("test-tag")).isTrue();
+            // Create again
+            table.createTag("test-tag", 1);
+            Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2))
+                    .hasMessageContaining("Tag name 'test-tag' already exists.");
+        }
+    }
+
     @Test
     public void testCreateBranch() throws Exception {
         FileStoreTable table = createFileStoreTable();

Reply via email to