This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 3b6f9ea6f58d763dea017a8e512df70ed7a3586a
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 16 16:23:13 2024 +0800

    [core] Optimize snapshots table when a specific snapshot id is queried (#3339)
---
 .../paimon/predicate/LeafPredicateExtractor.java   | 44 ++++++++++++++++++++++
 .../org/apache/paimon/table/system/FilesTable.java | 28 +++++---------
 .../apache/paimon/table/system/SnapshotsTable.java | 42 +++++++++++++--------
 .../apache/paimon/flink/CatalogTableITCase.java    | 10 ++++-
 4 files changed, 89 insertions(+), 35 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java
new file mode 100644
index 000000000..a5cf772c0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Extracts the {@link LeafPredicate}s that constrain every matching row, keyed by field name.
+ *
+ * <p>A bare leaf predicate yields a single entry. An {@link And} compound predicate is flattened
+ * recursively, so leaves nested under further ANDs are collected as well. Any other compound
+ * predicate (for example an OR) contributes nothing, because none of its children holds for
+ * every matching row on its own.
+ *
+ * <p>If several conjuncts reference the same field, the one visited last wins. Callers that rely
+ * on a specific predicate kind (e.g. equality) must check {@link LeafPredicate#function()}.
+ */
+public class LeafPredicateExtractor implements PredicateVisitor<Map<String, LeafPredicate>> {
+
+    /** Shared instance; the visitor keeps no state. */
+    public static final LeafPredicateExtractor INSTANCE = new LeafPredicateExtractor();
+
+    @Override
+    public Map<String, LeafPredicate> visit(LeafPredicate predicate) {
+        return Collections.singletonMap(predicate.fieldName(), predicate);
+    }
+
+    @Override
+    public Map<String, LeafPredicate> visit(CompoundPredicate predicate) {
+        if (!(predicate.function() instanceof And)) {
+            // Children of a non-AND compound (e.g. OR) do not each hold for every row.
+            return Collections.emptyMap();
+        }
+
+        Map<String, LeafPredicate> leafPredicates = new HashMap<>();
+        for (Predicate child : predicate.children()) {
+            // putAll: a later conjunct on the same field replaces an earlier one.
+            leafPredicates.putAll(child.visit(this));
+        }
+        return leafPredicates;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 0501dab08..1f6eb2e0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -28,8 +28,8 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -162,25 +162,15 @@ public class FilesTable implements ReadonlyTable {
 
+        /** Remembers the partition, bucket and level leaf predicates found in {@code pushdown}. */
         @Override
         public InnerTableScan withFilter(Predicate pushdown) {
-            List<Predicate> predicates = PredicateBuilder.splitAnd(pushdown);
-            for (Predicate predicate : predicates) {
-                if (predicate instanceof LeafPredicate) {
-                    LeafPredicate leaf = (LeafPredicate) predicate;
-                    switch (leaf.fieldName()) {
-                        case "partition":
-                            this.partitionPredicate = leaf;
-                            break;
-                        case "bucket":
-                            this.bucketPredicate = leaf;
-                            break;
-                        case "level":
-                            this.levelPredicate = leaf;
-                            break;
-                        default:
-                            break;
-                    }
-                }
+            // A null filter leaves the previously extracted predicates untouched.
+            if (pushdown == null) {
+                return this;
             }
+
+            // Per-field leaf predicates of the AND-combined filter. A field without a predicate
+            // gets null, so each call replaces (rather than merges with) earlier predicates.
+            Map<String, LeafPredicate> leafPredicates =
+                    pushdown.visit(LeafPredicateExtractor.INSTANCE);
+            this.partitionPredicate = leafPredicates.get("partition");
+            this.bucketPredicate = leafPredicates.get("bucket");
+            this.levelPredicate = leafPredicates.get("level");
             return this;
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 8d6b545d9..b8ff9cd8b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -47,6 +50,8 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -140,19 +145,13 @@ public class SnapshotsTable implements ReadonlyTable {
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
-            // TODO
+            // Intentionally a no-op: the single split carries no per-snapshot data, so a
+            // snapshot_id equality filter is picked up by SnapshotsRead#withFilter instead.
             return this;
         }
 
         @Override
         public Plan innerPlan() {
-            long rowCount;
-            try {
-                rowCount = new SnapshotManager(fileIO, 
location).snapshotCount();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            return () -> Collections.singletonList(new 
SnapshotsSplit(rowCount, location));
+            // One split for the whole table; no snapshot files are listed or counted at plan time.
+            return () -> Collections.singletonList(new 
SnapshotsSplit(location));
         }
     }
 
@@ -160,17 +159,16 @@ public class SnapshotsTable implements ReadonlyTable {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private SnapshotsSplit(long rowCount, Path location) {
+        /** Creates a split whose snapshots are read through a SnapshotManager on {@code location}. */
+        private SnapshotsSplit(Path location) {
             this.location = location;
-            this.rowCount = rowCount;
         }
 
         @Override
         public long rowCount() {
-            return rowCount;
+            // Placeholder, not the real snapshot count (no longer computed when planning);
+            // per the original note, returning 1 is meant to keep read parallelism at 1.
+            return 1;
         }
 
         @Override
@@ -195,6 +193,7 @@ public class SnapshotsTable implements ReadonlyTable {
 
         private final FileIO fileIO;
         private int[][] projection;
+        @Nullable private Long specificSnapshot;
 
         public SnapshotsRead(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -202,7 +201,15 @@ public class SnapshotsTable implements ReadonlyTable {
 
+        /** Records a specific snapshot id to read when the filter pins snapshot_id by equality. */
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            if (predicate == null) {
+                return this;
+            }
+
+            // Only an equality on snapshot_id (alone or inside ANDs, not under an OR) narrows the
+            // read to one snapshot. Any other filter leaves specificSnapshot as it was (null by
+            // default, meaning all snapshots are read); it is never reset here.
+            // NOTE(review): the id is not checked for existence; if that snapshot is missing
+            // (e.g. expired), reading it may throw instead of yielding no rows — confirm.
+            LeafPredicate snapshotPred =
+                    
predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id");
+            if (snapshotPred != null && snapshotPred.function() instanceof 
Equal) {
+                specificSnapshot = (Long) snapshotPred.literals().get(0);
+            }
             return this;
         }
 
@@ -222,8 +229,13 @@ public class SnapshotsTable implements ReadonlyTable {
             if (!(split instanceof SnapshotsSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            Path location = ((SnapshotsSplit) split).location;
-            Iterator<Snapshot> snapshots = new SnapshotManager(fileIO, 
location).snapshots();
+            SnapshotManager snapshotManager =
+                    new SnapshotManager(fileIO, ((SnapshotsSplit) 
split).location);
+            Iterator<Snapshot> snapshots =
+                    specificSnapshot != null
+                            ? 
Collections.singletonList(snapshotManager.snapshot(specificSnapshot))
+                                    .iterator()
+                            : snapshotManager.snapshots();
             Iterator<InternalRow> rows = Iterators.transform(snapshots, 
this::toRow);
             if (projection != null) {
                 rows =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 6b167c4e7..a94d91af6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -64,9 +64,17 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         sql("INSERT INTO T VALUES (3, 4)");
 
         List<Row> result = sql("SELECT snapshot_id, schema_id, commit_kind 
FROM T$snapshots");
+        assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), 
Row.of(2L, 0L, "APPEND"));
 
-        // check correctness and sequence snapshots.
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE schema_id = 0");
         assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), 
Row.of(2L, 0L, "APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id = 2");
+        assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
     }
 
     @Test

Reply via email to