This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87299db79 [core] Optimize `schemas` system table when specifying schema id (#4100)
87299db79 is described below
commit 87299db799aae3680e27740380de0c0c75ff9f1d
Author: Yubin Li <[email protected]>
AuthorDate: Thu Aug 29 21:55:13 2024 +0800
[core] Optimize `schemas` system table when specifying schema id (#4100)
---
.../apache/paimon/table/system/SchemasTable.java | 45 ++++++++++++++++++----
.../apache/paimon/flink/CatalogTableITCase.java | 11 ++++++
2 files changed, 48 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index 10313a1c7..b79b75edd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
@@ -50,10 +53,13 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import javax.annotation.Nullable;
+
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -130,15 +136,23 @@ public class SchemasTable implements ReadonlyTable {
}
private class SchemasScan extends ReadOnceTableScan {
+ private @Nullable LeafPredicate schemaId;
@Override
public InnerTableScan withFilter(Predicate predicate) {
+ if (predicate == null) {
+ return this;
+ }
+
+ Map<String, LeafPredicate> leafPredicates =
+ predicate.visit(LeafPredicateExtractor.INSTANCE);
+ schemaId = leafPredicates.get("schema_id");
return this;
}
@Override
public Plan innerPlan() {
- return () -> Collections.singletonList(new SchemasSplit(location));
+ return () -> Collections.singletonList(new SchemasSplit(location, schemaId));
}
}
@@ -149,8 +163,11 @@ public class SchemasTable implements ReadonlyTable {
private final Path location;
- private SchemasSplit(Path location) {
+ private final @Nullable LeafPredicate schemaId;
+
+ private SchemasSplit(Path location, @Nullable LeafPredicate schemaId) {
this.location = location;
+ this.schemaId = schemaId;
}
public boolean equals(Object o) {
@@ -161,12 +178,13 @@ public class SchemasTable implements ReadonlyTable {
return false;
}
SchemasSplit that = (SchemasSplit) o;
- return Objects.equals(location, that.location);
+ return Objects.equals(location, that.location)
+ && Objects.equals(schemaId, that.schemaId);
}
@Override
public int hashCode() {
- return Objects.hash(location);
+ return Objects.hash(location, schemaId);
}
}
@@ -201,10 +219,21 @@ public class SchemasTable implements ReadonlyTable {
if (!(split instanceof SchemasSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
- Path location = ((SchemasSplit) split).location;
- Iterator<TableSchema> schemas =
- new SchemaManager(fileIO, location, branch).listAll().iterator();
- Iterator<InternalRow> rows = Iterators.transform(schemas, this::toRow);
+ SchemasSplit schemasSplit = (SchemasSplit) split;
+ LeafPredicate predicate = schemasSplit.schemaId;
+ Path location = schemasSplit.location;
+ SchemaManager manager = new SchemaManager(fileIO, location, branch);
+
+ Collection<TableSchema> tableSchemas = Collections.emptyList();
+ if (predicate != null && predicate.function() instanceof Equal) {
+ Object equalValue = predicate.literals().get(0);
+ if (equalValue instanceof Long) {
+ tableSchemas = Collections.singletonList(manager.schema((Long) equalValue));
+ }
+ } else {
+ tableSchemas = manager.listAll();
+ }
+ Iterator<InternalRow> rows = Iterators.transform(tableSchemas.iterator(), this::toRow);
if (projection != null) {
rows =
Iterators.transform(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b8f3897dd..e1bbcbcf2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -246,6 +246,17 @@ public class CatalogTableITCase extends CatalogITCaseBase {
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ]]");
+
+ result =
+ sql(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM T$schemas where schema_id = 0");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ + "{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ]]");
}
@Test