This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 9e60f8eb4645a63d65974172341838f0088bf063 Author: ctrlaltdilj <[email protected]> AuthorDate: Mon Jun 3 22:04:47 2024 -0400 [core] added greater and less than query optimization to snapshots table queries (#3396) --- .../apache/paimon/table/system/SnapshotsTable.java | 47 +++++++++++++++++----- .../org/apache/paimon/utils/SnapshotManager.java | 27 +++++++++++++ .../apache/paimon/flink/CatalogTableITCase.java | 20 +++++++++ 3 files changed, 85 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 991129038..ca7bc925d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -27,8 +27,12 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.GreaterThan; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.LeafPredicateExtractor; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.LessThan; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -51,8 +55,6 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import javax.annotation.Nullable; - import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; @@ -63,6 +65,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -188,7 +191,8 @@ public class SnapshotsTable implements ReadonlyTable { private final FileIO fileIO; private int[][] projection; - @Nullable private Long specificSnapshot; + private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty(); + private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty(); public SnapshotsRead(FileIO fileIO) { this.fileIO = fileIO; @@ -202,9 +206,35 @@ public class SnapshotsTable implements ReadonlyTable { LeafPredicate snapshotPred = predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id"); - if (snapshotPred != null && snapshotPred.function() instanceof Equal) { - specificSnapshot = (Long) snapshotPred.literals().get(0); + if (snapshotPred != null) { + if (snapshotPred.function() instanceof Equal) { + optionalFilterSnapshotIdMin = + Optional.of((Long) snapshotPred.literals().get(0)); + optionalFilterSnapshotIdMax = + Optional.of((Long) snapshotPred.literals().get(0)); + } + + if (snapshotPred.function() instanceof GreaterThan) { + optionalFilterSnapshotIdMin = + Optional.of((Long) snapshotPred.literals().get(0) + 1); + } + + if (snapshotPred.function() instanceof GreaterOrEqual) { + optionalFilterSnapshotIdMin = + Optional.of((Long) snapshotPred.literals().get(0)); + } + + if (snapshotPred.function() instanceof LessThan) { + optionalFilterSnapshotIdMax = + Optional.of((Long) snapshotPred.literals().get(0) - 1); + } + + if (snapshotPred.function() instanceof LessOrEqual) { + optionalFilterSnapshotIdMax = + Optional.of((Long) snapshotPred.literals().get(0)); + } } + return this; } @@ -227,10 +257,9 @@ public class SnapshotsTable implements ReadonlyTable { SnapshotManager snapshotManager = new SnapshotManager(fileIO, ((SnapshotsSplit) split).location); Iterator<Snapshot> snapshots = - specificSnapshot != null - ? Collections.singletonList(snapshotManager.snapshot(specificSnapshot)) - .iterator() - : snapshotManager.snapshots(); + snapshotManager.snapshotsWithinRange( + optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin); + Iterator<InternalRow> rows = Iterators.transform(snapshots, this::toRow); if (projection != null) { rows = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index dbbc8fffd..e120bab3c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -44,6 +44,7 @@ import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.LongStream; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; @@ -375,6 +376,32 @@ public class SnapshotManager implements Serializable { .iterator(); } + public Iterator<Snapshot> snapshotsWithinRange( + Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) + throws IOException { + Long lowerBoundSnapshotId = earliestSnapshotId(); + Long upperBoundSnapshotId = latestSnapshotId(); + + // null check on lowerBoundSnapshotId & upperBoundSnapshotId + if (lowerBoundSnapshotId == null || upperBoundSnapshotId == null) { + return Collections.emptyIterator(); + } + + if (optionalMaxSnapshotId.isPresent()) { + upperBoundSnapshotId = optionalMaxSnapshotId.get(); + } + + if (optionalMinSnapshotId.isPresent()) { + lowerBoundSnapshotId = optionalMinSnapshotId.get(); + } + + // +1 here to include the upperBoundSnapshotId + return LongStream.range(lowerBoundSnapshotId, upperBoundSnapshotId + 1) + .mapToObj(this::snapshot) + .sorted(Comparator.comparingLong(Snapshot::id)) + .iterator(); + } + public Iterator<Changelog> changelogs() throws IOException { return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) .map(this::changelog) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index a94d91af6..2017ae4ba 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -75,6 +75,26 @@ public class CatalogTableITCase extends CatalogITCaseBase { sql( "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id = 2"); assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id > 1"); + assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id < 2"); + assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id >= 1"); + assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id <= 2"); + assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); } @Test
