This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c574ae357 [core] Optimize tags system table when specifying tag_name (#4111)
c574ae357 is described below
commit c574ae3571c0e1b6d577af6cdb7f0b144e21e331
Author: herefree <[email protected]>
AuthorDate: Tue Sep 3 14:00:19 2024 +0800
[core] Optimize tags system table when specifying tag_name (#4111)
---
.../org/apache/paimon/table/system/TagsTable.java | 39 ++++++++++++++++++----
.../java/org/apache/paimon/utils/TagManager.java | 5 +++
.../apache/paimon/flink/CatalogTableITCase.java | 5 +++
3 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index d92876e4c..b9c1c6718 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -51,6 +54,8 @@ import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -129,16 +134,23 @@ public class TagsTable implements ReadonlyTable {
}
private class TagsScan extends ReadOnceTableScan {
+ private @Nullable LeafPredicate tagName;
@Override
public InnerTableScan withFilter(Predicate predicate) {
+ if (predicate == null) {
+ return this;
+ }
// TODO
+ Map<String, LeafPredicate> leafPredicates =
+ predicate.visit(LeafPredicateExtractor.INSTANCE);
+ tagName = leafPredicates.get("tag_name");
return this;
}
@Override
public Plan innerPlan() {
- return () -> Collections.singletonList(new TagsSplit(location));
+ return () -> Collections.singletonList(new TagsSplit(location,
tagName));
}
}
@@ -148,8 +160,11 @@ public class TagsTable implements ReadonlyTable {
private final Path location;
- private TagsSplit(Path location) {
+ private final @Nullable LeafPredicate tagName;
+
+ private TagsSplit(Path location, @Nullable LeafPredicate tagName) {
this.location = location;
+ this.tagName = tagName;
}
@Override
@@ -161,7 +176,7 @@ public class TagsTable implements ReadonlyTable {
return false;
}
TagsSplit that = (TagsSplit) o;
- return Objects.equals(location, that.location);
+ return Objects.equals(location, that.location) &&
Objects.equals(tagName, that.tagName);
}
@Override
@@ -202,10 +217,22 @@ public class TagsTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((TagsSplit) split).location;
- List<Pair<Tag, String>> tags = new TagManager(fileIO, location,
branch).tagObjects();
+ LeafPredicate predicate = ((TagsSplit) split).tagName;
+ TagManager tagManager = new TagManager(fileIO, location, branch);
+
Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
- for (Pair<Tag, String> tag : tags) {
- nameToSnapshot.put(tag.getValue(), tag.getKey());
+
+ if (predicate != null
+ && predicate.function() instanceof Equal
+ && predicate.literals().get(0) instanceof BinaryString) {
+ String equalValue = predicate.literals().get(0).toString();
+ if (tagManager.tagExists(equalValue)) {
+ nameToSnapshot.put(equalValue, tagManager.tag(equalValue));
+ }
+ } else {
+ for (Pair<Tag, String> tag : tagManager.tagObjects()) {
+ nameToSnapshot.put(tag.getValue(), tag.getKey());
+ }
}
Iterator<InternalRow> rows =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 2833ca33c..84b298a0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -382,4 +382,9 @@ public class TagManager {
"Didn't find tag with snapshot id '%s'.This is
unexpected.",
taggedSnapshot.id()));
}
+
+ /** Read tag for tagName. */
+ public Tag tag(String tagName) {
+ return Tag.fromPath(fileIO, tagPath(tagName));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e1bbcbcf2..3785c3db8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -812,6 +812,11 @@ public class CatalogTableITCase extends CatalogITCaseBase {
"SELECT tag_name, snapshot_id, schema_id, record_count
FROM T$tags ORDER BY tag_name");
assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L),
Row.of("tag2", 2L, 0L, 2L));
+
+ result =
+ sql(
+ "SELECT tag_name, snapshot_id, schema_id, record_count
FROM T$tags where tag_name = 'tag1' ");
+ assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L));
}
@Test