This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fed70b0f7 [core] added greater and less than query optimization to
snapshots table queries (#3396)
fed70b0f7 is described below
commit fed70b0f78ff9ed1c915b977c8b391bc4c8a84b4
Author: ctrlaltdilj <[email protected]>
AuthorDate: Mon Jun 3 22:04:47 2024 -0400
[core] added greater and less than query optimization to snapshots table
queries (#3396)
---
.../apache/paimon/table/system/SnapshotsTable.java | 47 +++++++++++++++++-----
.../org/apache/paimon/utils/SnapshotManager.java | 27 +++++++++++++
.../apache/paimon/flink/CatalogTableITCase.java | 20 +++++++++
3 files changed, 85 insertions(+), 9 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 991129038..ca7bc925d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -27,8 +27,12 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.LessOrEqual;
+import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -51,8 +55,6 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -63,6 +65,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -188,7 +191,8 @@ public class SnapshotsTable implements ReadonlyTable {
private final FileIO fileIO;
private int[][] projection;
- @Nullable private Long specificSnapshot;
+ private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty();
+ private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty();
public SnapshotsRead(FileIO fileIO) {
this.fileIO = fileIO;
@@ -202,9 +206,35 @@ public class SnapshotsTable implements ReadonlyTable {
LeafPredicate snapshotPred =
predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id");
- if (snapshotPred != null && snapshotPred.function() instanceof
Equal) {
- specificSnapshot = (Long) snapshotPred.literals().get(0);
+ if (snapshotPred != null) {
+ if (snapshotPred.function() instanceof Equal) {
+ optionalFilterSnapshotIdMin =
+ Optional.of((Long) snapshotPred.literals().get(0));
+ optionalFilterSnapshotIdMax =
+ Optional.of((Long) snapshotPred.literals().get(0));
+ }
+
+ if (snapshotPred.function() instanceof GreaterThan) {
+ optionalFilterSnapshotIdMin =
+ Optional.of((Long) snapshotPred.literals().get(0)
+ 1);
+ }
+
+ if (snapshotPred.function() instanceof GreaterOrEqual) {
+ optionalFilterSnapshotIdMin =
+ Optional.of((Long) snapshotPred.literals().get(0));
+ }
+
+ if (snapshotPred.function() instanceof LessThan) {
+ optionalFilterSnapshotIdMax =
+ Optional.of((Long) snapshotPred.literals().get(0)
- 1);
+ }
+
+ if (snapshotPred.function() instanceof LessOrEqual) {
+ optionalFilterSnapshotIdMax =
+ Optional.of((Long) snapshotPred.literals().get(0));
+ }
}
+
return this;
}
@@ -227,10 +257,9 @@ public class SnapshotsTable implements ReadonlyTable {
SnapshotManager snapshotManager =
new SnapshotManager(fileIO, ((SnapshotsSplit)
split).location);
Iterator<Snapshot> snapshots =
- specificSnapshot != null
- ?
Collections.singletonList(snapshotManager.snapshot(specificSnapshot))
- .iterator()
- : snapshotManager.snapshots();
+ snapshotManager.snapshotsWithinRange(
+ optionalFilterSnapshotIdMax,
optionalFilterSnapshotIdMin);
+
Iterator<InternalRow> rows = Iterators.transform(snapshots,
this::toRow);
if (projection != null) {
rows =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9813debe4..93442fd7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -44,6 +44,7 @@ import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
@@ -355,6 +356,32 @@ public class SnapshotManager implements Serializable {
.iterator();
}
+    /**
+     * Returns the snapshots whose ids fall within the requested inclusive range, in ascending id
+     * order.
+     *
+     * <p>The requested bounds are clamped to the ids that currently exist (from {@link
+     * #earliestSnapshotId()} to {@link #latestSnapshotId()}). A bound below the earliest retained
+     * snapshot, or above the latest one, therefore never triggers a read of a snapshot file that
+     * does not exist. If the clamped range is empty, an empty iterator is returned.
+     *
+     * @param optionalMaxSnapshotId inclusive upper bound; empty means "up to the latest snapshot"
+     * @param optionalMinSnapshotId inclusive lower bound; empty means "from the earliest snapshot"
+     * @return an iterator over the matching snapshots; each snapshot is read as the iterator
+     *     advances
+     */
+    public Iterator<Snapshot> snapshotsWithinRange(
+            Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
+            throws IOException {
+        Long earliestId = earliestSnapshotId();
+        Long latestId = latestSnapshotId();
+
+        // No snapshot is available: nothing to return.
+        if (earliestId == null || latestId == null) {
+            return Collections.emptyIterator();
+        }
+
+        // Clamp the requested bounds to the existing ids instead of replacing them. Ids outside
+        // [earliestId, latestId] have no snapshot file, and snapshot(id) would fail on them.
+        long lowerBoundSnapshotId =
+                optionalMinSnapshotId.map(min -> Math.max(min, earliestId)).orElse(earliestId);
+        long upperBoundSnapshotId =
+                optionalMaxSnapshotId.map(max -> Math.min(max, latestId)).orElse(latestId);
+
+        if (lowerBoundSnapshotId > upperBoundSnapshotId) {
+            return Collections.emptyIterator();
+        }
+
+        // rangeClosed includes the upper bound and is already ascending, so no sort is needed.
+        // Skipping the sort also avoids loading every snapshot up front.
+        return LongStream.rangeClosed(lowerBoundSnapshotId, upperBoundSnapshotId)
+                .mapToObj(this::snapshot)
+                .iterator();
+    }
+
public Iterator<Changelog> changelogs() throws IOException {
return listVersionedFiles(fileIO, changelogDirectory(),
CHANGELOG_PREFIX)
.map(snapshotId -> changelog(snapshotId))
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index a94d91af6..2017ae4ba 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -75,6 +75,26 @@ public class CatalogTableITCase extends CatalogITCaseBase {
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM
T$snapshots WHERE snapshot_id = 2");
assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
+
+ result =
+ sql(
+ "SELECT snapshot_id, schema_id, commit_kind FROM
T$snapshots WHERE snapshot_id > 1");
+ assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
+
+ result =
+ sql(
+ "SELECT snapshot_id, schema_id, commit_kind FROM
T$snapshots WHERE snapshot_id < 2");
+ assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"));
+
+ result =
+ sql(
+ "SELECT snapshot_id, schema_id, commit_kind FROM
T$snapshots WHERE snapshot_id >= 1");
+ assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L,
"APPEND"));
+
+ result =
+ sql(
+ "SELECT snapshot_id, schema_id, commit_kind FROM
T$snapshots WHERE snapshot_id <= 2");
+ assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L,
"APPEND"));
}
@Test