This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c574ae357 [core] Optimize tags system table when specifying tag_name 
(#4111)
c574ae357 is described below

commit c574ae3571c0e1b6d577af6cdb7f0b144e21e331
Author: herefree <[email protected]>
AuthorDate: Tue Sep 3 14:00:19 2024 +0800

    [core] Optimize tags system table when specifying tag_name (#4111)
---
 .../org/apache/paimon/table/system/TagsTable.java  | 39 ++++++++++++++++++----
 .../java/org/apache/paimon/utils/TagManager.java   |  5 +++
 .../apache/paimon/flink/CatalogTableITCase.java    |  5 +++
 3 files changed, 43 insertions(+), 6 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index d92876e4c..b9c1c6718 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -51,6 +54,8 @@ import org.apache.paimon.utils.TagManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -129,16 +134,23 @@ public class TagsTable implements ReadonlyTable {
     }
 
     private class TagsScan extends ReadOnceTableScan {
+        private @Nullable LeafPredicate tagName;
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
+            if (predicate == null) {
+                return this;
+            }
             // TODO
+            Map<String, LeafPredicate> leafPredicates =
+                    predicate.visit(LeafPredicateExtractor.INSTANCE);
+            tagName = leafPredicates.get("tag_name");
             return this;
         }
 
         @Override
         public Plan innerPlan() {
-            return () -> Collections.singletonList(new TagsSplit(location));
+            return () -> Collections.singletonList(new TagsSplit(location, 
tagName));
         }
     }
 
@@ -148,8 +160,11 @@ public class TagsTable implements ReadonlyTable {
 
         private final Path location;
 
-        private TagsSplit(Path location) {
+        private final @Nullable LeafPredicate tagName;
+
+        private TagsSplit(Path location, @Nullable LeafPredicate tagName) {
             this.location = location;
+            this.tagName = tagName;
         }
 
         @Override
@@ -161,7 +176,7 @@ public class TagsTable implements ReadonlyTable {
                 return false;
             }
             TagsSplit that = (TagsSplit) o;
-            return Objects.equals(location, that.location);
+            return Objects.equals(location, that.location) && 
Objects.equals(tagName, that.tagName);
         }
 
         @Override
@@ -202,10 +217,22 @@ public class TagsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((TagsSplit) split).location;
-            List<Pair<Tag, String>> tags = new TagManager(fileIO, location, 
branch).tagObjects();
+            LeafPredicate predicate = ((TagsSplit) split).tagName;
+            TagManager tagManager = new TagManager(fileIO, location, branch);
+
             Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
-            for (Pair<Tag, String> tag : tags) {
-                nameToSnapshot.put(tag.getValue(), tag.getKey());
+
+            if (predicate != null
+                    && predicate.function() instanceof Equal
+                    && predicate.literals().get(0) instanceof BinaryString) {
+                String equalValue = predicate.literals().get(0).toString();
+                if (tagManager.tagExists(equalValue)) {
+                    nameToSnapshot.put(equalValue, tagManager.tag(equalValue));
+                }
+            } else {
+                for (Pair<Tag, String> tag : tagManager.tagObjects()) {
+                    nameToSnapshot.put(tag.getValue(), tag.getKey());
+                }
             }
 
             Iterator<InternalRow> rows =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 2833ca33c..84b298a0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -382,4 +382,9 @@ public class TagManager {
                         "Didn't find tag with snapshot id '%s'.This is 
unexpected.",
                         taggedSnapshot.id()));
     }
+
+    /** Read tag for tagName. */
+    public Tag tag(String tagName) {
+        return Tag.fromPath(fileIO, tagPath(tagName));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e1bbcbcf2..3785c3db8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -812,6 +812,11 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         "SELECT tag_name, snapshot_id, schema_id, record_count 
FROM T$tags ORDER BY tag_name");
 
         assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), 
Row.of("tag2", 2L, 0L, 2L));
+
+        result =
+                sql(
+                        "SELECT tag_name, snapshot_id, schema_id, record_count 
FROM T$tags where tag_name = 'tag1' ");
+        assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L));
     }
 
     @Test

Reply via email to