This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a596bebbc [core] Read optimized table supports value filter pushdown (#2922)
a596bebbc is described below

commit a596bebbc9510658d93a9f8d282dd2682dc6f44f
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 4 21:30:31 2024 +0800

    [core] Read optimized table supports value filter pushdown (#2922)
---
 .../paimon/operation/AbstractFileStoreScan.java    |  4 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |  4 +-
 .../paimon/operation/KeyValueFileStoreScan.java    | 80 ++++++++++++++++++----
 .../paimon/table/AbstractFileStoreTable.java       |  5 +-
 .../paimon/table/source/InnerTableScanImpl.java    |  4 --
 .../paimon/table/system/ReadOptimizedTable.java    |  1 -
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 37 +++++++---
 7 files changed, 98 insertions(+), 37 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 399ee6c3c..81b173321 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -330,7 +330,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                                         Collectors.toList()))
                         .values()
                         .stream()
-                        .filter(this::filterWholeBucketByStats)
+                        .map(this::filterWholeBucketByStats)
                         .flatMap(Collection::stream)
                         .collect(Collectors.toList());
 
@@ -420,7 +420,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     }
 
     /** Note: Keep this thread-safe. */
-    protected abstract boolean filterWholeBucketByStats(List<ManifestEntry> 
entries);
+    protected abstract List<ManifestEntry> 
filterWholeBucketByStats(List<ManifestEntry> entries);
 
     /** Note: Keep this thread-safe. */
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta 
manifest) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 90aa988b6..2cec4e064 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -90,8 +90,8 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
     }
 
     @Override
-    protected boolean filterWholeBucketByStats(List<ManifestEntry> entries) {
+    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> 
entries) {
         // We don't need to filter per-bucket entries here
-        return true;
+        return entries;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 02086ceb3..90d2c1b18 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -31,6 +31,8 @@ import org.apache.paimon.stats.FieldStatsConverters;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /** {@link FileStoreScan} for {@link KeyValueFileStore}. */
@@ -104,23 +106,71 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
     /** Note: Keep this thread-safe. */
     @Override
-    protected boolean filterWholeBucketByStats(List<ManifestEntry> entries) {
-        // entries come from the same bucket, if any of it doesn't meet the 
request, we could filter
-        // the bucket.
-        if (valueFilter != null) {
-            for (ManifestEntry entry : entries) {
-                FieldStatsArraySerializer serializer =
-                        
fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
-                BinaryTableStats stats = entry.file().valueStats();
-                if (valueFilter.test(
-                        entry.file().rowCount(),
-                        serializer.evolution(stats.minValues()),
-                        serializer.evolution(stats.maxValues()),
-                        serializer.evolution(stats.nullCounts(), 
entry.file().rowCount()))) {
-                    return true;
+    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> 
entries) {
+        if (valueFilter == null) {
+            return entries;
+        }
+
+        return noOverlapping(entries)
+                ? filterWholeBucketPerFile(entries)
+                : filterWholeBucketAllFiles(entries);
+    }
+
+    private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> 
entries) {
+        List<ManifestEntry> filtered = new ArrayList<>();
+        for (ManifestEntry entry : entries) {
+            if (filterByValueFilter(entry)) {
+                filtered.add(entry);
+            }
+        }
+        return filtered;
+    }
+
+    private List<ManifestEntry> filterWholeBucketAllFiles(List<ManifestEntry> 
entries) {
+        // entries come from the same bucket, if any of it doesn't meet the 
request, we could
+        // filter the bucket.
+        for (ManifestEntry entry : entries) {
+            if (filterByValueFilter(entry)) {
+                return entries;
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    private boolean filterByValueFilter(ManifestEntry entry) {
+        FieldStatsArraySerializer serializer =
+                fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
+        BinaryTableStats stats = entry.file().valueStats();
+        return valueFilter.test(
+                entry.file().rowCount(),
+                serializer.evolution(stats.minValues()),
+                serializer.evolution(stats.maxValues()),
+                serializer.evolution(stats.nullCounts(), 
entry.file().rowCount()));
+    }
+
+    private static boolean noOverlapping(List<ManifestEntry> entries) {
+        if (entries.size() <= 1) {
+            return true;
+        }
+
+        Integer previousLevel = null;
+        for (ManifestEntry entry : entries) {
+            int level = entry.file().level();
+            // level 0 files have overlapping
+            if (level == 0) {
+                return false;
+            }
+
+            if (previousLevel == null) {
+                previousLevel = level;
+            } else {
+                // different level, have overlapping
+                if (previousLevel != level) {
+                    return false;
                 }
             }
         }
-        return valueFilter == null;
+
+        return true;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index bdc81f3c1..186f8ed3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -155,10 +155,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
     @Override
     public InnerTableScan newScan() {
         return new InnerTableScanImpl(
-                coreOptions(),
-                newSnapshotReader(),
-                snapshotManager(),
-                DefaultValueAssigner.create(tableSchema));
+                coreOptions(), newSnapshotReader(), 
DefaultValueAssigner.create(tableSchema));
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 1e892fe1f..b08da568e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -24,7 +24,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
-import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
@@ -34,7 +33,6 @@ import java.util.List;
 /** {@link TableScan} implementation for batch planning. */
 public class InnerTableScanImpl extends AbstractInnerTableScan {
 
-    private final SnapshotManager snapshotManager;
     private final DefaultValueAssigner defaultValueAssigner;
 
     private StartingScanner startingScanner;
@@ -45,10 +43,8 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
     public InnerTableScanImpl(
             CoreOptions options,
             SnapshotReader snapshotReader,
-            SnapshotManager snapshotManager,
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
-        this.snapshotManager = snapshotManager;
         this.hasNext = true;
         this.defaultValueAssigner = defaultValueAssigner;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 0deac172b..35ac209a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -107,7 +107,6 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
         return new InnerTableScanImpl(
                 coreOptions(),
                 newSnapshotReader(),
-                snapshotManager(),
                 DefaultValueAssigner.create(dataTable.schema()));
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 8c578f00b..d561ce5ca 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -79,9 +80,11 @@ import java.util.function.Function;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
+import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
 import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -315,7 +318,7 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
         PredicateBuilder builder = new 
PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(1, 21));
+        Predicate predicate = and(builder.equal(2, 201L), builder.equal(1, 
21));
         List<Split> splits =
                 
toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());
         TableRead read = table.newRead();
@@ -395,7 +398,7 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
         PredicateBuilder builder = new 
PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(1, 21));
+        Predicate predicate = and(builder.equal(2, 201L), builder.equal(1, 
21));
         List<Split> splits =
                 toSplits(
                         table.newSnapshotReader()
@@ -910,7 +913,7 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
                 auditLogTable
                         .newSnapshotReader()
                         .withFilter(
-                                PredicateBuilder.and(
+                                and(
                                         
predicateBuilder.equal(predicateBuilder.indexOf("b"), 300),
                                         
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2)));
         InnerTableRead read = auditLogTable.newRead();
@@ -1232,7 +1235,9 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
 
     @Test
     public void testReadOptimizedTable() throws Exception {
-        FileStoreTable table = createFileStoreTable();
+        // let max level has many files
+        FileStoreTable table =
+                createFileStoreTable(options -> options.set(TARGET_FILE_SIZE, 
new MemorySize(1)));
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
 
@@ -1251,16 +1256,14 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
                                 DataTypes.ROW(
                                         DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()));
 
-        SnapshotReader snapshotReader = roTable.newSnapshotReader();
         TableRead read = roTable.newRead();
-        List<String> result =
-                getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        List<String> result = getResult(read, 
roTable.newScan().plan().splits(), rowDataToString);
         assertThat(result).isEmpty();
 
         write.compact(binaryRow(1), 0, true);
         commit.commit(2, write.prepareCommit(true, 2));
 
-        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        result = getResult(read, roTable.newScan().plan().splits(), 
rowDataToString);
         assertThat(result).containsExactlyInAnyOrder("+I[1, 10, 100]", "+I[1, 
11, 110]");
 
         write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 101L));
@@ -1268,11 +1271,27 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.compact(binaryRow(2), 0, true);
         commit.commit(3, write.prepareCommit(true, 3));
 
-        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        result = getResult(read, roTable.newScan().plan().splits(), 
rowDataToString);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         "+I[1, 10, 100]", "+I[1, 11, 110]", "+I[2, 20, 201]", 
"+I[2, 21, 210]");
 
+        // test value filter on ro table
+
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        Predicate filter = builder.equal(0, 2);
+
+        // no value filter, return two files
+        List<Split> splits = 
roTable.newScan().withFilter(filter).plan().splits();
+        assertThat(splits).hasSize(1);
+        assertThat(splits.get(0).convertToRawFiles().get()).hasSize(2);
+
+        // with value filter, return one files
+        filter = and(filter, builder.equal(2, 210L));
+        splits = roTable.newScan().withFilter(filter).plan().splits();
+        assertThat(splits).hasSize(1);
+        assertThat(splits.get(0).convertToRawFiles().get()).hasSize(1);
+
         write.close();
         commit.close();
     }

Reply via email to