This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new abed79048 [core] Introduce executeFilter to TableRead (#2336)
abed79048 is described below
commit abed7904823ab801308d7e82515f01bd6b20b3b6
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 20 20:04:04 2023 +0800
[core] Introduce executeFilter to TableRead (#2336)
---
.../main/java/org/apache/paimon/predicate/And.java | 11 ++
.../apache/paimon/predicate/CompoundPredicate.java | 8 ++
.../org/apache/paimon/predicate/LeafPredicate.java | 11 ++
.../main/java/org/apache/paimon/predicate/Or.java | 11 ++
.../org/apache/paimon/predicate/Predicate.java | 8 ++
.../predicate/PredicateProjectionConverter.java | 66 ++++++++++++
.../paimon/table/AbstractFileStoreTable.java | 50 ---------
.../paimon/table/AppendOnlyFileStoreTable.java | 22 +---
.../paimon/table/PrimaryKeyFileStoreTable.java | 13 +--
.../paimon/table/source/AbstractDataTableRead.java | 115 +++++++++++++++++++++
.../apache/paimon/table/source/InnerTableRead.java | 5 +
.../paimon/table/source/KeyValueTableRead.java | 8 +-
.../org/apache/paimon/table/source/TableRead.java | 2 +
.../paimon/table/AppendOnlyFileStoreTableTest.java | 67 ++++++++++++
.../flink/source/TestChangelogDataReadWrite.java | 9 +-
15 files changed, 319 insertions(+), 87 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
index 011449ecf..98e53947d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/And.java
@@ -18,6 +18,7 @@
package org.apache.paimon.predicate;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FieldStats;
import java.util.ArrayList;
@@ -43,6 +44,16 @@ public class And extends CompoundPredicate.Function {
return true;
}
+    /**
+     * Returns {@code true} iff every child accepts {@code row}. Evaluation stops at the first
+     * rejecting child, so an empty child list yields {@code true}.
+     */
+    @Override
+    public boolean test(InternalRow row, List<Predicate> children) {
+        for (int i = 0, n = children.size(); i < n; i++) {
+            if (!children.get(i).test(row)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
@Override
public boolean test(long rowCount, FieldStats[] fieldStats,
List<Predicate> children) {
for (Predicate child : children) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
index dcaa5c949..9d9c2259e 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/CompoundPredicate.java
@@ -18,6 +18,7 @@
package org.apache.paimon.predicate;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FieldStats;
import java.io.Serializable;
@@ -52,6 +53,11 @@ public class CompoundPredicate implements Predicate {
return function.test(values, children);
}
+    /** Evaluates this compound predicate on {@code row} by delegating to its function and children. */
+    @Override
+    public boolean test(InternalRow row) {
+        return this.function.test(row, this.children);
+    }
+
@Override
public boolean test(long rowCount, FieldStats[] fieldStats) {
return function.test(rowCount, fieldStats, children);
@@ -81,6 +87,8 @@ public class CompoundPredicate implements Predicate {
public abstract boolean test(Object[] values, List<Predicate>
children);
+ /**
+ * Evaluates this function over {@code children} for a single row.
+ *
+ * @param row the row to test
+ * @param children the child predicates this function combines
+ * @return true if the combined predicate matches the row
+ */
+ public abstract boolean test(InternalRow row, List<Predicate>
children);
+
public abstract boolean test(
long rowCount, FieldStats[] fieldStats, List<Predicate>
children);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 134384252..638708eaf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -18,6 +18,7 @@
package org.apache.paimon.predicate;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.ListSerializer;
import org.apache.paimon.data.serializer.NullableSerializer;
@@ -25,6 +26,7 @@ import org.apache.paimon.format.FieldStats;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.InternalRowUtils;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -82,11 +84,20 @@ public class LeafPredicate implements Predicate {
return literals;
}
+    /**
+     * Returns a copy of this predicate that reads field position {@code fieldIndex}. The function,
+     * type, field name and literals are reused as-is; the literals list is not copied.
+     */
+    public LeafPredicate copyWithNewIndex(int fieldIndex) {
+        return new LeafPredicate(this.function, this.type, fieldIndex, this.fieldName, this.literals);
+    }
+
@Override
public boolean test(Object[] values) {
return function.test(type, values[fieldIndex], literals);
}
+ @Override
+ public boolean test(InternalRow row) {
+ return function.test(type, InternalRowUtils.get(row, fieldIndex,
type), literals);
+ }
+
@Override
public boolean test(long rowCount, FieldStats[] fieldStats) {
FieldStats stats = fieldStats[fieldIndex];
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
index 07603e740..e5beb0ddf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Or.java
@@ -18,6 +18,7 @@
package org.apache.paimon.predicate;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FieldStats;
import java.util.ArrayList;
@@ -43,6 +44,16 @@ public class Or extends CompoundPredicate.Function {
return false;
}
+    /**
+     * Returns {@code true} iff at least one child accepts {@code row}. Evaluation stops at the
+     * first accepting child, so an empty child list yields {@code false}.
+     */
+    @Override
+    public boolean test(InternalRow row, List<Predicate> children) {
+        for (int i = 0, n = children.size(); i < n; i++) {
+            if (children.get(i).test(row)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
@Override
public boolean test(long rowCount, FieldStats[] fieldStats,
List<Predicate> children) {
for (Predicate child : children) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
index 699efd7b8..3c9d04b18 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
@@ -19,6 +19,7 @@
package org.apache.paimon.predicate;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FieldStats;
import java.io.Serializable;
@@ -40,6 +41,13 @@ public interface Predicate extends Serializable {
*/
boolean test(Object[] values);
+ /**
+ * Tests whether a single input row satisfies this predicate.
+ *
+ * @param row the row to evaluate; leaf predicates read fields from it by their field index
+ * @return true if the row matches, false otherwise.
+ */
+ boolean test(InternalRow row);
+
/**
* Test based on the statistical information to determine whether a hit is
possible.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
new file mode 100644
index 000000000..8bd627f39
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A {@link PredicateVisitor} which converts a {@link Predicate} written against table field
+ * indexes into one written against positions of a projected row.
+ *
+ * <p>A leaf on a field outside the projection cannot be converted. Inside an AND such a leaf is
+ * dropped, so the result may match a superset of the original rows. Inside any other compound
+ * predicate it makes the whole compound unconvertible. {@link Optional#empty()} is returned when
+ * nothing convertible remains.
+ */
+public class PredicateProjectionConverter implements PredicateVisitor<Optional<Predicate>> {
+
+    /** Maps a table field index to its position in the projected row. */
+    private final Map<Integer, Integer> reversed;
+
+    /**
+     * @param projection top-level table field indexes in projected order; if an index appears more
+     *     than once, its last position wins
+     */
+    public PredicateProjectionConverter(int[] projection) {
+        this.reversed = new HashMap<>();
+        for (int i = 0; i < projection.length; i++) {
+            reversed.put(projection[i], i);
+        }
+    }
+
+    @Override
+    public Optional<Predicate> visit(LeafPredicate predicate) {
+        Integer adjusted = reversed.get(predicate.index());
+        if (adjusted == null) {
+            // The field is not part of the projection, so it cannot be evaluated on projected rows.
+            return Optional.empty();
+        }
+        return Optional.of(predicate.copyWithNewIndex(adjusted));
+    }
+
+    @Override
+    public Optional<Predicate> visit(CompoundPredicate predicate) {
+        boolean isAnd = predicate.function() instanceof And;
+        List<Predicate> converted = new ArrayList<>();
+        for (Predicate child : predicate.children()) {
+            Optional<Predicate> optional = child.visit(this);
+            if (optional.isPresent()) {
+                converted.add(optional.get());
+            } else if (!isAnd) {
+                // Dropping a disjunct would make the predicate stricter and lose matching rows.
+                return Optional.empty();
+            }
+        }
+        if (isAnd && converted.isEmpty()) {
+            // Every conjunct was dropped. An AND without children accepts every row, so report
+            // "nothing to evaluate" instead of building a degenerate always-true predicate.
+            return Optional.empty();
+        }
+        return Optional.of(new CompoundPredicate(predicate.function(), converted));
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4a21e50c7..840173ff3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
@@ -33,7 +31,6 @@ import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
@@ -46,12 +43,9 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
-import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.InnerTableScanImpl;
-import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
@@ -401,50 +395,6 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}
- abstract InnerTableRead innerRead();
-
- @Override
- public InnerTableRead newRead() {
- InnerTableRead innerTableRead = innerRead();
- DefaultValueAssigner defaultValueAssigner =
DefaultValueAssigner.create(tableSchema);
- if (!defaultValueAssigner.needToAssign()) {
- return innerTableRead;
- }
-
- return new InnerTableRead() {
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
-
innerTableRead.withFilter(defaultValueAssigner.handlePredicate(predicate));
- return this;
- }
-
- @Override
- public InnerTableRead withProjection(int[][] projection) {
- defaultValueAssigner.handleProject(projection);
- innerTableRead.withProjection(projection);
- return this;
- }
-
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- innerTableRead.withIOManager(ioManager);
- return this;
- }
-
- @Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
- return defaultValueAssigner.assignFieldsDefaultValue(
- innerTableRead.createReader(split));
- }
-
- @Override
- public InnerTableRead forceKeepDelete() {
- innerTableRead.forceKeepDelete();
- return this;
- }
- };
- }
-
@Override
public void createTag(String tagName, long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 272870cc4..dc865d93c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table;
import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -34,12 +33,12 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Preconditions;
@@ -110,28 +109,17 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
}
@Override
- public InnerTableRead innerRead() {
+ public InnerTableRead newRead() {
AppendOnlyFileStoreRead read = store().newRead();
- return new InnerTableRead() {
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- read.withFilter(predicate);
- return this;
- }
+ return new AbstractDataTableRead<InternalRow>(read, schema()) {
@Override
- public InnerTableRead withProjection(int[][] projection) {
+ public void projection(int[][] projection) {
read.withProjection(projection);
- return this;
- }
-
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- return this;
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> reader(Split split) throws
IOException {
return read.createReader((DataSplit) split);
}
};
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 02c6a36c2..feb643937 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -151,19 +151,12 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
}
@Override
- public InnerTableRead innerRead() {
- return new KeyValueTableRead(store().newRead()) {
+ public InnerTableRead newRead() {
+ return new KeyValueTableRead(store().newRead(), schema()) {
@Override
- public InnerTableRead withFilter(Predicate predicate) {
- read.withFilter(predicate);
- return this;
- }
-
- @Override
- public InnerTableRead withProjection(int[][] projection) {
+ public void projection(int[][] projection) {
read.withValueProjection(projection);
- return this;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
new file mode 100644
index 000000000..930cddcd5
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.operation.FileStoreRead;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateProjectionConverter;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.Projection;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A {@link InnerTableRead} for data tables. It forwards filter and projection push-down to a
+ * {@link FileStoreRead}, fills in schema default values, and can optionally re-check the filter on
+ * every produced row (see {@link #executeFilter()}).
+ *
+ * @param <T> the record type produced by the underlying {@link FileStoreRead}
+ */
+public abstract class AbstractDataTableRead<T> implements InnerTableRead {
+
+    private final FileStoreRead<T> fileStoreRead;
+
+    /** Default-value handling for the table schema; null when constructed without a schema. */
+    private final DefaultValueAssigner defaultValueAssigner;
+
+    /** Projection passed to {@link #withProjection}; null means no projection was requested. */
+    private int[][] projection;
+
+    /** Whether {@link #createReader} should re-apply {@link #predicate} to every row. */
+    private boolean executeFilter = false;
+
+    /** Filter exactly as passed to {@link #withFilter}, before any default-value rewriting. */
+    private Predicate predicate;
+
+    public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema schema) {
+        this.fileStoreRead = fileStoreRead;
+        this.defaultValueAssigner = schema == null ? null : DefaultValueAssigner.create(schema);
+    }
+
+    /** Pushes {@code projection} down to the underlying read. */
+    public abstract void projection(int[][] projection);
+
+    /** Creates the raw reader for {@code split}, before default values and row filtering. */
+    public abstract RecordReader<InternalRow> reader(Split split) throws IOException;
+
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        return this;
+    }
+
+    @Override
+    public final InnerTableRead withFilter(Predicate predicate) {
+        // The original predicate is kept for row-level evaluation, which runs after default values
+        // are assigned. Only the pushed-down copy goes through the default-value assigner.
+        this.predicate = predicate;
+        Predicate pushed = predicate;
+        if (defaultValueAssigner != null) {
+            pushed = defaultValueAssigner.handlePredicate(predicate);
+        }
+        fileStoreRead.withFilter(pushed);
+        return this;
+    }
+
+    @Override
+    public TableRead executeFilter() {
+        this.executeFilter = true;
+        return this;
+    }
+
+    @Override
+    public final InnerTableRead withProjection(int[][] projection) {
+        this.projection = projection;
+        // The assigner is null when no schema was supplied. Guard it here, as withFilter and
+        // createReader do, instead of throwing a NullPointerException.
+        if (defaultValueAssigner != null) {
+            defaultValueAssigner.handleProject(projection);
+        }
+        projection(projection);
+        return this;
+    }
+
+    @Override
+    public final RecordReader<InternalRow> createReader(Split split) throws IOException {
+        RecordReader<InternalRow> reader = reader(split);
+        if (defaultValueAssigner != null) {
+            reader = defaultValueAssigner.assignFieldsDefaultValue(reader);
+        }
+        if (executeFilter) {
+            reader = applyRowFilter(reader);
+        }
+        return reader;
+    }
+
+    /**
+     * Wraps {@code reader} so that rows not matching {@link #predicate} are dropped. This is
+     * best-effort. If no filter was set, or the filter cannot be rewritten against the projected
+     * fields, {@code reader} is returned unfiltered.
+     */
+    private RecordReader<InternalRow> applyRowFilter(RecordReader<InternalRow> reader) {
+        if (predicate == null) {
+            return reader;
+        }
+
+        Predicate filter = predicate;
+        if (projection != null) {
+            // Leaf indexes refer to table fields; remap them to positions in the projected row.
+            Optional<Predicate> converted =
+                    filter.visit(
+                            new PredicateProjectionConverter(
+                                    Projection.of(projection).toTopLevelIndexes()));
+            if (!converted.isPresent()) {
+                return reader;
+            }
+            filter = converted.get();
+        }
+
+        Predicate finalFilter = filter;
+        return reader.filter(finalFilter::test);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index ea07bc969..a538fc0e2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -50,4 +50,9 @@ public interface InnerTableRead extends TableRead {
default InnerTableRead forceKeepDelete() {
return this;
}
+
+ /**
+ * Requests row-level evaluation of the filter. This default implementation ignores the request
+ * and returns this read unchanged.
+ */
+ @Override
+ default TableRead executeFilter() {
+ return this;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index a838806ee..aaebc1532 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
import javax.annotation.Nullable;
@@ -32,11 +33,12 @@ import java.io.IOException;
* An abstraction layer above {@link KeyValueFileStoreRead} to provide reading
of {@link
* InternalRow}.
*/
-public abstract class KeyValueTableRead implements InnerTableRead {
+public abstract class KeyValueTableRead extends
AbstractDataTableRead<KeyValue> {
protected final KeyValueFileStoreRead read;
- protected KeyValueTableRead(KeyValueFileStoreRead read) {
+ protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema
schema) {
+ super(read, schema);
this.read = read;
}
@@ -47,7 +49,7 @@ public abstract class KeyValueTableRead implements
InnerTableRead {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public final RecordReader<InternalRow> reader(Split split) throws
IOException {
return new RowDataRecordReader(read.createReader((DataSplit) split));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index adc8dbed9..72b54ae6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -37,6 +37,8 @@ import java.util.List;
@Public
public interface TableRead {
+ /**
+ * Requests that rows produced by {@link #createReader} also be checked against the filter given
+ * to this read, dropping rows that do not match.
+ *
+ * <p>NOTE(review): implementations may treat this as best-effort (some return this read
+ * unchanged); callers needing exact results should confirm the implementation in use.
+ *
+ * @return this read, for chaining
+ */
+ TableRead executeFilter();
+
TableRead withIOManager(IOManager ioManager);
RecordReader<InternalRow> createReader(Split split) throws IOException;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 768372162..d91e0a320 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
@@ -117,6 +118,72 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
+ /**
+ * Verifies that {@code executeFilter()} drops non-matching rows, with and without projection,
+ * including predicates that reference fields outside the projection.
+ */
+ @Test
+ public void testBatchFilterWithExecution() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
+
+ List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+
+ // single leaf predicate: only rows with field 2 == 201 are returned
+ TableRead read = table.newRead().withFilter(builder.equal(2,
201L)).executeFilter();
+ assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING)).isEmpty();
+ assertThat(getResult(read, splits, binaryRow(2), 0,
BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(
+ Arrays.asList(
+
"2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+
+ // OR predicate, no projection
+ read =
+ table.newRead()
+ .withFilter(
+ PredicateBuilder.or(builder.equal(2, 201L),
builder.equal(2, 500L)))
+ .executeFilter();
+ assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING)).isEmpty();
+ assertThat(getResult(read, splits, binaryRow(2), 0,
BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(
+ Arrays.asList(
+
"2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+
+ // projection {3, 2} contains every field the OR references; the filter is remapped and applied
+ read =
+ table.newRead()
+ .withFilter(
+ PredicateBuilder.or(builder.equal(2, 201L),
builder.equal(2, 500L)))
+ .withProjection(new int[] {3, 2})
+ .executeFilter();
+ Function<InternalRow, String> toString =
+ rowData -> rowData.getLong(1) + "|" + new
String(rowData.getBinary(0));
+ assertThat(getResult(read, splits, binaryRow(1), 0,
toString)).isEmpty();
+ assertThat(getResult(read, splits, binaryRow(2), 0, toString))
+ .hasSameElementsAs(Arrays.asList("201|binary", "201|binary"));
+
+ // OR references field 0, which is not projected: the OR cannot be converted, so no row-level filtering
+ read =
+ table.newRead()
+ .withFilter(
+ PredicateBuilder.or(builder.equal(2, 201L),
builder.equal(0, 1)))
+ .withProjection(new int[] {3, 2})
+ .executeFilter();
+ assertThat(getResult(read, splits, binaryRow(2), 0, toString))
+ .hasSameElementsAs(
+ Arrays.asList("200|binary", "201|binary",
"202|binary", "201|binary"));
+
+ // AND references unprojected field 0: that conjunct is dropped and field 2 == 201 is still applied
+ read =
+ table.newRead()
+ .withFilter(
+ PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(0, 1)))
+ .withProjection(new int[] {3, 2})
+ .executeFilter();
+ assertThat(getResult(read, splits, binaryRow(1), 0,
toString)).isEmpty();
+ assertThat(getResult(read, splits, binaryRow(2), 0, toString))
+ .hasSameElementsAs(Arrays.asList("201|binary", "201|binary"));
+ }
+
@Test
public void testSplitOrder() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 0f9d1cd30..55a671879 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -34,7 +34,6 @@ import
org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
@@ -131,14 +130,10 @@ public class TestChangelogDataReadWrite {
pathFactory,
EXTRACTOR,
new CoreOptions(new HashMap<>()));
- return new KeyValueTableRead(read) {
- @Override
- public KeyValueTableRead withFilter(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
+ return new KeyValueTableRead(read, null) {
@Override
- public KeyValueTableRead withProjection(int[][] projection) {
+ public void projection(int[][] projection) {
throw new UnsupportedOperationException();
}