This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 907525dd5 [core] support create tag based on tag (#3044)
907525dd5 is described below
commit 907525dd5e5bba57a12cdb6b6e065ed9e805beaf
Author: Aitozi <[email protected]>
AuthorDate: Tue Mar 19 17:32:25 2024 +0800
[core] support create tag based on tag (#3044)
---
.../paimon/table/AbstractFileStoreTable.java | 19 ++++++-
.../java/org/apache/paimon/utils/TagManager.java | 28 ++++++----
.../paimon/table/FileStoreTableTestBase.java | 59 ++++++++++++++++++++++
3 files changed, 93 insertions(+), 13 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 39b368982..d2c840ba9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -65,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.SortedMap;
import java.util.function.BiConsumer;
import static org.apache.paimon.CoreOptions.PATH;
@@ -427,11 +428,25 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public void createTag(String tagName, long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
+ Snapshot snapshot = null;
+ if (snapshotManager.snapshotExists(fromSnapshotId)) {
+ snapshot = snapshotManager.snapshot(fromSnapshotId);
+ } else {
+ SortedMap<Snapshot, List<String>> tags = tagManager().tags();
+ for (Snapshot snap : tags.keySet()) {
+ if (snap.id() == fromSnapshotId) {
+ snapshot = snap;
+ break;
+ } else if (snap.id() > fromSnapshotId) {
+ break;
+ }
+ }
+ }
checkArgument(
- snapshotManager.snapshotExists(fromSnapshotId),
+ snapshot != null,
"Cannot create tag because given snapshot #%s doesn't exist.",
fromSnapshotId);
- createTag(tagName, snapshotManager.snapshot(fromSnapshotId));
+ createTag(tagName, snapshot);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a29a3e151..90f690053 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -77,18 +77,24 @@ public class TagManager {
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName, List<TagCallback>
callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
- checkArgument(!tagExists(tagName), "Tag name '%s' already exists.",
tagName);
- Path newTagPath = tagPath(tagName);
- try {
- fileIO.writeFileUtf8(newTagPath, snapshot.toJson());
- } catch (IOException e) {
- throw new RuntimeException(
- String.format(
- "Exception occurs when committing tag '%s' (path
%s). "
- + "Cannot clean up because we can't
determine the success.",
- tagName, newTagPath),
- e);
+ // skip create tag for the same snapshot of the same name.
+ if (tagExists(tagName)) {
+ Snapshot tagged = taggedSnapshot(tagName);
+ Preconditions.checkArgument(
+ tagged.id() == snapshot.id(), "Tag name '%s' already
exists.", tagName);
+ } else {
+ Path newTagPath = tagPath(tagName);
+ try {
+ fileIO.writeFileUtf8(newTagPath, snapshot.toJson());
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Exception occurs when committing tag '%s'
(path %s). "
+ + "Cannot clean up because we can't
determine the success.",
+ tagName, newTagPath),
+ e);
+ }
}
try {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 4cd019568..fabd61639 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -68,6 +68,7 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.apache.paimon.utils.TraceableFileIO;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -915,6 +916,64 @@ public abstract class FileStoreTableTestBase {
assertThat(tagged.equals(snapshot2)).isTrue();
}
+ @Test
+ public void testCreateTagOnExpiredSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(SNAPSHOT_NUM_RETAINED_MAX, 1);
+ conf.set(SNAPSHOT_NUM_RETAINED_MIN, 1);
+ });
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ // snapshot 1
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ table.createTag("test-tag", 1);
+ // verify that tag file exist
+ TagManager tagManager = new TagManager(new TraceableFileIO(),
tablePath);
+ assertThat(tagManager.tagExists("test-tag")).isTrue();
+ // verify that test-tag is equal to snapshot 1
+ Snapshot tagged = tagManager.taggedSnapshot("test-tag");
+ Snapshot snapshot1 = table.snapshotManager().snapshot(1);
+ assertThat(tagged.equals(snapshot1)).isTrue();
+ // snapshot 2
+ write.write(rowData(2, 20, 200L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ SnapshotManager snapshotManager = new SnapshotManager(new
TraceableFileIO(), tablePath);
+ // The snapshot 1 is expired.
+ assertThat(snapshotManager.snapshotExists(1)).isFalse();
+ table.createTag("test-tag-2", 1);
+ // verify that tag file exist
+ assertThat(tagManager.tagExists("test-tag-2")).isTrue();
+ // verify that test-tag is equal to snapshot 1
+ Snapshot tag2 = tagManager.taggedSnapshot("test-tag-2");
+ assertThat(tag2.equals(snapshot1)).isTrue();
+ }
+ }
+
+ @Test
+ public void testCreateSameTagName() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ // snapshot 1
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ // snapshot 2
+ write.write(rowData(1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ TagManager tagManager = new TagManager(new TraceableFileIO(),
tablePath);
+ table.createTag("test-tag", 1);
+ // verify that tag file exist
+ assertThat(tagManager.tagExists("test-tag")).isTrue();
+ // Create again
+ table.createTag("test-tag", 1);
+ Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2))
+ .hasMessageContaining("Tag name 'test-tag' already
exists.");
+ }
+ }
+
@Test
public void testCreateBranch() throws Exception {
FileStoreTable table = createFileStoreTable();