This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ee900c86 [core]  One snapshot should have only one auto tag (#5832)
a2ee900c86 is described below

commit a2ee900c862d4af4b66455818e674e0a865ea3c5
Author: JackeyLee007 <[email protected]>
AuthorDate: Tue Jul 8 20:51:28 2025 +0800

    [core]  One snapshot should have only one auto tag (#5832)
---
 docs/content/spark/sql-ddl.md                      |  1 +
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../paimon/table/AbstractFileStoreTable.java       |  2 +-
 .../org/apache/paimon/table/system/TagsTable.java  |  5 +-
 .../java/org/apache/paimon/utils/TagManager.java   | 59 ++++++++++++++++++++--
 .../org/apache/paimon/tag/TagAutoManagerTest.java  | 18 +++++++
 6 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 7a65094c1a..0019d8de18 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -340,6 +340,7 @@ ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 
HOURS;
 -- create or replace a tag, create tag if it not exist, replace tag if it 
exists.
 ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
 ```
+NOTE: If `tag.automatic-creation` is set, each snapshot can have at most one auto-tag: creating a tag whose name is in the auto-tag format fails if the snapshot already has one.
 
 ### Delete Tag
 Delete a tag or multiple tags of a table.
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 411c8268b9..da3f61527e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -78,6 +78,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
 import static 
org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -333,7 +334,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
 
     @Override
     public TagManager newTagManager() {
-        return new TagManager(fileIO, options.path());
+        return new TagManager(fileIO, options.path(), DEFAULT_MAIN_BRANCH, options);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7ea19d9f02..5dd13a37c0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -670,7 +670,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public TagManager tagManager() {
-        return new TagManager(fileIO, path, currentBranch());
+        return new TagManager(fileIO, path, currentBranch(), coreOptions());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 236fbe3110..220f012336 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -223,7 +223,10 @@ public class TagsTable implements ReadonlyTable {
             }
             Path location = ((TagsSplit) split).location;
             Predicate predicate = ((TagsSplit) split).tagPredicate;
-            TagManager tagManager = new TagManager(fileIO, location, branch);
+
+            // There should not be any tag creation related options here for 
tags table.
+            TagManager tagManager =
+                    new TagManager(fileIO, location, branch, 
CoreOptions.fromMap(options()));
 
             Map<String, Tag> nameToSnapshot = new TreeMap<>();
             Map<String, Tag> predicateMap = new TreeMap<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a4491f2fa8..89bc4d64d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
@@ -28,6 +29,8 @@ import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.Tag;
+import org.apache.paimon.tag.TagPeriodHandler;
+import org.apache.paimon.tag.TagTimeExtractor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +43,7 @@ import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -64,20 +68,26 @@ public class TagManager {
     private final FileIO fileIO;
     private final Path tablePath;
     private final String branch;
+    private final CoreOptions coreOptions;
 
     public TagManager(FileIO fileIO, Path tablePath) {
-        this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+        this(fileIO, tablePath, DEFAULT_MAIN_BRANCH, CoreOptions.fromMap(Collections.emptyMap()));
     }

-    /** Specify the default branch for data writing. */
     public TagManager(FileIO fileIO, Path tablePath, String branch) {
+        this(fileIO, tablePath, branch, CoreOptions.fromMap(Collections.emptyMap()));
+    }
+
+    /** Specify the default branch for data writing and the options used to detect auto-tags. */
+    public TagManager(FileIO fileIO, Path tablePath, String branch, CoreOptions coreOptions) {
         this.fileIO = fileIO;
         this.tablePath = tablePath;
         this.branch = BranchManager.normalizeBranch(branch);
+        this.coreOptions = coreOptions;
     }

     public TagManager copyWithBranch(String branchName) {
-        return new TagManager(fileIO, tablePath, branchName);
+        return new TagManager(fileIO, tablePath, branchName, coreOptions);
     }
 
     /** Return the root Directory of tags. */
@@ -131,6 +141,17 @@ public class TagManager {
             String tagName,
             @Nullable Duration timeRetained,
             @Nullable List<TagCallback> callbacks) {
+
+        if (isAutoTag(tagName)) {
+            List<String> autoTags = getSnapshotAutoTags(snapshot);
+            if (!autoTags.isEmpty()) {
+                throw new RuntimeException(
+                        String.format(
+                                "Snapshot %s is already auto-tagged with %s.",
+                                snapshot.id(), autoTags));
+            }
+        }
+
         // When timeRetained is not defined, please do not write the 
tagCreatorTime field, as this
         // will cause older versions (<= 0.7) of readers to be unable to read 
this tag.
         // When timeRetained is defined, it is fine, because timeRetained is 
the new feature.
@@ -437,4 +458,36 @@ public class TagManager {
                         "Didn't find tag with snapshot id '%s'. This is 
unexpected.",
                         taggedSnapshot.id()));
     }
+
+    /** Returns true only if auto-tag creation is enabled and the name has the auto-tag format. */
+    private boolean isAutoTag(String tagName) {
+        TagPeriodHandler periodHandler = autoTagPeriodHandler();
+        return periodHandler != null && periodHandler.isAutoTag(tagName);
+    }
+
+    /**
+     * Returns the tags of {@code snapshot} whose names have the auto-tag format, or an empty list
+     * if auto-tag creation is not enabled.
+     */
+    private List<String> getSnapshotAutoTags(Snapshot snapshot) {
+        TagPeriodHandler periodHandler = autoTagPeriodHandler();
+        if (periodHandler == null) {
+            return Collections.emptyList();
+        }
+        // Any tag of this snapshot whose name is in auto-tag format counts as an auto-tag.
+        return tags().getOrDefault(snapshot, Collections.emptyList()).stream()
+                .filter(periodHandler::isAutoTag)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the handler recognizing auto-tag names, or null if auto-tag creation is not enabled
+     * ({@link TagTimeExtractor#createForAutoTag} returns null).
+     */
+    @Nullable
+    private TagPeriodHandler autoTagPeriodHandler() {
+        return TagTimeExtractor.createForAutoTag(coreOptions) == null
+                ? null
+                : TagPeriodHandler.create(coreOptions);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 3794c8dc8a..089a639c50 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -48,6 +48,7 @@ import static 
org.apache.paimon.CoreOptions.TAG_DEFAULT_TIME_RETAINED;
 import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Test for tag automatic creation. */
 public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
@@ -483,6 +484,23 @@ public class TagAutoManagerTest extends 
PrimaryKeyTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testMultiAutoTagsOnOneSnapshot() throws Exception {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:00:01")));
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-17");
+        // Snapshot 1 already has auto-tag "2023-07-17"; another auto-tag-formatted tag must fail.
+        assertThat(assertThrows(RuntimeException.class, () -> table.createTag("2023-07-18", 1)))
+                .hasMessageContaining("already auto-tagged");
+        commit.close();
+    }
+
     private long localZoneMills(String timestamp) {
         return LocalDateTime.parse(timestamp)
                 .atZone(ZoneId.systemDefault())

Reply via email to