This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new abed79048 [core] Introduce executeFilter to TableRead (#2336)
abed79048 is described below

commit abed7904823ab801308d7e82515f01bd6b20b3b6
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 20 20:04:04 2023 +0800

    [core] Introduce executeFilter to TableRead (#2336)
---
 .../main/java/org/apache/paimon/predicate/And.java |  11 ++
 .../apache/paimon/predicate/CompoundPredicate.java |   8 ++
 .../org/apache/paimon/predicate/LeafPredicate.java |  11 ++
 .../main/java/org/apache/paimon/predicate/Or.java  |  11 ++
 .../org/apache/paimon/predicate/Predicate.java     |   8 ++
 .../predicate/PredicateProjectionConverter.java    |  66 ++++++++++++
 .../paimon/table/AbstractFileStoreTable.java       |  50 ---------
 .../paimon/table/AppendOnlyFileStoreTable.java     |  22 +---
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  13 +--
 .../paimon/table/source/AbstractDataTableRead.java | 115 +++++++++++++++++++++
 .../apache/paimon/table/source/InnerTableRead.java |   5 +
 .../paimon/table/source/KeyValueTableRead.java     |   8 +-
 .../org/apache/paimon/table/source/TableRead.java  |   2 +
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  67 ++++++++++++
 .../flink/source/TestChangelogDataReadWrite.java   |   9 +-
 15 files changed, 319 insertions(+), 87 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/And.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
index 011449ecf..98e53947d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.predicate;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FieldStats;
 
 import java.util.ArrayList;
@@ -43,6 +44,16 @@ public class And extends CompoundPredicate.Function {
         return true;
     }
 
+    @Override
+    public boolean test(InternalRow row, List<Predicate> children) {
+        for (Predicate child : children) {
+            if (!child.test(row)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats, 
List<Predicate> children) {
         for (Predicate child : children) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
index dcaa5c949..9d9c2259e 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.predicate;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FieldStats;
 
 import java.io.Serializable;
@@ -52,6 +53,11 @@ public class CompoundPredicate implements Predicate {
         return function.test(values, children);
     }
 
+    @Override
+    public boolean test(InternalRow row) {
+        return function.test(row, children);
+    }
+
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         return function.test(rowCount, fieldStats, children);
@@ -81,6 +87,8 @@ public class CompoundPredicate implements Predicate {
 
         public abstract boolean test(Object[] values, List<Predicate> 
children);
 
+        public abstract boolean test(InternalRow row, List<Predicate> 
children);
+
         public abstract boolean test(
                 long rowCount, FieldStats[] fieldStats, List<Predicate> 
children);
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 134384252..638708eaf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.predicate;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.data.serializer.ListSerializer;
 import org.apache.paimon.data.serializer.NullableSerializer;
@@ -25,6 +26,7 @@ import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.InternalRowUtils;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -82,11 +84,20 @@ public class LeafPredicate implements Predicate {
         return literals;
     }
 
+    public LeafPredicate copyWithNewIndex(int fieldIndex) {
+        return new LeafPredicate(function, type, fieldIndex, fieldName, 
literals);
+    }
+
     @Override
     public boolean test(Object[] values) {
         return function.test(type, values[fieldIndex], literals);
     }
 
+    @Override
+    public boolean test(InternalRow row) {
+        return function.test(type, InternalRowUtils.get(row, fieldIndex, 
type), literals);
+    }
+
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         FieldStats stats = fieldStats[fieldIndex];
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
index 07603e740..e5beb0ddf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.predicate;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FieldStats;
 
 import java.util.ArrayList;
@@ -43,6 +44,16 @@ public class Or extends CompoundPredicate.Function {
         return false;
     }
 
+    @Override
+    public boolean test(InternalRow row, List<Predicate> children) {
+        for (Predicate child : children) {
+            if (child.test(row)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats, 
List<Predicate> children) {
         for (Predicate child : children) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
index 699efd7b8..3c9d04b18 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.predicate;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FieldStats;
 
 import java.io.Serializable;
@@ -40,6 +41,13 @@ public interface Predicate extends Serializable {
      */
     boolean test(Object[] values);
 
+    /**
+     * Test based on the specific input row.
+     *
+     * @return return true when hit, false when not hit.
+     */
+    boolean test(InternalRow row);
+
     /**
      * Test based on the statistical information to determine whether a hit is 
possible.
      *
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
new file mode 100644
index 000000000..8bd627f39
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A {@link PredicateVisitor} which converts {@link Predicate} with projection.
+ *
+ * <p>Leaf predicates are remapped from their index in the original row to their position in
+ * the projected row. A leaf on a field that is not projected cannot be converted: it is dropped
+ * from an {@link And}, while any other compound predicate (such as {@link Or}) then cannot be
+ * converted as a whole. An empty result means nothing can be evaluated on the projected rows.
+ */
+public class PredicateProjectionConverter implements PredicateVisitor<Optional<Predicate>> {
+
+    /** Maps a field index of the original row to its position in the projected row. */
+    private final Map<Integer, Integer> reversed;
+
+    public PredicateProjectionConverter(int[] projection) {
+        this.reversed = new HashMap<>();
+        for (int i = 0; i < projection.length; i++) {
+            reversed.put(projection[i], i);
+        }
+    }
+
+    @Override
+    public Optional<Predicate> visit(LeafPredicate predicate) {
+        int index = predicate.index();
+        Integer adjusted = reversed.get(index);
+        if (adjusted == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(predicate.copyWithNewIndex(adjusted));
+    }
+
+    @Override
+    public Optional<Predicate> visit(CompoundPredicate predicate) {
+        List<Predicate> converted = new ArrayList<>();
+        boolean isAnd = predicate.function() instanceof And;
+        for (Predicate child : predicate.children()) {
+            Optional<Predicate> optional = child.visit(this);
+            if (optional.isPresent()) {
+                converted.add(optional.get());
+            } else if (!isAnd) {
+                // This child cannot be evaluated, so neither can the whole non-And predicate.
+                return Optional.empty();
+            }
+        }
+
+        if (isAnd && converted.isEmpty()) {
+            // Every child of the And was dropped: there is nothing left to filter on.
+            return Optional.empty();
+        }
+        return Optional.of(new CompoundPredicate(predicate.function(), converted));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4a21e50c7..840173ff3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
@@ -33,7 +31,6 @@ import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
@@ -46,12 +43,9 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerStreamTableScanImpl;
-import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.InnerTableScanImpl;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
@@ -401,50 +395,6 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
     }
 
-    abstract InnerTableRead innerRead();
-
-    @Override
-    public InnerTableRead newRead() {
-        InnerTableRead innerTableRead = innerRead();
-        DefaultValueAssigner defaultValueAssigner = 
DefaultValueAssigner.create(tableSchema);
-        if (!defaultValueAssigner.needToAssign()) {
-            return innerTableRead;
-        }
-
-        return new InnerTableRead() {
-            @Override
-            public InnerTableRead withFilter(Predicate predicate) {
-                
innerTableRead.withFilter(defaultValueAssigner.handlePredicate(predicate));
-                return this;
-            }
-
-            @Override
-            public InnerTableRead withProjection(int[][] projection) {
-                defaultValueAssigner.handleProject(projection);
-                innerTableRead.withProjection(projection);
-                return this;
-            }
-
-            @Override
-            public TableRead withIOManager(IOManager ioManager) {
-                innerTableRead.withIOManager(ioManager);
-                return this;
-            }
-
-            @Override
-            public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
-                return defaultValueAssigner.assignFieldsDefaultValue(
-                        innerTableRead.createReader(split));
-            }
-
-            @Override
-            public InnerTableRead forceKeepDelete() {
-                innerTableRead.forceKeepDelete();
-                return this;
-            }
-        };
-    }
-
     @Override
     public void createTag(String tagName, long fromSnapshotId) {
         SnapshotManager snapshotManager = snapshotManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 272870cc4..dc865d93c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table;
 import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -34,12 +33,12 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.AbstractDataTableRead;
 import org.apache.paimon.table.source.AppendOnlySplitGenerator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Preconditions;
 
@@ -110,28 +109,17 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public InnerTableRead innerRead() {
+    public InnerTableRead newRead() {
         AppendOnlyFileStoreRead read = store().newRead();
-        return new InnerTableRead() {
-            @Override
-            public InnerTableRead withFilter(Predicate predicate) {
-                read.withFilter(predicate);
-                return this;
-            }
+        return new AbstractDataTableRead<InternalRow>(read, schema()) {
 
             @Override
-            public InnerTableRead withProjection(int[][] projection) {
+            public void projection(int[][] projection) {
                 read.withProjection(projection);
-                return this;
-            }
-
-            @Override
-            public TableRead withIOManager(IOManager ioManager) {
-                return this;
             }
 
             @Override
-            public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+            public RecordReader<InternalRow> reader(Split split) throws 
IOException {
                 return read.createReader((DataSplit) split);
             }
         };
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 02c6a36c2..feb643937 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -151,19 +151,12 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public InnerTableRead innerRead() {
-        return new KeyValueTableRead(store().newRead()) {
+    public InnerTableRead newRead() {
+        return new KeyValueTableRead(store().newRead(), schema()) {
 
             @Override
-            public InnerTableRead withFilter(Predicate predicate) {
-                read.withFilter(predicate);
-                return this;
-            }
-
-            @Override
-            public InnerTableRead withProjection(int[][] projection) {
+            public void projection(int[][] projection) {
                 read.withValueProjection(projection);
-                return this;
             }
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
new file mode 100644
index 000000000..930cddcd5
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.operation.FileStoreRead;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateProjectionConverter;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.Projection;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A {@link InnerTableRead} for data table.
+ *
+ * <p>Pushes filters and projections down to the underlying {@link FileStoreRead}, fills in
+ * default values declared in the table schema, and, once {@link #executeFilter()} has been
+ * requested, additionally evaluates the filter row by row on the records it returns.
+ */
+public abstract class AbstractDataTableRead<T> implements InnerTableRead {
+
+    private final FileStoreRead<T> fileStoreRead;
+
+    /** Assigns schema default values; {@code null} when constructed without a schema. */
+    private final DefaultValueAssigner defaultValueAssigner;
+
+    private int[][] projection;
+    private boolean executeFilter = false;
+    private Predicate predicate;
+
+    public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema schema) {
+        this.fileStoreRead = fileStoreRead;
+        // Without a schema there are no default values to assign.
+        this.defaultValueAssigner = schema == null ? null : DefaultValueAssigner.create(schema);
+    }
+
+    /** Pushes the given projection down to the underlying read. */
+    public abstract void projection(int[][] projection);
+
+    /** Creates the raw reader for a split, before default values or row filtering apply. */
+    public abstract RecordReader<InternalRow> reader(Split split) throws IOException;
+
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        return this;
+    }
+
+    @Override
+    public final InnerTableRead withFilter(Predicate predicate) {
+        // Keep the unmodified predicate for executeFilter: createReader assigns default values
+        // before filtering, so rows are tested against the predicate the caller passed in.
+        this.predicate = predicate;
+        if (defaultValueAssigner != null) {
+            predicate = defaultValueAssigner.handlePredicate(predicate);
+        }
+        fileStoreRead.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public TableRead executeFilter() {
+        this.executeFilter = true;
+        return this;
+    }
+
+    @Override
+    public final InnerTableRead withProjection(int[][] projection) {
+        this.projection = projection;
+        // defaultValueAssigner is null when no schema was given (see the constructor); guard it
+        // like withFilter and createReader do instead of failing with a NullPointerException.
+        if (defaultValueAssigner != null) {
+            defaultValueAssigner.handleProject(projection);
+        }
+        projection(projection);
+        return this;
+    }
+
+    @Override
+    public final RecordReader<InternalRow> createReader(Split split) throws IOException {
+        RecordReader<InternalRow> reader = reader(split);
+        if (defaultValueAssigner != null) {
+            reader = defaultValueAssigner.assignFieldsDefaultValue(reader);
+        }
+        if (executeFilter) {
+            reader = executeFilter(reader);
+        }
+
+        return reader;
+    }
+
+    /**
+     * Wraps the reader so that only rows matching the filter are returned. The filter is
+     * remapped to the projected field positions; if it cannot be expressed on the projected
+     * fields, the reader is returned unfiltered.
+     */
+    private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow> reader) {
+        if (predicate == null) {
+            return reader;
+        }
+
+        Predicate predicate = this.predicate;
+        if (projection != null) {
+            Optional<Predicate> optional =
+                    predicate.visit(
+                            new PredicateProjectionConverter(
+                                    Projection.of(projection).toTopLevelIndexes()));
+            if (!optional.isPresent()) {
+                return reader;
+            }
+            predicate = optional.get();
+        }
+
+        Predicate finalFilter = predicate;
+        return reader.filter(finalFilter::test);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index ea07bc969..a538fc0e2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -50,4 +50,9 @@ public interface InnerTableRead extends TableRead {
     default InnerTableRead forceKeepDelete() {
         return this;
     }
+
+    @Override
+    default TableRead executeFilter() {
+        return this;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index a838806ee..aaebc1532 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.KeyValueFileStoreRead;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
 
 import javax.annotation.Nullable;
 
@@ -32,11 +33,12 @@ import java.io.IOException;
  * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading 
of {@link
  * InternalRow}.
  */
-public abstract class KeyValueTableRead implements InnerTableRead {
+public abstract class KeyValueTableRead extends 
AbstractDataTableRead<KeyValue> {
 
     protected final KeyValueFileStoreRead read;
 
-    protected KeyValueTableRead(KeyValueFileStoreRead read) {
+    protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema 
schema) {
+        super(read, schema);
         this.read = read;
     }
 
@@ -47,7 +49,7 @@ public abstract class KeyValueTableRead implements 
InnerTableRead {
     }
 
     @Override
-    public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+    public final RecordReader<InternalRow> reader(Split split) throws 
IOException {
         return new RowDataRecordReader(read.createReader((DataSplit) split));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index adc8dbed9..72b54ae6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -37,6 +37,8 @@ import java.util.List;
 @Public
 public interface TableRead {
 
+    TableRead executeFilter();
+
     TableRead withIOManager(IOManager ioManager);
 
     RecordReader<InternalRow> createReader(Split split) throws IOException;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 768372162..d91e0a320 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
@@ -117,6 +118,72 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testBatchFilterWithExecution() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new 
PredicateBuilder(table.schema().logicalRowType());
+
+        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
+
+        // simple
+        TableRead read = table.newRead().withFilter(builder.equal(2, 
201L)).executeFilter();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+
+        // or
+        read =
+                table.newRead()
+                        .withFilter(
+                                PredicateBuilder.or(builder.equal(2, 201L), 
builder.equal(2, 500L)))
+                        .executeFilter();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+
+        // projection all in
+        read =
+                table.newRead()
+                        .withFilter(
+                                PredicateBuilder.or(builder.equal(2, 201L), 
builder.equal(2, 500L)))
+                        .withProjection(new int[] {3, 2})
+                        .executeFilter();
+        Function<InternalRow, String> toString =
+                rowData -> rowData.getLong(1) + "|" + new 
String(rowData.getBinary(0));
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
toString)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, toString))
+                .hasSameElementsAs(Arrays.asList("201|binary", "201|binary"));
+
+        // projection contains unknown index or
+        read =
+                table.newRead()
+                        .withFilter(
+                                PredicateBuilder.or(builder.equal(2, 201L), 
builder.equal(0, 1)))
+                        .withProjection(new int[] {3, 2})
+                        .executeFilter();
+        assertThat(getResult(read, splits, binaryRow(2), 0, toString))
+                .hasSameElementsAs(
+                        Arrays.asList("200|binary", "201|binary", 
"202|binary", "201|binary"));
+
+        // projection contains unknown index and
+        read =
+                table.newRead()
+                        .withFilter(
+                                PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(0, 1)))
+                        .withProjection(new int[] {3, 2})
+                        .executeFilter();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
toString)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, toString))
+                .hasSameElementsAs(Arrays.asList("201|binary", "201|binary"));
+    }
+
     @Test
     public void testSplitOrder() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 0f9d1cd30..55a671879 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -34,7 +34,6 @@ import 
org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.operation.KeyValueFileStoreRead;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
@@ -131,14 +130,10 @@ public class TestChangelogDataReadWrite {
                         pathFactory,
                         EXTRACTOR,
                         new CoreOptions(new HashMap<>()));
-        return new KeyValueTableRead(read) {
-            @Override
-            public KeyValueTableRead withFilter(Predicate predicate) {
-                throw new UnsupportedOperationException();
-            }
+        return new KeyValueTableRead(read, null) {
 
             @Override
-            public KeyValueTableRead withProjection(int[][] projection) {
+            public void projection(int[][] projection) {
                 throw new UnsupportedOperationException();
             }
 

Reply via email to