This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a2ee900c86 [core] One snapshot should have only one auto tag (#5832)
a2ee900c86 is described below
commit a2ee900c862d4af4b66455818e674e0a865ea3c5
Author: JackeyLee007 <[email protected]>
AuthorDate: Tue Jul 8 20:51:28 2025 +0800
[core] One snapshot should have only one auto tag (#5832)
---
docs/content/spark/sql-ddl.md | 1 +
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../paimon/table/AbstractFileStoreTable.java | 2 +-
.../org/apache/paimon/table/system/TagsTable.java | 5 +-
.../java/org/apache/paimon/utils/TagManager.java | 59 ++++++++++++++++++++--
.../org/apache/paimon/tag/TagAutoManagerTest.java | 18 +++++++
6 files changed, 82 insertions(+), 6 deletions(-)
diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 7a65094c1a..0019d8de18 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -340,6 +340,7 @@ ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24
HOURS;
-- create or replace a tag, create tag if it not exist, replace tag if it
exists.
ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
```
+NOTE: If `tag.automatic-creation` is set, at most one auto-tag (a tag whose name matches the auto-tag format) can be attached to a snapshot.
### Delete Tag
Delete a tag or multiple tags of a table.
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 411c8268b9..da3f61527e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -78,6 +78,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
import static
org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -333,7 +334,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
@Override
public TagManager newTagManager() {
- return new TagManager(fileIO, options.path());
+ // Explicit main branch (the two-argument TagManager constructor's default), plus the
+ // table options, which the TagManager uses to recognize auto-tag names.
+ return new TagManager(fileIO, options.path(), DEFAULT_MAIN_BRANCH,
options);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7ea19d9f02..5dd13a37c0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -670,7 +670,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public TagManager tagManager() {
- return new TagManager(fileIO, path, currentBranch());
+ // Pass the table options so the TagManager can recognize auto-tag names on this branch.
+ return new TagManager(fileIO, path, currentBranch(), coreOptions());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 236fbe3110..220f012336 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -223,7 +223,10 @@ public class TagsTable implements ReadonlyTable {
}
Path location = ((TagsSplit) split).location;
Predicate predicate = ((TagsSplit) split).tagPredicate;
- TagManager tagManager = new TagManager(fileIO, location, branch);
+
+ // TagsTable is a ReadonlyTable, so this TagManager is expected to only list tags; the
+ // auto-tag check driven by these options applies only when tags are created.
+ // NOTE(review): intent is that options() carries no tag-creation settings here — confirm.
+ TagManager tagManager =
+ new TagManager(fileIO, location, branch,
CoreOptions.fromMap(options()));
Map<String, Tag> nameToSnapshot = new TreeMap<>();
Map<String, Tag> predicateMap = new TreeMap<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a4491f2fa8..89bc4d64d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -18,6 +18,7 @@
package org.apache.paimon.utils;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
@@ -28,6 +29,8 @@ import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.Tag;
+import org.apache.paimon.tag.TagPeriodHandler;
+import org.apache.paimon.tag.TagTimeExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +43,7 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -64,20 +68,26 @@ public class TagManager {
private final FileIO fileIO;
private final Path tablePath;
private final String branch;
+ /** Table options; consulted when deciding whether a tag name is an auto-tag. */
+ private final CoreOptions coreOptions;
+ /** Uses the main branch and empty options (default tag settings). */
public TagManager(FileIO fileIO, Path tablePath) {
- this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+ this(fileIO, tablePath, DEFAULT_MAIN_BRANCH,
CoreOptions.fromMap(Collections.emptyMap()));
}
- /** Specify the default branch for data writing. */
+ /** Uses the given branch and empty options (default tag settings). */
public TagManager(FileIO fileIO, Path tablePath, String branch) {
+ this(fileIO, tablePath, branch,
CoreOptions.fromMap(Collections.emptyMap()));
+ }
+
+ /**
+ * Specify the default branch for data writing.
+ *
+ * @param coreOptions table options; used to recognize auto-tag names when creating tags
+ */
+ public TagManager(FileIO fileIO, Path tablePath, String branch,
CoreOptions coreOptions) {
this.fileIO = fileIO;
this.tablePath = tablePath;
this.branch = BranchManager.normalizeBranch(branch);
+ this.coreOptions = coreOptions;
}
+ /** Returns a TagManager for {@code branchName} with the same file IO, table path and options. */
public TagManager copyWithBranch(String branchName) {
- return new TagManager(fileIO, tablePath, branchName);
+ return new TagManager(fileIO, tablePath, branchName, coreOptions);
}
/** Return the root Directory of tags. */
@@ -131,6 +141,17 @@ public class TagManager {
String tagName,
@Nullable Duration timeRetained,
@Nullable List<TagCallback> callbacks) {
+
+        // Enforce at most one auto-tag per snapshot. The tag being created or replaced is
+        // excluded from the conflict list, so create-or-replace of an auto-tag on the snapshot
+        // it already points to is still allowed.
+        if (isAutoTag(tagName)) {
+            List<String> otherAutoTags =
+                    getSnapshotAutoTags(snapshot).stream()
+                            .filter(autoTag -> !autoTag.equals(tagName))
+                            .collect(Collectors.toList());
+            if (!otherAutoTags.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Snapshot %s is already auto-tagged with %s.",
+                                snapshot.id(), otherAutoTags));
+            }
+        }
+
// When timeRetained is not defined, please do not write the
tagCreatorTime field, as this
// will cause older versions (<= 0.7) of readers to be unable to read
this tag.
// When timeRetained is defined, it is fine, because timeRetained is
the new feature.
@@ -437,4 +458,36 @@ public class TagManager {
"Didn't find tag with snapshot id '%s'. This is
unexpected.",
taggedSnapshot.id()));
}
+
+    /**
+     * Returns whether {@code tagName} is treated as an auto-tag.
+     *
+     * @param tagName the tag name to check
+     * @return true only if automatic tag creation is enabled and {@code tagName} matches the
+     *     auto-tag name format of the configured period
+     */
+    private boolean isAutoTag(String tagName) {
+        TagPeriodHandler periodHandler = autoTagPeriodHandler();
+        return periodHandler != null && periodHandler.isAutoTag(tagName);
+    }
+
+    /**
+     * Returns the tags of {@code snapshot} whose names are in auto-tag format.
+     *
+     * @param snapshot the snapshot whose tags are inspected
+     * @return the auto-tag names of the snapshot; empty if automatic tag creation is not enabled
+     *     or the snapshot has no such tags
+     */
+    private List<String> getSnapshotAutoTags(Snapshot snapshot) {
+        TagPeriodHandler periodHandler = autoTagPeriodHandler();
+        if (periodHandler == null) {
+            // no auto-tagging, no auto-tags
+            return Collections.emptyList();
+        }
+        // With auto-tagging enabled, every tag whose name is in auto-tag format counts.
+        return tags().getOrDefault(snapshot, Collections.emptyList()).stream()
+                .filter(periodHandler::isAutoTag)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the period handler used to recognize auto-tag names, or {@code null} when no
+     * auto-tag time extractor can be created from the options (automatic tag creation is off).
+     */
+    @Nullable
+    private TagPeriodHandler autoTagPeriodHandler() {
+        if (TagTimeExtractor.createForAutoTag(coreOptions) == null) {
+            return null;
+        }
+        return TagPeriodHandler.create(coreOptions);
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 3794c8dc8a..089a639c50 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -48,6 +48,7 @@ import static
org.apache.paimon.CoreOptions.TAG_DEFAULT_TIME_RETAINED;
import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** Test for tag automatic creation. */
public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
@@ -483,6 +484,23 @@ public class TagAutoManagerTest extends
PrimaryKeyTableTestBase {
commit.close();
}
+    @Test
+    public void testMultiAutoTagsOnOneSnapshot() throws Exception {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+
+        // auto-tag
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:00:01")));
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-17");
+
+        // a second tag in auto-tag format on the same snapshot must be rejected
+        RuntimeException e =
+                assertThrows(RuntimeException.class, () -> table.createTag("2023-07-18", 1));
+        assertThat(e).hasMessageContaining("already auto-tagged");
+
+        // tags whose names are not in auto-tag format are not restricted by the rule
+        table.createTag("manual-tag", 1);
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-17", "manual-tag");
+
+        commit.close();
+    }
+
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())