This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 87299db79 [core] Optimize `schemas` system table when specifying schema id (#4100)
87299db79 is described below

commit 87299db799aae3680e27740380de0c0c75ff9f1d
Author: Yubin Li <[email protected]>
AuthorDate: Thu Aug 29 21:55:13 2024 +0800

    [core] Optimize `schemas` system table when specifying schema id (#4100)
---
 .../apache/paimon/table/system/SchemasTable.java   | 45 ++++++++++++++++++----
 .../apache/paimon/flink/CatalogTableITCase.java    | 11 ++++++
 2 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index 10313a1c7..b79b75edd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
@@ -50,10 +53,13 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import javax.annotation.Nullable;
+
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -130,15 +136,23 @@ public class SchemasTable implements ReadonlyTable {
     }
 
     private class SchemasScan extends ReadOnceTableScan {
+        private @Nullable LeafPredicate schemaId;
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
+            if (predicate == null) {
+                return this;
+            }
+
+            Map<String, LeafPredicate> leafPredicates =
+                    predicate.visit(LeafPredicateExtractor.INSTANCE);
+            schemaId = leafPredicates.get("schema_id");
             return this;
         }
 
         @Override
         public Plan innerPlan() {
-            return () -> Collections.singletonList(new SchemasSplit(location));
+            return () -> Collections.singletonList(new SchemasSplit(location, schemaId));
         }
     }
 
@@ -149,8 +163,11 @@ public class SchemasTable implements ReadonlyTable {
 
         private final Path location;
 
-        private SchemasSplit(Path location) {
+        private final @Nullable LeafPredicate schemaId;
+
+        private SchemasSplit(Path location, @Nullable LeafPredicate schemaId) {
             this.location = location;
+            this.schemaId = schemaId;
         }
 
         public boolean equals(Object o) {
@@ -161,12 +178,13 @@ public class SchemasTable implements ReadonlyTable {
                 return false;
             }
             SchemasSplit that = (SchemasSplit) o;
-            return Objects.equals(location, that.location);
+            return Objects.equals(location, that.location)
+                    && Objects.equals(schemaId, that.schemaId);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(location);
+            return Objects.hash(location, schemaId);
         }
     }
 
@@ -201,10 +219,21 @@ public class SchemasTable implements ReadonlyTable {
             if (!(split instanceof SchemasSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + split.getClass());
             }
-            Path location = ((SchemasSplit) split).location;
-            Iterator<TableSchema> schemas =
-                    new SchemaManager(fileIO, location, branch).listAll().iterator();
-            Iterator<InternalRow> rows = Iterators.transform(schemas, this::toRow);
+            SchemasSplit schemasSplit = (SchemasSplit) split;
+            LeafPredicate predicate = schemasSplit.schemaId;
+            Path location = schemasSplit.location;
+            SchemaManager manager = new SchemaManager(fileIO, location, branch);
+
+            Collection<TableSchema> tableSchemas = Collections.emptyList();
+            if (predicate != null && predicate.function() instanceof Equal) {
+                Object equalValue = predicate.literals().get(0);
+                if (equalValue instanceof Long) {
+                    tableSchemas = Collections.singletonList(manager.schema((Long) equalValue));
+                }
+            } else {
+                tableSchemas = manager.listAll();
+            }
+            Iterator<InternalRow> rows = Iterators.transform(tableSchemas.iterator(), this::toRow);
             if (projection != null) {
                 rows =
                         Iterators.transform(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b8f3897dd..e1bbcbcf2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -246,6 +246,17 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                 + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
                                 + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
                                 + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ]]");
+
+        result =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM T$schemas where schema_id = 0");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+                                + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+                                + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+                                + "{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ]]");
     }
 
     @Test

Reply via email to