This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a596bebbc [core] Read optimized table supports value filter pushdown
(#2922)
a596bebbc is described below
commit a596bebbc9510658d93a9f8d282dd2682dc6f44f
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 4 21:30:31 2024 +0800
[core] Read optimized table supports value filter pushdown (#2922)
---
.../paimon/operation/AbstractFileStoreScan.java | 4 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 4 +-
.../paimon/operation/KeyValueFileStoreScan.java | 80 ++++++++++++++++++----
.../paimon/table/AbstractFileStoreTable.java | 5 +-
.../paimon/table/source/InnerTableScanImpl.java | 4 --
.../paimon/table/system/ReadOptimizedTable.java | 1 -
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 37 +++++++---
7 files changed, 98 insertions(+), 37 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 399ee6c3c..81b173321 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -330,7 +330,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
Collectors.toList()))
.values()
.stream()
- .filter(this::filterWholeBucketByStats)
+ .map(this::filterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());
@@ -420,7 +420,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
/** Note: Keep this thread-safe. */
- protected abstract boolean filterWholeBucketByStats(List<ManifestEntry>
entries);
+ protected abstract List<ManifestEntry>
filterWholeBucketByStats(List<ManifestEntry> entries);
/** Note: Keep this thread-safe. */
private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta
manifest) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 90aa988b6..2cec4e064 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -90,8 +90,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
}
@Override
- protected boolean filterWholeBucketByStats(List<ManifestEntry> entries) {
+ protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
// We don't need to filter per-bucket entries here
- return true;
+ return entries;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 02086ceb3..90d2c1b18 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -31,6 +31,8 @@ import org.apache.paimon.stats.FieldStatsConverters;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
@@ -104,23 +106,71 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
/** Note: Keep this thread-safe. */
@Override
- protected boolean filterWholeBucketByStats(List<ManifestEntry> entries) {
- // entries come from the same bucket, if any of it doesn't meet the
request, we could filter
- // the bucket.
- if (valueFilter != null) {
- for (ManifestEntry entry : entries) {
- FieldStatsArraySerializer serializer =
-
fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
- BinaryTableStats stats = entry.file().valueStats();
- if (valueFilter.test(
- entry.file().rowCount(),
- serializer.evolution(stats.minValues()),
- serializer.evolution(stats.maxValues()),
- serializer.evolution(stats.nullCounts(),
entry.file().rowCount()))) {
- return true;
+ protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
+ if (valueFilter == null) {
+ return entries;
+ }
+
+ return noOverlapping(entries)
+ ? filterWholeBucketPerFile(entries)
+ : filterWholeBucketAllFiles(entries);
+ }
+
+ private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry>
entries) {
+ List<ManifestEntry> filtered = new ArrayList<>();
+ for (ManifestEntry entry : entries) {
+ if (filterByValueFilter(entry)) {
+ filtered.add(entry);
+ }
+ }
+ return filtered;
+ }
+
+ private List<ManifestEntry> filterWholeBucketAllFiles(List<ManifestEntry>
entries) {
+ // entries come from the same bucket; only if none of them meets the
request can we
+ // filter out the whole bucket.
+ for (ManifestEntry entry : entries) {
+ if (filterByValueFilter(entry)) {
+ return entries;
+ }
+ }
+ return Collections.emptyList();
+ }
+
+ private boolean filterByValueFilter(ManifestEntry entry) {
+ FieldStatsArraySerializer serializer =
+ fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
+ BinaryTableStats stats = entry.file().valueStats();
+ return valueFilter.test(
+ entry.file().rowCount(),
+ serializer.evolution(stats.minValues()),
+ serializer.evolution(stats.maxValues()),
+ serializer.evolution(stats.nullCounts(),
entry.file().rowCount()));
+ }
+
+ private static boolean noOverlapping(List<ManifestEntry> entries) {
+ if (entries.size() <= 1) {
+ return true;
+ }
+
+ Integer previousLevel = null;
+ for (ManifestEntry entry : entries) {
+ int level = entry.file().level();
+ // level 0 files may overlap each other
+ if (level == 0) {
+ return false;
+ }
+
+ if (previousLevel == null) {
+ previousLevel = level;
+ } else {
+ // files from different levels may overlap
+ if (previousLevel != level) {
+ return false;
}
}
}
- return valueFilter == null;
+
+ return true;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index bdc81f3c1..186f8ed3b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -155,10 +155,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public InnerTableScan newScan() {
return new InnerTableScanImpl(
- coreOptions(),
- newSnapshotReader(),
- snapshotManager(),
- DefaultValueAssigner.create(tableSchema));
+ coreOptions(), newSnapshotReader(),
DefaultValueAssigner.create(tableSchema));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 1e892fe1f..b08da568e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -24,7 +24,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
-import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -34,7 +33,6 @@ import java.util.List;
/** {@link TableScan} implementation for batch planning. */
public class InnerTableScanImpl extends AbstractInnerTableScan {
- private final SnapshotManager snapshotManager;
private final DefaultValueAssigner defaultValueAssigner;
private StartingScanner startingScanner;
@@ -45,10 +43,8 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
public InnerTableScanImpl(
CoreOptions options,
SnapshotReader snapshotReader,
- SnapshotManager snapshotManager,
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
- this.snapshotManager = snapshotManager;
this.hasNext = true;
this.defaultValueAssigner = defaultValueAssigner;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 0deac172b..35ac209a9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -107,7 +107,6 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
return new InnerTableScanImpl(
coreOptions(),
newSnapshotReader(),
- snapshotManager(),
DefaultValueAssigner.create(dataTable.schema()));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 8c578f00b..d561ce5ca 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -79,9 +80,11 @@ import java.util.function.Function;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
+import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -315,7 +318,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(1, 21));
+ Predicate predicate = and(builder.equal(2, 201L), builder.equal(1,
21));
List<Split> splits =
toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());
TableRead read = table.newRead();
@@ -395,7 +398,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(1, 21));
+ Predicate predicate = and(builder.equal(2, 201L), builder.equal(1,
21));
List<Split> splits =
toSplits(
table.newSnapshotReader()
@@ -910,7 +913,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
auditLogTable
.newSnapshotReader()
.withFilter(
- PredicateBuilder.and(
+ and(
predicateBuilder.equal(predicateBuilder.indexOf("b"), 300),
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2)));
InnerTableRead read = auditLogTable.newRead();
@@ -1232,7 +1235,9 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
@Test
public void testReadOptimizedTable() throws Exception {
- FileStoreTable table = createFileStoreTable();
+ // make the max level contain many files
+ FileStoreTable table =
+ createFileStoreTable(options -> options.set(TARGET_FILE_SIZE,
new MemorySize(1)));
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -1251,16 +1256,14 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
DataTypes.ROW(
DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()));
- SnapshotReader snapshotReader = roTable.newSnapshotReader();
TableRead read = roTable.newRead();
- List<String> result =
- getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+ List<String> result = getResult(read,
roTable.newScan().plan().splits(), rowDataToString);
assertThat(result).isEmpty();
write.compact(binaryRow(1), 0, true);
commit.commit(2, write.prepareCommit(true, 2));
- result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+ result = getResult(read, roTable.newScan().plan().splits(),
rowDataToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 10, 100]", "+I[1,
11, 110]");
write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 101L));
@@ -1268,11 +1271,27 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.compact(binaryRow(2), 0, true);
commit.commit(3, write.prepareCommit(true, 3));
- result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+ result = getResult(read, roTable.newScan().plan().splits(),
rowDataToString);
assertThat(result)
.containsExactlyInAnyOrder(
"+I[1, 10, 100]", "+I[1, 11, 110]", "+I[2, 20, 201]",
"+I[2, 21, 210]");
+ // test value filter on ro table
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ Predicate filter = builder.equal(0, 2);
+
+ // no value filter, return two files
+ List<Split> splits =
roTable.newScan().withFilter(filter).plan().splits();
+ assertThat(splits).hasSize(1);
+ assertThat(splits.get(0).convertToRawFiles().get()).hasSize(2);
+
+ // with value filter, return one file
+ filter = and(filter, builder.equal(2, 210L));
+ splits = roTable.newScan().withFilter(filter).plan().splits();
+ assertThat(splits).hasSize(1);
+ assertThat(splits.get(0).convertToRawFiles().get()).hasSize(1);
+
write.close();
commit.close();
}