This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 56197f215 [core] Fix value filter pushdown in KeyValueFileStoreScan (#3995)
56197f215 is described below
commit 56197f2151fa728f7d71762061658c646efb16f6
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 20 15:42:08 2024 +0800
[core] Fix value filter pushdown in KeyValueFileStoreScan (#3995)
This closes #3995.
---
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../paimon/operation/AbstractFileStoreScan.java | 2 +-
.../paimon/operation/KeyValueFileStoreScan.java | 31 ++++++++++++++---
.../paimon/table/source/StreamTableScanTest.java | 39 +++++++++++++++++++---
4 files changed, 64 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 26341d045..383b2458d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -235,7 +235,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
forWrite,
options.scanManifestParallelism(),
options.deletionVectorsEnabled(),
- options.mergeEngine());
+ options.mergeEngine(),
+ options.changelogProducer());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 485e6a6a6..cfd1a4bf8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -86,7 +86,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
- private ScanMode scanMode = ScanMode.ALL;
+ protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
private Long dataFileTimeMills = null;
private Filter<String> fileNameFilter = null;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index ab19cc49f..e569a0a30 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.manifest.ManifestEntry;
@@ -30,6 +31,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.stats.SimpleStatsConverters;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
@@ -49,6 +51,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private Predicate valueFilter;
private final boolean deletionVectorsEnabled;
private final MergeEngine mergeEngine;
+ private final ChangelogProducer changelogProducer;
public KeyValueFileStoreScan(
RowType partitionType,
@@ -63,7 +66,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
boolean deletionVectorsEnabled,
- MergeEngine mergeEngine) {
+ MergeEngine mergeEngine,
+ ChangelogProducer changelogProducer) {
super(
partitionType,
bucketFilter,
@@ -85,6 +89,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
schema.id());
this.deletionVectorsEnabled = deletionVectorsEnabled;
this.mergeEngine = mergeEngine;
+ this.changelogProducer = changelogProducer;
}
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -104,9 +109,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
Predicate filter = null;
SimpleStatsConverter serializer = null;
SimpleStats stats = null;
- if ((deletionVectorsEnabled || mergeEngine == FIRST_ROW)
- && entry.level() > 0
- && valueFilter != null) {
+ if (isValueFilterEnabled(entry)) {
filter = valueFilter;
serializer =
fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
stats = entry.file().valueStats();
@@ -129,10 +132,28 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
serializer.evolution(stats.nullCounts(),
entry.file().rowCount()));
}
+ private boolean isValueFilterEnabled(ManifestEntry entry) {
+ if (valueFilter == null) {
+ return false;
+ }
+
+ switch (scanMode) {
+ case ALL:
+                return (deletionVectorsEnabled || mergeEngine == FIRST_ROW) && entry.level() > 0;
+ case DELTA:
+ return false;
+ case CHANGELOG:
+                return changelogProducer == ChangelogProducer.LOOKUP
+                        || changelogProducer == ChangelogProducer.FULL_COMPACTION;
+ default:
+                throw new UnsupportedOperationException("Unsupported scan mode: " + scanMode);
+ }
+ }
+
/** Note: Keep this thread-safe. */
@Override
protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
- if (valueFilter == null) {
+ if (valueFilter == null || scanMode != ScanMode.ALL) {
return entries;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index 8456ba2ca..44c29ff30 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
@@ -49,7 +50,11 @@ public class StreamTableScanTest extends ScannerTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- StreamTableScan scan = table.newStreamScan();
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(readBuilder.readType());
+ readBuilder.withFilter(predicateBuilder.lessOrEqual(2, 300L));
+ StreamTableScan scan = readBuilder.newStreamScan();
// first call without any snapshot, should return empty plan
assertThat(scan.plan().splits()).isEmpty();
@@ -85,6 +90,9 @@ public class StreamTableScanTest extends ScannerTestBase {
write.write(rowData(1, 50, 500L));
commit.commit(3, write.prepareCommit(true, 3));
+ write.write(rowData(1, 60, 600L));
+ commit.commit(4, write.prepareCommit(true, 4));
+
        // first incremental call, should return incremental records from 3rd commit
plan = scan.plan();
assertThat(getResult(read, plan.splits()))
@@ -95,6 +103,11 @@ public class StreamTableScanTest extends ScannerTestBase {
assertThat(getResult(read, plan.splits()))
                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
+ // test value filter not affect to incremental scan
+ plan = scan.plan();
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Collections.singletonList("+I 1|60|600"));
+
// no more new snapshots, should return empty plan
assertThat(scan.plan().splits()).isEmpty();
@@ -111,7 +124,11 @@ public class StreamTableScanTest extends ScannerTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- StreamTableScan scan = table.newStreamScan();
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(readBuilder.readType());
+ readBuilder.withFilter(predicateBuilder.lessOrEqual(2, 300L));
+ StreamTableScan scan = readBuilder.newStreamScan();
// first call without any snapshot, should return empty plan
assertThat(scan.plan().splits()).isEmpty();
@@ -156,8 +173,7 @@ public class StreamTableScanTest extends ScannerTestBase {
commit.commit(4, write.prepareCommit(true, 4));
// full compaction done, read new changelog
- plan = scan.plan();
- assertThat(getResult(read, plan.splits()))
+ assertThat(getResult(read, scan.plan().splits()))
.hasSameElementsAs(
Arrays.asList(
"-U 1|10|101",
@@ -166,6 +182,21 @@ public class StreamTableScanTest extends ScannerTestBase {
"+U 1|20|201",
"+I 1|50|500"));
+ // clear all records
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 0L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 0L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 50, 0L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(5, write.prepareCommit(true, 5));
+ assertThat(getResult(read, scan.plan().splits()))
+                .hasSameElementsAs(Arrays.asList("-D 1|10|103", "-D 1|20|201", "-D 1|50|500"));
+
+ // test value filter to changelog
+ write.write(rowData(1, 60, 600L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(6, write.prepareCommit(true, 6));
+ assertThat(getResult(read, scan.plan().splits())).isEmpty();
+
// no more new snapshots, should return empty plan
assertThat(scan.plan().splits()).isEmpty();