This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fed70b0f7 [core] added greater and less than query optimization to 
snapshots table queries (#3396)
fed70b0f7 is described below

commit fed70b0f78ff9ed1c915b977c8b391bc4c8a84b4
Author: ctrlaltdilj <[email protected]>
AuthorDate: Mon Jun 3 22:04:47 2024 -0400

    [core] added greater and less than query optimization to snapshots table 
queries (#3396)
---
 .../apache/paimon/table/system/SnapshotsTable.java | 47 +++++++++++++++++-----
 .../org/apache/paimon/utils/SnapshotManager.java   | 27 +++++++++++++
 .../apache/paimon/flink/CatalogTableITCase.java    | 20 +++++++++
 3 files changed, 85 insertions(+), 9 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 991129038..ca7bc925d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -27,8 +27,12 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.LessOrEqual;
+import org.apache.paimon.predicate.LessThan;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -51,8 +55,6 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -63,6 +65,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -188,7 +191,8 @@ public class SnapshotsTable implements ReadonlyTable {
 
         private final FileIO fileIO;
         private int[][] projection;
-        @Nullable private Long specificSnapshot;
+        private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty();
+        private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty();
 
         public SnapshotsRead(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -202,9 +206,35 @@ public class SnapshotsTable implements ReadonlyTable {
 
             LeafPredicate snapshotPred =
                     
predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id");
-            if (snapshotPred != null && snapshotPred.function() instanceof 
Equal) {
-                specificSnapshot = (Long) snapshotPred.literals().get(0);
+            if (snapshotPred != null) {
+                if (snapshotPred.function() instanceof Equal) {
+                    optionalFilterSnapshotIdMin =
+                            Optional.of((Long) snapshotPred.literals().get(0));
+                    optionalFilterSnapshotIdMax =
+                            Optional.of((Long) snapshotPred.literals().get(0));
+                }
+
+                if (snapshotPred.function() instanceof GreaterThan) {
+                    optionalFilterSnapshotIdMin =
+                            Optional.of((Long) snapshotPred.literals().get(0) 
+ 1);
+                }
+
+                if (snapshotPred.function() instanceof GreaterOrEqual) {
+                    optionalFilterSnapshotIdMin =
+                            Optional.of((Long) snapshotPred.literals().get(0));
+                }
+
+                if (snapshotPred.function() instanceof LessThan) {
+                    optionalFilterSnapshotIdMax =
+                            Optional.of((Long) snapshotPred.literals().get(0) 
- 1);
+                }
+
+                if (snapshotPred.function() instanceof LessOrEqual) {
+                    optionalFilterSnapshotIdMax =
+                            Optional.of((Long) snapshotPred.literals().get(0));
+                }
             }
+
             return this;
         }
 
@@ -227,10 +257,9 @@ public class SnapshotsTable implements ReadonlyTable {
             SnapshotManager snapshotManager =
                     new SnapshotManager(fileIO, ((SnapshotsSplit) 
split).location);
             Iterator<Snapshot> snapshots =
-                    specificSnapshot != null
-                            ? 
Collections.singletonList(snapshotManager.snapshot(specificSnapshot))
-                                    .iterator()
-                            : snapshotManager.snapshots();
+                    snapshotManager.snapshotsWithinRange(
+                            optionalFilterSnapshotIdMax, 
optionalFilterSnapshotIdMin);
+
             Iterator<InternalRow> rows = Iterators.transform(snapshots, 
this::toRow);
             if (projection != null) {
                 rows =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9813debe4..93442fd7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -44,6 +44,7 @@ import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.getBranchPath;
@@ -355,6 +356,32 @@ public class SnapshotManager implements Serializable {
                 .iterator();
     }
 
+    /**
+     * Returns the snapshots whose ids lie in the requested inclusive range, in ascending id
+     * order.
+     *
+     * <p>The requested bounds are intersected with {@code [earliestSnapshotId(),
+     * latestSnapshotId()]}, so a filter that reaches beyond those bounds never requests a
+     * snapshot id outside that interval.
+     *
+     * @param optionalMaxSnapshotId inclusive upper bound on the snapshot id; empty for no bound
+     * @param optionalMinSnapshotId inclusive lower bound on the snapshot id; empty for no bound
+     * @return an iterator over the matching snapshots; empty when the earliest or latest
+     *     snapshot id is unavailable or when the clamped range contains no id
+     * @throws IOException declared for callers; propagated from the snapshot id lookups
+     */
+    public Iterator<Snapshot> snapshotsWithinRange(
+            Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
+            throws IOException {
+        Long earliestSnapshotId = earliestSnapshotId();
+        Long latestSnapshotId = latestSnapshotId();
+
+        // Without both an earliest and a latest id there is nothing to iterate over.
+        if (earliestSnapshotId == null || latestSnapshotId == null) {
+            return Collections.emptyIterator();
+        }
+
+        // Clamp the requested bounds to the known ids instead of overriding them.
+        long lowerBoundSnapshotId =
+                Math.max(earliestSnapshotId, optionalMinSnapshotId.orElse(earliestSnapshotId));
+        long upperBoundSnapshotId =
+                Math.min(latestSnapshotId, optionalMaxSnapshotId.orElse(latestSnapshotId));
+
+        // rangeClosed includes the upper bound without a "+ 1" that could overflow, and it is
+        // empty when the lower bound exceeds the upper bound.
+        return LongStream.rangeClosed(lowerBoundSnapshotId, upperBoundSnapshotId)
+                .mapToObj(this::snapshot)
+                .sorted(Comparator.comparingLong(Snapshot::id))
+                .iterator();
+    }
+
     public Iterator<Changelog> changelogs() throws IOException {
         return listVersionedFiles(fileIO, changelogDirectory(), 
CHANGELOG_PREFIX)
                 .map(snapshotId -> changelog(snapshotId))
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index a94d91af6..2017ae4ba 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -75,6 +75,26 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                 sql(
                         "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id = 2");
         assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id > 1");
+        assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id < 2");
+        assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id >= 1");
+        assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, 
"APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id <= 2");
+        assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, 
"APPEND"));
     }
 
     @Test

Reply via email to