This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b268f8b [core] Fix that TagManager doesn't handle case where more than 1 tags are created based on the same snapshot (#2765)
f9b268f8b is described below

commit f9b268f8b27f27f2a98b1da4fee6072361386559
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 23 10:27:00 2024 +0800

    [core] Fix that TagManager doesn't handle case where more than 1 tags are created based on the same snapshot (#2765)
---
 .../apache/paimon/operation/OrphanFilesClean.java  |  4 +-
 .../org/apache/paimon/table/RollbackHelper.java    |  4 +-
 .../org/apache/paimon/table/system/TagsTable.java  | 18 ++++--
 .../org/apache/paimon/tag/TagAutoCreation.java     | 32 +++++-----
 .../org/apache/paimon/tag/TagPeriodHandler.java    | 12 ++++
 .../java/org/apache/paimon/tag/TagPreview.java     | 13 ++++
 .../java/org/apache/paimon/utils/TagManager.java   | 73 +++++++++++++++++++---
 .../apache/paimon/table/system/TagsTableTest.java  | 22 ++++---
 .../org/apache/paimon/tag/TagAutoCreationTest.java | 71 +++++++++++++--------
 .../flink/sink/BatchWriteGeneratorTagOperator.java | 27 +++++---
 .../AutoTagForSavepointCommitterOperatorTest.java  |  9 +--
 .../sink/BatchWriteGeneratorTagOperatorTest.java   | 15 +++++
 12 files changed, 220 insertions(+), 80 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 81fb7f6d1..19d6affa4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -50,7 +50,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -146,8 +145,7 @@ public class OrphanFilesClean {
             throws IOException, ExecutionException, InterruptedException {
         // safely get all snapshots to be read
         Set<Snapshot> readSnapshots = new 
HashSet<>(snapshotManager.safelyGetAllSnapshots());
-        SortedMap<Snapshot, String> tags = tagManager.tags();
-        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
         readSnapshots.addAll(taggedSnapshots);
 
         return FileUtils.COMMON_IO_FORK_JOIN_POOL
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java 
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index a1d5333e4..b64178455 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -122,7 +122,7 @@ public class RollbackHelper {
     }
 
     private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
-        SortedMap<Snapshot, String> tags = tagManager.tags();
+        SortedMap<Snapshot, List<String>> tags = tagManager.tags();
         if (tags.isEmpty()) {
             return Collections.emptyList();
         }
@@ -137,7 +137,7 @@ public class RollbackHelper {
                 break;
             }
             toBeCleaned.add(tag);
-            fileIO.deleteQuietly(tagManager.tagPath(tags.get(tag)));
+            tags.get(tag).forEach(tagName -> 
fileIO.deleteQuietly(tagManager.tagPath(tagName)));
         }
 
         // delete data files
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index eecea7d0e..98c3ec3cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -50,6 +50,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -191,9 +192,16 @@ public class TagsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((TagsSplit) split).location;
-            SortedMap<Snapshot, String> tags = new TagManager(fileIO, 
location).tags();
+            SortedMap<Snapshot, List<String>> tags = new TagManager(fileIO, 
location).tags();
+            Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
+            for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
+                for (String tagName : tag.getValue()) {
+                    nameToSnapshot.put(tagName, tag.getKey());
+                }
+            }
+
             Iterator<InternalRow> rows =
-                    Iterators.transform(tags.entrySet().iterator(), 
this::toRow);
+                    Iterators.transform(nameToSnapshot.entrySet().iterator(), 
this::toRow);
             if (projection != null) {
                 rows =
                         Iterators.transform(
@@ -202,10 +210,10 @@ public class TagsTable implements ReadonlyTable {
             return new IteratorRecordReader<>(rows);
         }
 
-        private InternalRow toRow(Map.Entry<Snapshot, String> tag) {
-            Snapshot snapshot = tag.getKey();
+        private InternalRow toRow(Map.Entry<String, Snapshot> tag) {
+            Snapshot snapshot = tag.getValue();
             return GenericRow.of(
-                    BinaryString.fromString(tag.getValue()),
+                    BinaryString.fromString(tag.getKey()),
                     snapshot.id(),
                     snapshot.schemaId(),
                     Timestamp.fromLocalDateTime(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 3b15d4891..27a8413bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -36,6 +36,7 @@ import java.util.SortedMap;
 
 import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
 import static 
org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** A manager to create tags automatically. */
 public class TagAutoCreation {
@@ -72,7 +73,7 @@ public class TagAutoCreation {
 
         this.periodHandler.validateDelay(delay);
 
-        SortedMap<Snapshot, String> tags = tagManager.tags(this::isAutoTag);
+        SortedMap<Snapshot, List<String>> tags = 
tagManager.tags(periodHandler::isAutoTag);
 
         if (tags.isEmpty()) {
             this.nextSnapshot =
@@ -81,20 +82,12 @@ public class TagAutoCreation {
             Snapshot lastTag = tags.lastKey();
             this.nextSnapshot = lastTag.id() + 1;
 
-            LocalDateTime time = periodHandler.tagToTime(tags.get(lastTag));
+            String tagName = checkAndGetOneAutoTag(tags.get(lastTag));
+            LocalDateTime time = periodHandler.tagToTime(tagName);
             this.nextTag = periodHandler.nextTagTime(time);
         }
     }
 
-    private boolean isAutoTag(String tag) {
-        try {
-            periodHandler.tagToTime(tag);
-            return true;
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
     public boolean forceCreatingSnapshot() {
         return timeExtractor instanceof ProcessTimeExtractor
                 && (nextTag == null
@@ -136,12 +129,14 @@ public class TagAutoCreation {
             nextTag = periodHandler.nextTagTime(thisTag);
 
             if (numRetainedMax != null) {
-                SortedMap<Snapshot, String> tags = 
tagManager.tags(this::isAutoTag);
+                // only handle auto-created tags here
+                SortedMap<Snapshot, List<String>> tags = 
tagManager.tags(periodHandler::isAutoTag);
                 if (tags.size() > numRetainedMax) {
                     int toDelete = tags.size() - numRetainedMax;
                     int i = 0;
-                    for (String tag : tags.values()) {
-                        tagManager.deleteTag(tag, tagDeletion, 
snapshotManager);
+                    for (List<String> tag : tags.values()) {
+                        tagManager.deleteTag(
+                                checkAndGetOneAutoTag(tag), tagDeletion, 
snapshotManager);
                         i++;
                         if (i == toDelete) {
                             break;
@@ -156,6 +151,15 @@ public class TagAutoCreation {
         return t1.isAfter(t2) || t1.isEqual(t2);
     }
 
+    public static String checkAndGetOneAutoTag(List<String> autoTags) {
+        checkState(
+                autoTags.size() == 1,
+                "There are more than 1 auto-created tags of the same snapshot: 
%s. This is unexpected.",
+                String.join(",", autoTags));
+
+        return autoTags.get(0);
+    }
+
     @Nullable
     public static TagAutoCreation create(
             CoreOptions options,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index fa18eebea..c0fbe718c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -88,6 +88,8 @@ public interface TagPeriodHandler {
 
     LocalDateTime nextTagTime(LocalDateTime time);
 
+    boolean isAutoTag(String tagName);
+
     /** Base implementation of {@link TagPeriodHandler}. */
     abstract class BaseTagPeriodHandler implements TagPeriodHandler {
 
@@ -124,6 +126,16 @@ public interface TagPeriodHandler {
         public LocalDateTime nextTagTime(LocalDateTime time) {
             return time.plus(onePeriod());
         }
+
+        @Override
+        public boolean isAutoTag(String tagName) {
+            try {
+                tagToTime(tagName);
+                return true;
+            } catch (Exception e) {
+                return false;
+            }
+        }
     }
 
     /** Hourly {@link TagPeriodHandler}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
index cd1a3fda1..dcc03bcda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -27,7 +27,9 @@ import org.apache.paimon.utils.TagManager;
 import javax.annotation.Nullable;
 
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -84,6 +86,7 @@ public class TagPreview {
 
         Optional<String> findTag =
                 tagManager.tags().values().stream()
+                        .map(this::toOneAutoTag)
                         .filter(t -> t.compareTo(tag) <= 0)
                         .max(Comparator.naturalOrder());
         if (findTag.isPresent()) {
@@ -92,4 +95,14 @@ public class TagPreview {
 
         throw new RuntimeException("Cannot find snapshot or tag for tag name: 
" + tag);
     }
+
+    private String toOneAutoTag(List<String> tags) {
+        List<String> autoTags = new ArrayList<>();
+        for (String tag : tags) {
+            if (periodHandler.isAutoTag(tag)) {
+                autoTags.add(tag);
+            }
+        }
+        return TagAutoCreation.checkAndGetOneAutoTag(autoTags);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index bebde6ce3..a29a3e151 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.SortedMap;
@@ -98,6 +100,25 @@ public class TagManager {
         }
     }
 
+    /** Make sure the tagNames are ALL tags of one snapshot. */
+    public void deleteAllTagsOfOneSnapshot(
+            List<String> tagNames, TagDeletion tagDeletion, SnapshotManager 
snapshotManager) {
+        Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0));
+        List<Snapshot> taggedSnapshots;
+
+        // skip file deletion if snapshot exists
+        if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
+            tagNames.forEach(tagName -> 
fileIO.deleteQuietly(tagPath(tagName)));
+            return;
+        } else {
+            // FileIO discovers tags by tag file, so we should read all tags 
before we delete tag
+            taggedSnapshots = taggedSnapshots();
+            tagNames.forEach(tagName -> 
fileIO.deleteQuietly(tagPath(tagName)));
+        }
+
+        doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
+    }
+
     public void deleteTag(
             String tagName, TagDeletion tagDeletion, SnapshotManager 
snapshotManager) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
@@ -112,10 +133,24 @@ public class TagManager {
             return;
         } else {
             // FileIO discovers tags by tag file, so we should read all tags 
before we delete tag
-            taggedSnapshots = taggedSnapshots();
+            SortedMap<Snapshot, List<String>> tags = tags();
             fileIO.deleteQuietly(tagPath(tagName));
+
+            // skip data file clean if more than 1 tags are created based on 
this snapshot
+            if (tags.get(taggedSnapshot).size() > 1) {
+                return;
+            }
+            taggedSnapshots = new ArrayList<>(tags.keySet());
         }
 
+        doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
+    }
+
+    private void doClean(
+            Snapshot taggedSnapshot,
+            List<Snapshot> taggedSnapshots,
+            SnapshotManager snapshotManager,
+            TagDeletion tagDeletion) {
         // collect skipping sets from the left neighbor tag and the nearest 
right neighbor (either
         // the earliest snapshot or right neighbor tag)
         List<Snapshot> skippedSnapshots = new ArrayList<>();
@@ -141,8 +176,8 @@ public class TagManager {
         } catch (Exception e) {
             LOG.info(
                     String.format(
-                            "Skip cleaning data files of tag '%s' due to 
failed to build skipping set.",
-                            tagName),
+                            "Skip cleaning data files for tag of snapshot %s 
due to failed to build skipping set.",
+                            taggedSnapshot.id()),
                     e);
             success = false;
         }
@@ -189,7 +224,7 @@ public class TagManager {
     }
 
     /** Get all tagged snapshots with names sorted by snapshot id. */
-    public SortedMap<Snapshot, String> tags() {
+    public SortedMap<Snapshot, List<String>> tags() {
         return tags(tagName -> true);
     }
 
@@ -204,8 +239,9 @@ public class TagManager {
      *     name.
      * @throws RuntimeException if an IOException occurs during retrieval of 
snapshots.
      */
-    public SortedMap<Snapshot, String> tags(Predicate<String> filter) {
-        TreeMap<Snapshot, String> tags = new 
TreeMap<>(Comparator.comparingLong(Snapshot::id));
+    public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
+        TreeMap<Snapshot, List<String>> tags =
+                new TreeMap<>(Comparator.comparingLong(Snapshot::id));
         try {
             List<Path> paths =
                     listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
@@ -221,7 +257,10 @@ public class TagManager {
                 // If the tag file is not found, it might be deleted by
                 // other processes, so just skip this tag
                 Snapshot.safelyFromPath(fileIO, path)
-                        .ifPresent(snapshot -> tags.put(snapshot, tagName));
+                        .ifPresent(
+                                snapshot ->
+                                        tags.computeIfAbsent(snapshot, s -> 
new ArrayList<>())
+                                                .add(tagName));
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -229,6 +268,26 @@ public class TagManager {
         return tags;
     }
 
+    public List<String> sortTagsOfOneSnapshot(List<String> tagNames) {
+        return tagNames.stream()
+                .map(
+                        name -> {
+                            try {
+                                return fileIO.getFileStatus(tagPath(name));
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                
.sorted(Comparator.comparingLong(FileStatus::getModificationTime))
+                .map(fileStatus -> 
fileStatus.getPath().getName().substring(TAG_PREFIX.length()))
+                .collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    public List<String> allTagNames() {
+        return 
tags().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+    }
+
     private int findIndex(Snapshot taggedSnapshot, List<Snapshot> 
taggedSnapshots) {
         for (int i = 0; i < taggedSnapshots.size(); i++) {
             if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 55ba3648b..c6df41841 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -78,6 +78,7 @@ public class TagsTableTest extends TableTestBase {
                                 .getMillisecond()));
         tagsTable = (TagsTable) catalog.getTable(identifier(tableName + 
"$tags"));
         tagManager = table.store().newTagManager();
+        table.createTag("many-tags-test");
     }
 
     @Test
@@ -89,17 +90,18 @@ public class TagsTableTest extends TableTestBase {
 
     private List<InternalRow> getExceptedResult() {
         List<InternalRow> internalRows = new ArrayList<>();
-        for (Map.Entry<Snapshot, String> tag : tagManager.tags().entrySet()) {
+        for (Map.Entry<Snapshot, List<String>> tag : 
tagManager.tags().entrySet()) {
             Snapshot snapshot = tag.getKey();
-
-            internalRows.add(
-                    GenericRow.of(
-                            BinaryString.fromString(tag.getValue()),
-                            snapshot.id(),
-                            snapshot.schemaId(),
-                            Timestamp.fromLocalDateTime(
-                                    
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
-                            snapshot.totalRecordCount()));
+            for (String tagName : tag.getValue()) {
+                internalRows.add(
+                        GenericRow.of(
+                                BinaryString.fromString(tagName),
+                                snapshot.id(),
+                                snapshot.schemaId(),
+                                Timestamp.fromLocalDateTime(
+                                        
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
+                                snapshot.totalRecordCount()));
+            }
         }
         return internalRows;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 9b0233ef4..4d5c3ebce 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -59,20 +59,20 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test normal creation
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         // test not creation
         commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-18T12:59:00")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         // test just in time
         commit.commit(new ManifestCommittable(2, 
utcMills("2023-07-18T13:00:00")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", 
"2023-07-18 12");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11", 
"2023-07-18 12");
 
         // test expire old tag
         commit.commit(new ManifestCommittable(3, 
utcMills("2023-07-18T14:00:00")));
         commit.commit(new ManifestCommittable(4, 
utcMills("2023-07-18T15:00:00")));
-        assertThat(tagManager.tags().values())
+        assertThat(tagManager.allTagNames())
                 .containsOnly("2023-07-18 12", "2023-07-18 13", "2023-07-18 
14");
 
         // test restore with snapshot expiration
@@ -92,7 +92,7 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // check tags
         commit.commit(new ManifestCommittable(9, 
utcMills("2023-07-18T16:00:00")));
-        assertThat(tagManager.tags().values())
+        assertThat(tagManager.allTagNames())
                 .containsOnly("2023-07-18 13", "2023-07-18 14", "2023-07-18 
15");
         commit.close();
     }
@@ -109,15 +109,15 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test first create tag anyway
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:00:09")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         // test not create due to delay
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T13:00:09")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         // test create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T13:00:10")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", 
"2023-07-18 12");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11", 
"2023-07-18 12");
         commit.close();
     }
 
@@ -133,11 +133,11 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test first create
         commit.commit(new ManifestCommittable(0, 
localZoneMills("2023-07-18T12:00:09")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         // test second create
         commit.commit(new ManifestCommittable(0, 
localZoneMills("2023-07-18T13:00:10")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", 
"2023-07-18 12");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11", 
"2023-07-18 12");
         commit.close();
     }
 
@@ -152,15 +152,15 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test first create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:00:01")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 10");
 
         // test no create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T13:00:01")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 10");
 
         // test second create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T14:00:09")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10", 
"2023-07-18 12");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 10", 
"2023-07-18 12");
         commit.close();
     }
 
@@ -175,18 +175,17 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test first create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:00:01")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-17");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-17");
 
         // test second create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-19T12:00:01")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-17", 
"2023-07-18");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-17", 
"2023-07-18");
 
         // test newCommit create
         commit.close();
         commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-20T12:00:01")));
-        assertThat(tagManager.tags().values())
-                .containsOnly("2023-07-17", "2023-07-18", "2023-07-19");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-17", 
"2023-07-18", "2023-07-19");
         commit.close();
     }
 
@@ -205,7 +204,7 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test first create
         commit.commit(new ManifestCommittable(0, 
localZoneMills("2023-07-18T12:00:09")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
         table = table.copy(options.toMap());
@@ -214,7 +213,7 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test newCommit create
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-20T12:00:01")));
-        assertThat(tagManager.tags().values()).contains("2023-07-18 11", 
"2023-07-19");
+        assertThat(tagManager.allTagNames()).contains("2023-07-18 11", 
"2023-07-19");
     }
 
     @Test
@@ -233,18 +232,20 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
 
         // test normal creation
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
-        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
 
         table.createTag("savepoint-11", 1);
 
         // test newCommit create
         commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-18T14:00:00")));
-        assertThat(tagManager.tags().values()).contains("2023-07-18 11", 
"2023-07-18 13");
+        assertThat(tagManager.allTagNames())
+                .containsOnly("savepoint-11", "2023-07-18 11", "2023-07-18 
13");
 
         // test expire old tag
         commit.commit(new ManifestCommittable(2, 
utcMills("2023-07-18T15:00:00")));
         commit.commit(new ManifestCommittable(3, 
utcMills("2023-07-18T16:00:00")));
-        assertThat(tagManager.tags().values())
+        // only handle auto-created tags
+        assertThat(tagManager.allTagNames())
                 .containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 
14", "2023-07-18 15");
     }
 
@@ -259,10 +260,10 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
         TagManager tagManager = table.store().newTagManager();
 
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
-        assertThat(tagManager.tags().values()).containsOnly("20230717");
+        assertThat(tagManager.allTagNames()).containsOnly("20230717");
 
         commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-19T12:12:00")));
-        assertThat(tagManager.tags().values()).contains("20230717", 
"20230718");
+        assertThat(tagManager.allTagNames()).contains("20230717", "20230718");
     }
 
     @Test
@@ -276,10 +277,28 @@ public class TagAutoCreationTest extends 
PrimaryKeyTableTestBase {
         TagManager tagManager = table.store().newTagManager();
 
         commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
-        assertThat(tagManager.tags().values()).containsOnly("20230718 11");
+        assertThat(tagManager.allTagNames()).containsOnly("20230718 11");
 
         commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-18T13:13:00")));
-        assertThat(tagManager.tags().values()).contains("20230718 11", 
"20230718 12");
+        assertThat(tagManager.allTagNames()).contains("20230718 11", "20230718 
12");
+    }
+
+    @Test
+    public void testOnlyExpireAutoCreatedTag() {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 1);
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+
+        commit.commit(new ManifestCommittable(0, 
utcMills("2023-07-18T12:12:00")));
+        table.createTag("many-tags-test");
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11", 
"many-tags-test");
+
+        commit.commit(new ManifestCommittable(1, 
utcMills("2023-07-18T13:13:00")));
+        assertThat(tagManager.allTagNames()).contains("2023-07-18 12", 
"many-tags-test");
     }
 
     private long localZoneMills(String timestamp) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index 36ae32d15..d65ab7414 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -46,7 +46,7 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
-import java.util.SortedMap;
+import java.util.List;
 
 /**
  * Commit {@link Committable} for snapshot using the {@link 
CommitterOperator}. When the task is
@@ -136,14 +136,23 @@ public class BatchWriteGeneratorTagOperator<CommitT, 
GlobalCommitT>
             }
             TagManager tagManager = table.tagManager();
             TagDeletion tagDeletion = table.store().newTagDeletion();
-            SortedMap<Snapshot, String> tags = tagManager.tags();
-            if (tags.size() > tagNumRetainedMax) {
-                int toDelete = tags.size() - tagNumRetainedMax;
-                int i = 0;
-                for (String tag : tags.values()) {
-                    tagManager.deleteTag(tag, tagDeletion, snapshotManager);
-                    i++;
-                    if (i == toDelete) {
+            long tagCount = tagManager.tagCount();
+
+            while (tagCount > tagNumRetainedMax) {
+                for (List<String> tagNames : tagManager.tags().values()) {
+                    if (tagCount - tagNames.size() >= tagNumRetainedMax) {
+                        tagManager.deleteAllTagsOfOneSnapshot(
+                                tagNames, tagDeletion, snapshotManager);
+                        tagCount = tagCount - tagNames.size();
+                    } else {
+                        List<String> sortedTagNames = 
tagManager.sortTagsOfOneSnapshot(tagNames);
+                        for (String toBeDeleted : sortedTagNames) {
+                            tagManager.deleteTag(toBeDeleted, tagDeletion, 
snapshotManager);
+                            tagCount--;
+                            if (tagCount == tagNumRetainedMax) {
+                                break;
+                            }
+                        }
                         break;
                     }
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index 880e052c7..0dbde0579 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -88,10 +89,10 @@ public class AutoTagForSavepointCommitterOperatorTest 
extends CommitterOperatorT
         Snapshot snapshot = table.snapshotManager().snapshot(2);
         assertThat(snapshot).isNotNull();
         assertThat(snapshot.id()).isEqualTo(2);
-        Map<Snapshot, String> tags = table.tagManager().tags();
+        Map<Snapshot, List<String>> tags = table.tagManager().tags();
         assertThat(tags).containsOnlyKeys(snapshot);
         assertThat(tags.get(snapshot))
-                
.isEqualTo(AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + 2);
+                
.containsOnly(AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + 2);
     }
 
     @Test
@@ -139,10 +140,10 @@ public class AutoTagForSavepointCommitterOperatorTest 
extends CommitterOperatorT
         assertThat(snapshot).isNotNull();
         assertThat(snapshot.id()).isEqualTo(checkpointId);
 
-        Map<Snapshot, String> tags = table.tagManager().tags();
+        Map<Snapshot, List<String>> tags = table.tagManager().tags();
         assertThat(tags).containsOnlyKeys(snapshot);
         assertThat(tags.get(snapshot))
-                .isEqualTo(
+                .containsOnly(
                         
AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index 894410dae..f020f65bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -87,6 +87,21 @@ public class BatchWriteGeneratorTagOperatorTest extends 
CommitterOperatorTest {
         assertThat(table.tagManager().tagCount()).isEqualTo(1);
         // The tag is consistent with the latest snapshot
         
assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot());
+
+        // test tag expiration
+        table.createTag("many-tags-test1");
+        Thread.sleep(1_000);
+        table.createTag("many-tags-test2");
+        assertThat(tagManager.tagCount()).isEqualTo(3);
+
+        write.write(GenericRow.of(2, 20L));
+        tableCommit = table.newCommit(initialCommitUser);
+        tableCommit.commit(write.prepareCommit(false, 2));
+        // note that this tag has the same name with previous tag
+        // so the previous tag will be deleted
+        committerOperator.finish();
+
+        assertThat(tagManager.allTagNames()).containsOnly("many-tags-test2", 
tagName);
     }
 
     @Override


Reply via email to