This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9b268f8b [core] Fix that TagManager doesn't handle case where more
than 1 tags are created based on the same snapshot (#2765)
f9b268f8b is described below
commit f9b268f8b27f27f2a98b1da4fee6072361386559
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 23 10:27:00 2024 +0800
[core] Fix that TagManager doesn't handle case where more than 1 tags are
created based on the same snapshot (#2765)
---
.../apache/paimon/operation/OrphanFilesClean.java | 4 +-
.../org/apache/paimon/table/RollbackHelper.java | 4 +-
.../org/apache/paimon/table/system/TagsTable.java | 18 ++++--
.../org/apache/paimon/tag/TagAutoCreation.java | 32 +++++-----
.../org/apache/paimon/tag/TagPeriodHandler.java | 12 ++++
.../java/org/apache/paimon/tag/TagPreview.java | 13 ++++
.../java/org/apache/paimon/utils/TagManager.java | 73 +++++++++++++++++++---
.../apache/paimon/table/system/TagsTableTest.java | 22 ++++---
.../org/apache/paimon/tag/TagAutoCreationTest.java | 71 +++++++++++++--------
.../flink/sink/BatchWriteGeneratorTagOperator.java | 27 +++++---
.../AutoTagForSavepointCommitterOperatorTest.java | 9 +--
.../sink/BatchWriteGeneratorTagOperatorTest.java | 15 +++++
12 files changed, 220 insertions(+), 80 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 81fb7f6d1..19d6affa4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -50,7 +50,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -146,8 +145,7 @@ public class OrphanFilesClean {
throws IOException, ExecutionException, InterruptedException {
// safely get all snapshots to be read
Set<Snapshot> readSnapshots = new
HashSet<>(snapshotManager.safelyGetAllSnapshots());
- SortedMap<Snapshot, String> tags = tagManager.tags();
- List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+ List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
readSnapshots.addAll(taggedSnapshots);
return FileUtils.COMMON_IO_FORK_JOIN_POOL
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index a1d5333e4..b64178455 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -122,7 +122,7 @@ public class RollbackHelper {
}
private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
- SortedMap<Snapshot, String> tags = tagManager.tags();
+ SortedMap<Snapshot, List<String>> tags = tagManager.tags();
if (tags.isEmpty()) {
return Collections.emptyList();
}
@@ -137,7 +137,7 @@ public class RollbackHelper {
break;
}
toBeCleaned.add(tag);
- fileIO.deleteQuietly(tagManager.tagPath(tags.get(tag)));
+ tags.get(tag).forEach(tagName ->
fileIO.deleteQuietly(tagManager.tagPath(tagName)));
}
// delete data files
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index eecea7d0e..98c3ec3cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -50,6 +50,7 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -191,9 +192,16 @@ public class TagsTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((TagsSplit) split).location;
- SortedMap<Snapshot, String> tags = new TagManager(fileIO,
location).tags();
+ SortedMap<Snapshot, List<String>> tags = new TagManager(fileIO,
location).tags();
+ Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
+ for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
+ for (String tagName : tag.getValue()) {
+ nameToSnapshot.put(tagName, tag.getKey());
+ }
+ }
+
Iterator<InternalRow> rows =
- Iterators.transform(tags.entrySet().iterator(),
this::toRow);
+ Iterators.transform(nameToSnapshot.entrySet().iterator(),
this::toRow);
if (projection != null) {
rows =
Iterators.transform(
@@ -202,10 +210,10 @@ public class TagsTable implements ReadonlyTable {
return new IteratorRecordReader<>(rows);
}
- private InternalRow toRow(Map.Entry<Snapshot, String> tag) {
- Snapshot snapshot = tag.getKey();
+ private InternalRow toRow(Map.Entry<String, Snapshot> tag) {
+ Snapshot snapshot = tag.getValue();
return GenericRow.of(
- BinaryString.fromString(tag.getValue()),
+ BinaryString.fromString(tag.getKey()),
snapshot.id(),
snapshot.schemaId(),
Timestamp.fromLocalDateTime(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 3b15d4891..27a8413bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -36,6 +36,7 @@ import java.util.SortedMap;
import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
import static
org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
/** A manager to create tags automatically. */
public class TagAutoCreation {
@@ -72,7 +73,7 @@ public class TagAutoCreation {
this.periodHandler.validateDelay(delay);
- SortedMap<Snapshot, String> tags = tagManager.tags(this::isAutoTag);
+ SortedMap<Snapshot, List<String>> tags =
tagManager.tags(periodHandler::isAutoTag);
if (tags.isEmpty()) {
this.nextSnapshot =
@@ -81,20 +82,12 @@ public class TagAutoCreation {
Snapshot lastTag = tags.lastKey();
this.nextSnapshot = lastTag.id() + 1;
- LocalDateTime time = periodHandler.tagToTime(tags.get(lastTag));
+ String tagName = checkAndGetOneAutoTag(tags.get(lastTag));
+ LocalDateTime time = periodHandler.tagToTime(tagName);
this.nextTag = periodHandler.nextTagTime(time);
}
}
- private boolean isAutoTag(String tag) {
- try {
- periodHandler.tagToTime(tag);
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
public boolean forceCreatingSnapshot() {
return timeExtractor instanceof ProcessTimeExtractor
&& (nextTag == null
@@ -136,12 +129,14 @@ public class TagAutoCreation {
nextTag = periodHandler.nextTagTime(thisTag);
if (numRetainedMax != null) {
- SortedMap<Snapshot, String> tags =
tagManager.tags(this::isAutoTag);
+ // only handle auto-created tags here
+ SortedMap<Snapshot, List<String>> tags =
tagManager.tags(periodHandler::isAutoTag);
if (tags.size() > numRetainedMax) {
int toDelete = tags.size() - numRetainedMax;
int i = 0;
- for (String tag : tags.values()) {
- tagManager.deleteTag(tag, tagDeletion,
snapshotManager);
+ for (List<String> tag : tags.values()) {
+ tagManager.deleteTag(
+ checkAndGetOneAutoTag(tag), tagDeletion,
snapshotManager);
i++;
if (i == toDelete) {
break;
@@ -156,6 +151,15 @@ public class TagAutoCreation {
return t1.isAfter(t2) || t1.isEqual(t2);
}
+ public static String checkAndGetOneAutoTag(List<String> autoTags) {
+ checkState(
+ autoTags.size() == 1,
+ "There are more than 1 auto-created tags of the same snapshot:
%s. This is unexpected.",
+ String.join(",", autoTags));
+
+ return autoTags.get(0);
+ }
+
@Nullable
public static TagAutoCreation create(
CoreOptions options,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index fa18eebea..c0fbe718c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -88,6 +88,8 @@ public interface TagPeriodHandler {
LocalDateTime nextTagTime(LocalDateTime time);
+ boolean isAutoTag(String tagName);
+
/** Base implementation of {@link TagPeriodHandler}. */
abstract class BaseTagPeriodHandler implements TagPeriodHandler {
@@ -124,6 +126,16 @@ public interface TagPeriodHandler {
public LocalDateTime nextTagTime(LocalDateTime time) {
return time.plus(onePeriod());
}
+
+ @Override
+ public boolean isAutoTag(String tagName) {
+ try {
+ tagToTime(tagName);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
}
/** Hourly {@link TagPeriodHandler}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
index cd1a3fda1..dcc03bcda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -27,7 +27,9 @@ import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -84,6 +86,7 @@ public class TagPreview {
Optional<String> findTag =
tagManager.tags().values().stream()
+ .map(this::toOneAutoTag)
.filter(t -> t.compareTo(tag) <= 0)
.max(Comparator.naturalOrder());
if (findTag.isPresent()) {
@@ -92,4 +95,14 @@ public class TagPreview {
throw new RuntimeException("Cannot find snapshot or tag for tag name:
" + tag);
}
+
+ private String toOneAutoTag(List<String> tags) {
+ List<String> autoTags = new ArrayList<>();
+ for (String tag : tags) {
+ if (periodHandler.isAutoTag(tag)) {
+ autoTags.add(tag);
+ }
+ }
+ return TagAutoCreation.checkAndGetOneAutoTag(autoTags);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index bebde6ce3..a29a3e151 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -19,6 +19,7 @@
package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
@@ -98,6 +100,25 @@ public class TagManager {
}
}
+ /** Make sure the tagNames are ALL tags of one snapshot. */
+ public void deleteAllTagsOfOneSnapshot(
+ List<String> tagNames, TagDeletion tagDeletion, SnapshotManager
snapshotManager) {
+ Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0));
+ List<Snapshot> taggedSnapshots;
+
+ // skip file deletion if snapshot exists
+ if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
+ tagNames.forEach(tagName ->
fileIO.deleteQuietly(tagPath(tagName)));
+ return;
+ } else {
+ // FileIO discovers tags by tag file, so we should read all tags
before we delete tag
+ taggedSnapshots = taggedSnapshots();
+ tagNames.forEach(tagName ->
fileIO.deleteQuietly(tagPath(tagName)));
+ }
+
+ doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
+ }
+
public void deleteTag(
String tagName, TagDeletion tagDeletion, SnapshotManager
snapshotManager) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
@@ -112,10 +133,24 @@ public class TagManager {
return;
} else {
// FileIO discovers tags by tag file, so we should read all tags
before we delete tag
- taggedSnapshots = taggedSnapshots();
+ SortedMap<Snapshot, List<String>> tags = tags();
fileIO.deleteQuietly(tagPath(tagName));
+
+ // skip data file clean if more than 1 tags are created based on
this snapshot
+ if (tags.get(taggedSnapshot).size() > 1) {
+ return;
+ }
+ taggedSnapshots = new ArrayList<>(tags.keySet());
}
+ doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
+ }
+
+ private void doClean(
+ Snapshot taggedSnapshot,
+ List<Snapshot> taggedSnapshots,
+ SnapshotManager snapshotManager,
+ TagDeletion tagDeletion) {
// collect skipping sets from the left neighbor tag and the nearest
right neighbor (either
// the earliest snapshot or right neighbor tag)
List<Snapshot> skippedSnapshots = new ArrayList<>();
@@ -141,8 +176,8 @@ public class TagManager {
} catch (Exception e) {
LOG.info(
String.format(
- "Skip cleaning data files of tag '%s' due to
failed to build skipping set.",
- tagName),
+ "Skip cleaning data files for tag of snapshot %s
due to failed to build skipping set.",
+ taggedSnapshot.id()),
e);
success = false;
}
@@ -189,7 +224,7 @@ public class TagManager {
}
/** Get all tagged snapshots with names sorted by snapshot id. */
- public SortedMap<Snapshot, String> tags() {
+ public SortedMap<Snapshot, List<String>> tags() {
return tags(tagName -> true);
}
@@ -204,8 +239,9 @@ public class TagManager {
* name.
* @throws RuntimeException if an IOException occurs during retrieval of
snapshots.
*/
- public SortedMap<Snapshot, String> tags(Predicate<String> filter) {
- TreeMap<Snapshot, String> tags = new
TreeMap<>(Comparator.comparingLong(Snapshot::id));
+ public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
+ TreeMap<Snapshot, List<String>> tags =
+ new TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
List<Path> paths =
listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
@@ -221,7 +257,10 @@ public class TagManager {
// If the tag file is not found, it might be deleted by
// other processes, so just skip this tag
Snapshot.safelyFromPath(fileIO, path)
- .ifPresent(snapshot -> tags.put(snapshot, tagName));
+ .ifPresent(
+ snapshot ->
+ tags.computeIfAbsent(snapshot, s ->
new ArrayList<>())
+ .add(tagName));
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -229,6 +268,26 @@ public class TagManager {
return tags;
}
+ public List<String> sortTagsOfOneSnapshot(List<String> tagNames) {
+ return tagNames.stream()
+ .map(
+ name -> {
+ try {
+ return fileIO.getFileStatus(tagPath(name));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+
.sorted(Comparator.comparingLong(FileStatus::getModificationTime))
+ .map(fileStatus ->
fileStatus.getPath().getName().substring(TAG_PREFIX.length()))
+ .collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ public List<String> allTagNames() {
+ return
tags().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ }
+
private int findIndex(Snapshot taggedSnapshot, List<Snapshot>
taggedSnapshots) {
for (int i = 0; i < taggedSnapshots.size(); i++) {
if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 55ba3648b..c6df41841 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -78,6 +78,7 @@ public class TagsTableTest extends TableTestBase {
.getMillisecond()));
tagsTable = (TagsTable) catalog.getTable(identifier(tableName +
"$tags"));
tagManager = table.store().newTagManager();
+ table.createTag("many-tags-test");
}
@Test
@@ -89,17 +90,18 @@ public class TagsTableTest extends TableTestBase {
private List<InternalRow> getExceptedResult() {
List<InternalRow> internalRows = new ArrayList<>();
- for (Map.Entry<Snapshot, String> tag : tagManager.tags().entrySet()) {
+ for (Map.Entry<Snapshot, List<String>> tag :
tagManager.tags().entrySet()) {
Snapshot snapshot = tag.getKey();
-
- internalRows.add(
- GenericRow.of(
- BinaryString.fromString(tag.getValue()),
- snapshot.id(),
- snapshot.schemaId(),
- Timestamp.fromLocalDateTime(
-
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
- snapshot.totalRecordCount()));
+ for (String tagName : tag.getValue()) {
+ internalRows.add(
+ GenericRow.of(
+ BinaryString.fromString(tagName),
+ snapshot.id(),
+ snapshot.schemaId(),
+ Timestamp.fromLocalDateTime(
+
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
+ snapshot.totalRecordCount()));
+ }
}
return internalRows;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 9b0233ef4..4d5c3ebce 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -59,20 +59,20 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test normal creation
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
// test not creation
commit.commit(new ManifestCommittable(1,
utcMills("2023-07-18T12:59:00")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
// test just in time
commit.commit(new ManifestCommittable(2,
utcMills("2023-07-18T13:00:00")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11",
"2023-07-18 12");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11",
"2023-07-18 12");
// test expire old tag
commit.commit(new ManifestCommittable(3,
utcMills("2023-07-18T14:00:00")));
commit.commit(new ManifestCommittable(4,
utcMills("2023-07-18T15:00:00")));
- assertThat(tagManager.tags().values())
+ assertThat(tagManager.allTagNames())
.containsOnly("2023-07-18 12", "2023-07-18 13", "2023-07-18
14");
// test restore with snapshot expiration
@@ -92,7 +92,7 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// check tags
commit.commit(new ManifestCommittable(9,
utcMills("2023-07-18T16:00:00")));
- assertThat(tagManager.tags().values())
+ assertThat(tagManager.allTagNames())
.containsOnly("2023-07-18 13", "2023-07-18 14", "2023-07-18
15");
commit.close();
}
@@ -109,15 +109,15 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test first create tag anyway
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:00:09")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
// test not create due to delay
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T13:00:09")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
// test create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T13:00:10")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11",
"2023-07-18 12");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11",
"2023-07-18 12");
commit.close();
}
@@ -133,11 +133,11 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test first create
commit.commit(new ManifestCommittable(0,
localZoneMills("2023-07-18T12:00:09")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
// test second create
commit.commit(new ManifestCommittable(0,
localZoneMills("2023-07-18T13:00:10")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11",
"2023-07-18 12");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11",
"2023-07-18 12");
commit.close();
}
@@ -152,15 +152,15 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test first create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:00:01")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 10");
// test no create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T13:00:01")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 10");
// test second create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T14:00:09")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10",
"2023-07-18 12");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 10",
"2023-07-18 12");
commit.close();
}
@@ -175,18 +175,17 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test first create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:00:01")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-17");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-17");
// test second create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-19T12:00:01")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-17",
"2023-07-18");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-17",
"2023-07-18");
// test newCommit create
commit.close();
commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-20T12:00:01")));
- assertThat(tagManager.tags().values())
- .containsOnly("2023-07-17", "2023-07-18", "2023-07-19");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-17",
"2023-07-18", "2023-07-19");
commit.close();
}
@@ -205,7 +204,7 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test first create
commit.commit(new ManifestCommittable(0,
localZoneMills("2023-07-18T12:00:09")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
table = table.copy(options.toMap());
@@ -214,7 +213,7 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test newCommit create
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-20T12:00:01")));
- assertThat(tagManager.tags().values()).contains("2023-07-18 11",
"2023-07-19");
+ assertThat(tagManager.allTagNames()).contains("2023-07-18 11",
"2023-07-19");
}
@Test
@@ -233,18 +232,20 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
// test normal creation
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
- assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
table.createTag("savepoint-11", 1);
// test newCommit create
commit.commit(new ManifestCommittable(1,
utcMills("2023-07-18T14:00:00")));
- assertThat(tagManager.tags().values()).contains("2023-07-18 11",
"2023-07-18 13");
+ assertThat(tagManager.allTagNames())
+ .containsOnly("savepoint-11", "2023-07-18 11", "2023-07-18
13");
// test expire old tag
commit.commit(new ManifestCommittable(2,
utcMills("2023-07-18T15:00:00")));
commit.commit(new ManifestCommittable(3,
utcMills("2023-07-18T16:00:00")));
- assertThat(tagManager.tags().values())
+ // only handle auto-created tags
+ assertThat(tagManager.allTagNames())
.containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18
14", "2023-07-18 15");
}
@@ -259,10 +260,10 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
TagManager tagManager = table.store().newTagManager();
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
- assertThat(tagManager.tags().values()).containsOnly("20230717");
+ assertThat(tagManager.allTagNames()).containsOnly("20230717");
commit.commit(new ManifestCommittable(1,
utcMills("2023-07-19T12:12:00")));
- assertThat(tagManager.tags().values()).contains("20230717",
"20230718");
+ assertThat(tagManager.allTagNames()).contains("20230717", "20230718");
}
@Test
@@ -276,10 +277,28 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
TagManager tagManager = table.store().newTagManager();
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
- assertThat(tagManager.tags().values()).containsOnly("20230718 11");
+ assertThat(tagManager.allTagNames()).containsOnly("20230718 11");
commit.commit(new ManifestCommittable(1,
utcMills("2023-07-18T13:13:00")));
- assertThat(tagManager.tags().values()).contains("20230718 11",
"20230718 12");
+ assertThat(tagManager.allTagNames()).contains("20230718 11", "20230718
12");
+ }
+
+ @Test
+ public void testOnlyExpireAutoCreatedTag() {
+ Options options = new Options();
+ options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+ options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+ options.set(TAG_NUM_RETAINED_MAX, 1);
+ FileStoreTable table = this.table.copy(options.toMap());
+ TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
+ TagManager tagManager = table.store().newTagManager();
+
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
+ table.createTag("many-tags-test");
+ assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11",
"many-tags-test");
+
+ commit.commit(new ManifestCommittable(1,
utcMills("2023-07-18T13:13:00")));
+ assertThat(tagManager.allTagNames()).contains("2023-07-18 12",
"many-tags-test");
}
private long localZoneMills(String timestamp) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index 36ae32d15..d65ab7414 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -46,7 +46,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
-import java.util.SortedMap;
+import java.util.List;
/**
* Commit {@link Committable} for snapshot using the {@link
CommitterOperator}. When the task is
@@ -136,14 +136,23 @@ public class BatchWriteGeneratorTagOperator<CommitT,
GlobalCommitT>
}
TagManager tagManager = table.tagManager();
TagDeletion tagDeletion = table.store().newTagDeletion();
- SortedMap<Snapshot, String> tags = tagManager.tags();
- if (tags.size() > tagNumRetainedMax) {
- int toDelete = tags.size() - tagNumRetainedMax;
- int i = 0;
- for (String tag : tags.values()) {
- tagManager.deleteTag(tag, tagDeletion, snapshotManager);
- i++;
- if (i == toDelete) {
+ long tagCount = tagManager.tagCount();
+
+ while (tagCount > tagNumRetainedMax) {
+ for (List<String> tagNames : tagManager.tags().values()) {
+ if (tagCount - tagNames.size() >= tagNumRetainedMax) {
+ tagManager.deleteAllTagsOfOneSnapshot(
+ tagNames, tagDeletion, snapshotManager);
+ tagCount = tagCount - tagNames.size();
+ } else {
+ List<String> sortedTagNames =
tagManager.sortTagsOfOneSnapshot(tagNames);
+ for (String toBeDeleted : sortedTagNames) {
+ tagManager.deleteTag(toBeDeleted, tagDeletion,
snapshotManager);
+ tagCount--;
+ if (tagCount == tagNumRetainedMax) {
+ break;
+ }
+ }
break;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index 880e052c7..0dbde0579 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -36,6 +36,7 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -88,10 +89,10 @@ public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorT
Snapshot snapshot = table.snapshotManager().snapshot(2);
assertThat(snapshot).isNotNull();
assertThat(snapshot.id()).isEqualTo(2);
- Map<Snapshot, String> tags = table.tagManager().tags();
+ Map<Snapshot, List<String>> tags = table.tagManager().tags();
assertThat(tags).containsOnlyKeys(snapshot);
assertThat(tags.get(snapshot))
-
.isEqualTo(AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + 2);
+
.containsOnly(AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + 2);
}
@Test
@@ -139,10 +140,10 @@ public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorT
assertThat(snapshot).isNotNull();
assertThat(snapshot.id()).isEqualTo(checkpointId);
- Map<Snapshot, String> tags = table.tagManager().tags();
+ Map<Snapshot, List<String>> tags = table.tagManager().tags();
assertThat(tags).containsOnlyKeys(snapshot);
assertThat(tags.get(snapshot))
- .isEqualTo(
+ .containsOnly(
AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index 894410dae..f020f65bd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -87,6 +87,21 @@ public class BatchWriteGeneratorTagOperatorTest extends
CommitterOperatorTest {
assertThat(table.tagManager().tagCount()).isEqualTo(1);
// The tag is consistent with the latest snapshot
assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot());
+
+ // test tag expiration
+ table.createTag("many-tags-test1");
+ Thread.sleep(1_000);
+ table.createTag("many-tags-test2");
+ assertThat(tagManager.tagCount()).isEqualTo(3);
+
+ write.write(GenericRow.of(2, 20L));
+ tableCommit = table.newCommit(initialCommitUser);
+ tableCommit.commit(write.prepareCommit(false, 2));
+ // note that this tag has the same name with previous tag
+ // so the previous tag will be deleted
+ committerOperator.finish();
+
+ assertThat(tagManager.allTagNames()).containsOnly("many-tags-test2",
tagName);
}
@Override