This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 0e609d70da0abef12bf2d89c217fed641704fd41 Author: monster <[email protected]> AuthorDate: Tue Dec 19 16:30:48 2023 +0800 [core] Fix creation failure caused by savepoint tag (#2515) --- .../org/apache/paimon/tag/TagAutoCreation.java | 4 +-- .../java/org/apache/paimon/utils/TagManager.java | 26 ++++++++++++++---- .../org/apache/paimon/tag/TagAutoCreationTest.java | 31 ++++++++++++++++++++++ 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index e02da25c1..c4e9f7948 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -67,7 +67,7 @@ public class TagAutoCreation { this.periodHandler.validateDelay(delay); - SortedMap<Snapshot, String> tags = tagManager.tags(); + SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint")); if (tags.isEmpty()) { this.nextSnapshot = @@ -122,7 +122,7 @@ public class TagAutoCreation { nextTag = periodHandler.nextTagTime(thisTag); if (numRetainedMax != null) { - SortedMap<Snapshot, String> tags = tagManager.tags(); + SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint")); if (tags.size() > numRetainedMax) { int toDelete = tags.size() - numRetainedMax; int i = 0; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 927dbd373..bd526ee2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -179,6 +179,21 @@ public class TagManager { /** Get all tagged snapshots with names sorted by snapshot id. */ public SortedMap<Snapshot, String> tags() { + return tags(tagName -> true); + } + + /** + * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate + * determines which tag names should be included in the result. Only snapshots with tag names + * that pass the predicate test are included. + * + * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the + * test are excluded from the result. + * @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag + * name. + * @throws RuntimeException if an IOException occurs during retrieval of snapshots. + */ + public SortedMap<Snapshot, String> tags(Predicate<String> filter) { TreeMap<Snapshot, String> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { List<Path> paths = @@ -187,14 +202,15 @@ public class TagManager { .collect(Collectors.toList()); for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); + + if (!filter.test(tagName)) { + continue; + } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag Snapshot.safelyFromPath(fileIO, path) - .ifPresent( - snapshot -> - tags.put( - snapshot, - path.getName().substring(TAG_PREFIX.length()))); + .ifPresent(snapshot -> tags.put(snapshot, tagName)); } } catch (IOException e) { throw new RuntimeException(e); diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java index 53dd6aed2..3b753093b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java @@ -215,6 +215,37 @@ public class TagAutoCreationTest extends PrimaryKeyTableTestBase { assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-19"); } + @Test + public void testSavepointTag() { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + FileStoreTable table; + TableCommitImpl commit; + TagManager tagManager; + table = this.table.copy(options.toMap()); + + commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11"); + + table.createTag("savepoint-11", 1); + + // test newCommit create + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-18 13"); + + // test expire old tag + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:00:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + assertThat(tagManager.tags().values()) + .containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15"); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault())
