This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 56197f215 [core] Fix value filter pushdown in KeyValueFileStoreScan (#3995)
56197f215 is described below

commit 56197f2151fa728f7d71762061658c646efb16f6
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 20 15:42:08 2024 +0800

    [core] Fix value filter pushdown in KeyValueFileStoreScan (#3995)
    
    This closes #3995.
---
 .../java/org/apache/paimon/KeyValueFileStore.java  |  3 +-
 .../paimon/operation/AbstractFileStoreScan.java    |  2 +-
 .../paimon/operation/KeyValueFileStoreScan.java    | 31 ++++++++++++++---
 .../paimon/table/source/StreamTableScanTest.java   | 39 +++++++++++++++++++---
 4 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 26341d045..383b2458d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -235,7 +235,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 forWrite,
                 options.scanManifestParallelism(),
                 options.deletionVectorsEnabled(),
-                options.mergeEngine());
+                options.mergeEngine(),
+                options.changelogProducer());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 485e6a6a6..cfd1a4bf8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -86,7 +86,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private Snapshot specifiedSnapshot = null;
     private Filter<Integer> bucketFilter = null;
     private List<ManifestFileMeta> specifiedManifests = null;
-    private ScanMode scanMode = ScanMode.ALL;
+    protected ScanMode scanMode = ScanMode.ALL;
     private Filter<Integer> levelFilter = null;
     private Long dataFileTimeMills = null;
     private Filter<String> fileNameFilter = null;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index ab19cc49f..e569a0a30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -30,6 +31,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
 import org.apache.paimon.stats.SimpleStatsConverters;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -49,6 +51,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
     private Predicate valueFilter;
     private final boolean deletionVectorsEnabled;
     private final MergeEngine mergeEngine;
+    private final ChangelogProducer changelogProducer;
 
     public KeyValueFileStoreScan(
             RowType partitionType,
@@ -63,7 +66,8 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism,
             boolean deletionVectorsEnabled,
-            MergeEngine mergeEngine) {
+            MergeEngine mergeEngine,
+            ChangelogProducer changelogProducer) {
         super(
                 partitionType,
                 bucketFilter,
@@ -85,6 +89,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                         schema.id());
         this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.mergeEngine = mergeEngine;
+        this.changelogProducer = changelogProducer;
     }
 
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -104,9 +109,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
         Predicate filter = null;
         SimpleStatsConverter serializer = null;
         SimpleStats stats = null;
-        if ((deletionVectorsEnabled || mergeEngine == FIRST_ROW)
-                && entry.level() > 0
-                && valueFilter != null) {
+        if (isValueFilterEnabled(entry)) {
             filter = valueFilter;
             serializer = fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
             stats = entry.file().valueStats();
@@ -129,10 +132,28 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 serializer.evolution(stats.nullCounts(), entry.file().rowCount()));
     }
 
+    private boolean isValueFilterEnabled(ManifestEntry entry) {
+        if (valueFilter == null) {
+            return false;
+        }
+
+        switch (scanMode) {
+            case ALL:
+                return (deletionVectorsEnabled || mergeEngine == FIRST_ROW) && entry.level() > 0;
+            case DELTA:
+                return false;
+            case CHANGELOG:
+                return changelogProducer == ChangelogProducer.LOOKUP
+                        || changelogProducer == ChangelogProducer.FULL_COMPACTION;
+            default:
+                throw new UnsupportedOperationException("Unsupported scan mode: " + scanMode);
+        }
+    }
+
     /** Note: Keep this thread-safe. */
     @Override
     protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> entries) {
-        if (valueFilter == null) {
+        if (valueFilter == null || scanMode != ScanMode.ALL) {
             return entries;
         }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index 8456ba2ca..44c29ff30 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.StreamTableCommit;
@@ -49,7 +50,11 @@ public class StreamTableScanTest extends ScannerTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
-        StreamTableScan scan = table.newStreamScan();
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(readBuilder.readType());
+        readBuilder.withFilter(predicateBuilder.lessOrEqual(2, 300L));
+        StreamTableScan scan = readBuilder.newStreamScan();
 
         // first call without any snapshot, should return empty plan
         assertThat(scan.plan().splits()).isEmpty();
@@ -85,6 +90,9 @@ public class StreamTableScanTest extends ScannerTestBase {
         write.write(rowData(1, 50, 500L));
         commit.commit(3, write.prepareCommit(true, 3));
 
+        write.write(rowData(1, 60, 600L));
+        commit.commit(4, write.prepareCommit(true, 4));
+
         // first incremental call, should return incremental records from 3rd commit
         plan = scan.plan();
         assertThat(getResult(read, plan.splits()))
@@ -95,6 +103,11 @@ public class StreamTableScanTest extends ScannerTestBase {
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
 
+        // verify that the value filter does not affect the incremental scan
+        plan = scan.plan();
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Collections.singletonList("+I 1|60|600"));
+
         // no more new snapshots, should return empty plan
         assertThat(scan.plan().splits()).isEmpty();
 
@@ -111,7 +124,11 @@ public class StreamTableScanTest extends ScannerTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
-        StreamTableScan scan = table.newStreamScan();
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(readBuilder.readType());
+        readBuilder.withFilter(predicateBuilder.lessOrEqual(2, 300L));
+        StreamTableScan scan = readBuilder.newStreamScan();
 
         // first call without any snapshot, should return empty plan
         assertThat(scan.plan().splits()).isEmpty();
@@ -156,8 +173,7 @@ public class StreamTableScanTest extends ScannerTestBase {
         commit.commit(4, write.prepareCommit(true, 4));
 
         // full compaction done, read new changelog
-        plan = scan.plan();
-        assertThat(getResult(read, plan.splits()))
+        assertThat(getResult(read, scan.plan().splits()))
                 .hasSameElementsAs(
                         Arrays.asList(
                                 "-U 1|10|101",
@@ -166,6 +182,21 @@ public class StreamTableScanTest extends ScannerTestBase {
                                 "+U 1|20|201",
                                 "+I 1|50|500"));
 
+        // clear all records
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 0L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 0L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 50, 0L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(5, write.prepareCommit(true, 5));
+        assertThat(getResult(read, scan.plan().splits()))
+                .hasSameElementsAs(Arrays.asList("-D 1|10|103", "-D 1|20|201", "-D 1|50|500"));
+
+        // verify that the value filter is applied to the changelog scan
+        write.write(rowData(1, 60, 600L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(6, write.prepareCommit(true, 6));
+        assertThat(getResult(read, scan.plan().splits())).isEmpty();
+
         // no more new snapshots, should return empty plan
         assertThat(scan.plan().splits()).isEmpty();
 

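For readers who want to reproduce the behaviour the new test covers, below is a minimal sketch (not part of the commit) of how a value filter is pushed into a streaming scan through the public ReadBuilder API. The helper class and method names are invented for illustration, and the filtered field index 2 with the 300L literal simply mirror the test above.

import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;

class ValueFilterScanSketch {
    // Builds a streaming scan with a value filter (field 2 <= 300L) pushed down.
    // With this fix the filter only prunes data files where doing so is safe:
    // ALL scans with deletion vectors or the first-row merge engine, and CHANGELOG
    // scans produced by lookup or full-compaction; DELTA reads are never pruned.
    static StreamTableScan filteredStreamScan(Table table) {
        ReadBuilder readBuilder = table.newReadBuilder();
        PredicateBuilder predicateBuilder = new PredicateBuilder(readBuilder.readType());
        readBuilder.withFilter(predicateBuilder.lessOrEqual(2, 300L));
        return readBuilder.newStreamScan();
    }
}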