This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 022da71  feat: push down filters to parquet read path (#208)
022da71 is described below

commit 022da71dde98e5ff17638720e62e1c643e1b48ed
Author: Zach <[email protected]>
AuthorDate: Mon Apr 6 15:34:57 2026 +0800

    feat: push down filters to parquet read path (#208)
---
 Cargo.toml                                         |   3 +-
 bindings/c/src/table.rs                            |   6 +-
 crates/integrations/datafusion/Cargo.toml          |   6 +
 .../datafusion/src/physical_plan/scan.rs           | 123 +++-
 crates/integrations/datafusion/src/table/mod.rs    |  36 +-
 crates/paimon/Cargo.toml                           |   1 +
 crates/paimon/src/arrow/filtering.rs               |  74 ++
 crates/paimon/src/arrow/mod.rs                     |   1 +
 crates/paimon/src/arrow/reader.rs                  | 810 ++++++++++++++++++++-
 crates/paimon/src/io/file_io.rs                    |  62 +-
 crates/paimon/src/lib.rs                           |   1 +
 crates/paimon/src/predicate_stats.rs               | 195 +++++
 crates/paimon/src/table/read_builder.rs            | 395 +++++++++-
 crates/paimon/src/table/stats_filter.rs            | 266 ++-----
 crates/paimon/src/table/table_scan.rs              | 109 ++-
 crates/test_utils.rs                               | 101 +++
 16 files changed, 1855 insertions(+), 334 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 004fb9d..cfba61f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,7 +32,8 @@ arrow = "57.0"
 arrow-array = { version = "57.0", features = ["ffi"] }
 arrow-schema = "57.0"
 arrow-cast = "57.0"
+arrow-ord = "57.0"
 datafusion = "52.3.0"
 datafusion-ffi = "52.3.0"
 parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+tokio = "1.39.2"
diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs
index c3f57fe..6f6637d 100644
--- a/bindings/c/src/table.rs
+++ b/bindings/c/src/table.rs
@@ -349,8 +349,10 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
     let end = (offset.saturating_add(length)).min(all_splits.len());
     let selected = &all_splits[start..end];
 
-    // Create TableRead with the stored read_type (projection)
-    let table_read = paimon::table::TableRead::new(&state.table, 
state.read_type.clone());
+    // C bindings currently persist only the projection, so reconstructing the
+    // read uses an empty predicate set.
+    let table_read =
+        paimon::table::TableRead::new(&state.table, state.read_type.clone(), 
Vec::new());
 
     match table_read.to_arrow(selected) {
         Ok(stream) => {
diff --git a/crates/integrations/datafusion/Cargo.toml 
b/crates/integrations/datafusion/Cargo.toml
index 4fdff40..370279c 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -35,4 +35,10 @@ futures = "0.3"
 tokio = { workspace = true, features = ["rt", "time", "fs"] }
 
 [dev-dependencies]
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+parquet = { workspace = true }
+serde = "1"
+serde_json = "1"
+tempfile = "3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index a8c9aea..d7dfc7a 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -26,6 +26,7 @@ use datafusion::physical_plan::execution_plan::{Boundedness, 
EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, 
PlanProperties};
 use futures::{StreamExt, TryStreamExt};
+use paimon::spec::Predicate;
 use paimon::table::Table;
 use paimon::DataSplit;
 
@@ -41,6 +42,9 @@ pub struct PaimonTableScan {
     table: Table,
     /// Projected column names (if None, reads all columns).
     projected_columns: Option<Vec<String>>,
+    /// Filter translated from DataFusion expressions and reused during 
execute()
+    /// so reader-side pruning reaches the actual read path.
+    pushed_predicate: Option<Predicate>,
     /// Pre-planned partition assignments: `planned_partitions[i]` contains the
     /// Paimon splits that DataFusion partition `i` will read.
     /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in 
`execute()`.
@@ -55,6 +59,7 @@ impl PaimonTableScan {
         schema: ArrowSchemaRef,
         table: Table,
         projected_columns: Option<Vec<String>>,
+        pushed_predicate: Option<Predicate>,
         planned_partitions: Vec<Arc<[DataSplit]>>,
         limit: Option<usize>,
     ) -> Self {
@@ -67,6 +72,7 @@ impl PaimonTableScan {
         Self {
             table,
             projected_columns,
+            pushed_predicate,
             planned_partitions,
             plan_properties,
             limit,
@@ -82,6 +88,11 @@ impl PaimonTableScan {
         &self.planned_partitions
     }
 
+    #[cfg(test)]
+    pub(crate) fn pushed_predicate(&self) -> Option<&Predicate> {
+        self.pushed_predicate.as_ref()
+    }
+
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
@@ -126,6 +137,7 @@ impl ExecutionPlan for PaimonTableScan {
         let table = self.table.clone();
         let schema = self.schema();
         let projected_columns = self.projected_columns.clone();
+        let pushed_predicate = self.pushed_predicate.clone();
 
         let fut = async move {
             let mut read_builder = table.new_read_builder();
@@ -134,6 +146,9 @@ impl ExecutionPlan for PaimonTableScan {
                 let col_refs: Vec<&str> = columns.iter().map(|s| 
s.as_str()).collect();
                 read_builder.with_projection(&col_refs);
             }
+            if let Some(filter) = pushed_predicate {
+                read_builder.with_filter(filter);
+            }
 
             let read = read_builder.new_read().map_err(to_datafusion_error)?;
             let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
@@ -173,11 +188,26 @@ impl DisplayAs for PaimonTableScan {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, 
Schema};
+    mod test_utils {
+        include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../test_utils.rs"));
+    }
+
+    use datafusion::arrow::array::Int32Array;
+    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, 
Schema as ArrowSchema};
     use datafusion::physical_plan::ExecutionPlan;
+    use datafusion::prelude::SessionContext;
+    use futures::TryStreamExt;
+    use paimon::catalog::Identifier;
+    use paimon::io::FileIOBuilder;
+    use paimon::spec::{
+        BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as 
PaimonSchema, TableSchema,
+    };
+    use std::fs;
+    use tempfile::tempdir;
+    use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
 
     fn test_schema() -> ArrowSchemaRef {
-        Arc::new(Schema::new(vec![Field::new(
+        Arc::new(ArrowSchema::new(vec![Field::new(
             "id",
             ArrowDataType::Int32,
             false,
@@ -191,6 +221,7 @@ mod tests {
             schema,
             dummy_table(),
             None,
+            None,
             vec![Arc::from(Vec::new())],
             None,
         );
@@ -205,19 +236,16 @@ mod tests {
             Arc::from(Vec::new()),
             Arc::from(Vec::new()),
         ];
-        let scan = PaimonTableScan::new(schema, dummy_table(), None, 
planned_partitions, None);
+        let scan =
+            PaimonTableScan::new(schema, dummy_table(), None, None, 
planned_partitions, None);
         assert_eq!(scan.properties().output_partitioning().partition_count(), 
3);
     }
 
     /// Constructs a minimal Table for testing (no real files needed since we
     /// only test PlanProperties, not actual reads).
     fn dummy_table() -> Table {
-        use paimon::catalog::Identifier;
-        use paimon::io::FileIOBuilder;
-        use paimon::spec::{Schema, TableSchema};
-
         let file_io = FileIOBuilder::new("file").build().unwrap();
-        let schema = Schema::builder().build().unwrap();
+        let schema = PaimonSchema::builder().build().unwrap();
         let table_schema = TableSchema::new(0, &schema);
         Table::new(
             file_io,
@@ -226,4 +254,83 @@ mod tests {
             table_schema,
         )
     }
+
+    #[tokio::test]
+    async fn test_execute_applies_pushed_filter_during_read() {
+        let tempdir = tempdir().unwrap();
+        let table_path = local_file_path(tempdir.path());
+        let bucket_dir = tempdir.path().join("bucket-0");
+        fs::create_dir_all(&bucket_dir).unwrap();
+
+        write_int_parquet_file(
+            &bucket_dir.join("data.parquet"),
+            vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
+            Some(2),
+        );
+
+        let file_io = FileIOBuilder::new("file").build().unwrap();
+        let table_schema = TableSchema::new(
+            0,
+            &paimon::spec::Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .build()
+                .unwrap(),
+        );
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "t"),
+            table_path,
+            table_schema,
+        );
+
+        let split = paimon::DataSplitBuilder::new()
+            .with_snapshot(1)
+            .with_partition(BinaryRow::new(0))
+            .with_bucket(0)
+            .with_bucket_path(local_file_path(&bucket_dir))
+            .with_total_buckets(1)
+            .with_data_files(vec![test_data_file("data.parquet", 4)])
+            .with_raw_convertible(true)
+            .build()
+            .unwrap();
+
+        let pushed_predicate = PredicateBuilder::new(table.schema().fields())
+            .greater_or_equal("value", Datum::Int(10))
+            .unwrap();
+
+        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "id",
+            ArrowDataType::Int32,
+            false,
+        )]));
+        let scan = PaimonTableScan::new(
+            schema,
+            table,
+            Some(vec!["id".to_string()]),
+            Some(pushed_predicate),
+            vec![Arc::from(vec![split])],
+            None,
+        );
+
+        let ctx = SessionContext::new();
+        let stream = scan
+            .execute(0, ctx.task_ctx())
+            .expect("execute should succeed");
+        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+
+        let actual_ids: Vec<i32> = batches
+            .iter()
+            .flat_map(|batch| {
+                let ids = batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .expect("id column should be Int32Array");
+                (0..ids.len()).map(|idx| ids.value(idx)).collect::<Vec<_>>()
+            })
+            .collect();
+
+        assert_eq!(actual_ids, vec![2, 3, 4]);
+    }
 }
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index 3c4a98c..a76174d 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -36,8 +36,13 @@ use crate::runtime::await_with_runtime;
 
 /// Read-only table provider for a Paimon table.
 ///
-/// Supports full table scan, column projection, and partition predicate 
pushdown.
-/// Data-level filtering remains a residual DataFusion filter.
+/// Supports full table scan, column projection, and predicate pushdown for
+/// planning. Partition predicates prune splits eagerly, while supported
+/// non-partition data predicates may also be reused by the Parquet read path
+/// for row-group pruning and partial decode-time filtering.
+///
+/// DataFusion still treats pushed filters as inexact because unsupported
+/// predicates and non-Parquet reads remain residual filters.
 #[derive(Debug, Clone)]
 pub struct PaimonTableProvider {
     table: Table,
@@ -103,8 +108,9 @@ impl TableProvider for PaimonTableProvider {
         };
 
         // Plan splits eagerly so we know partition count upfront.
+        let pushed_predicate = build_pushed_predicate(filters, 
self.table.schema().fields());
         let mut read_builder = self.table.new_read_builder();
-        if let Some(filter) = build_pushed_predicate(filters, 
self.table.schema().fields()) {
+        if let Some(filter) = pushed_predicate.clone() {
             read_builder.with_filter(filter);
         }
         // Push the limit hint to paimon-core planning to reduce splits when 
possible.
@@ -141,6 +147,7 @@ impl TableProvider for PaimonTableProvider {
             projected_schema,
             self.table.clone(),
             projected_columns,
+            pushed_predicate,
             planned_partitions,
             limit,
         )))
@@ -318,4 +325,27 @@ mod tests {
             BTreeSet::from([("2024-01-01".to_string(), 10)]),
         );
     }
+
+    #[tokio::test]
+    async fn test_scan_keeps_pushed_predicate_for_execute() {
+        let provider = create_provider("partitioned_log_table").await;
+        let filter = col("id").gt(lit(1));
+
+        let config = SessionConfig::new().with_target_partitions(8);
+        let ctx = SessionContext::new_with_config(config);
+        let state = ctx.state();
+        let plan = provider
+            .scan(&state, None, std::slice::from_ref(&filter), None)
+            .await
+            .expect("scan() should succeed");
+        let scan = plan
+            .as_any()
+            .downcast_ref::<PaimonTableScan>()
+            .expect("Expected PaimonTableScan");
+
+        let expected = build_pushed_predicate(&[filter], 
provider.table().schema().fields())
+            .expect("data filter should translate");
+
+        assert_eq!(scan.pushed_predicate(), Some(&expected));
+    }
 }
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index a4a4ae4..d5d75ae 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -57,6 +57,7 @@ indexmap = "2.5.0"
 roaring = "0.11"
 arrow-array = { workspace = true }
 arrow-cast = { workspace = true }
+arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 futures = "0.3"
 parquet = { workspace = true, features = ["async", "zstd"] }
diff --git a/crates/paimon/src/arrow/filtering.rs 
b/crates/paimon/src/arrow/filtering.rs
new file mode 100644
index 0000000..da2da14
--- /dev/null
+++ b/crates/paimon/src/arrow/filtering.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema_evolution::create_index_mapping;
+pub(crate) use crate::predicate_stats::{predicates_may_match_with_schema, 
StatsAccessor};
+use crate::spec::{DataField, Predicate, PredicateOperator};
+
+pub(crate) fn reader_pruning_predicates(data_predicates: Vec<Predicate>) -> 
Vec<Predicate> {
+    data_predicates
+        .into_iter()
+        .filter(predicate_supported_for_reader_pruning)
+        .collect()
+}
+
+pub(crate) fn build_field_mapping(
+    table_fields: &[DataField],
+    file_fields: &[DataField],
+) -> Vec<Option<usize>> {
+    normalize_field_mapping(
+        create_index_mapping(table_fields, file_fields),
+        table_fields.len(),
+    )
+}
+
+fn predicate_supported_for_reader_pruning(predicate: &Predicate) -> bool {
+    match predicate {
+        Predicate::AlwaysFalse => true,
+        Predicate::Leaf { op, .. } => {
+            matches!(
+                op,
+                PredicateOperator::IsNull
+                    | PredicateOperator::IsNotNull
+                    | PredicateOperator::Eq
+                    | PredicateOperator::NotEq
+                    | PredicateOperator::Lt
+                    | PredicateOperator::LtEq
+                    | PredicateOperator::Gt
+                    | PredicateOperator::GtEq
+                    | PredicateOperator::In
+                    | PredicateOperator::NotIn
+            )
+        }
+        Predicate::AlwaysTrue | Predicate::And(_) | Predicate::Or(_) | 
Predicate::Not(_) => false,
+    }
+}
+
+fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
+    (0..num_fields).map(Some).collect()
+}
+
+fn normalize_field_mapping(mapping: Option<Vec<i32>>, num_fields: usize) -> 
Vec<Option<usize>> {
+    mapping
+        .map(|field_mapping| {
+            field_mapping
+                .into_iter()
+                .map(|index| usize::try_from(index).ok())
+                .collect()
+        })
+        .unwrap_or_else(|| identity_field_mapping(num_fields))
+}
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index e823c90..f524d59 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub(crate) mod filtering;
 mod reader;
 pub(crate) mod schema_evolution;
 
diff --git a/crates/paimon/src/arrow/reader.rs 
b/crates/paimon/src/arrow/reader.rs
index a676e61..91a67c6 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -16,25 +16,39 @@
 // under the License.
 
 use crate::arrow::build_target_arrow_schema;
+use crate::arrow::filtering::{
+    build_field_mapping, predicates_may_match_with_schema, StatsAccessor,
+};
 use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX};
 use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
 use crate::io::{FileIO, FileRead, FileStatus};
-use crate::spec::{DataField, DataFileMeta};
+use crate::spec::{DataField, DataFileMeta, DataType, Datum, Predicate, 
PredicateOperator};
 use crate::table::schema_manager::SchemaManager;
 use crate::table::ArrowRecordBatchStream;
 use crate::{DataSplit, Error};
-use arrow_array::RecordBatch;
+use arrow_array::{
+    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, 
Float32Array,
+    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, 
Scalar, StringArray,
+};
 use arrow_cast::cast;
+use arrow_ord::cmp::{
+    eq as arrow_eq, gt as arrow_gt, gt_eq as arrow_gt_eq, lt as arrow_lt, 
lt_eq as arrow_lt_eq,
+    neq as arrow_neq,
+};
+use arrow_schema::ArrowError;
 
 use async_stream::try_stream;
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{StreamExt, TryFutureExt};
-use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, 
RowSelector};
+use parquet::arrow::arrow_reader::{
+    ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, 
RowSelection, RowSelector,
+};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::ParquetMetaDataReader;
 use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
 use std::collections::HashMap;
 use std::ops::Range;
 use std::sync::Arc;
@@ -46,6 +60,8 @@ pub struct ArrowReaderBuilder {
     file_io: FileIO,
     schema_manager: SchemaManager,
     table_schema_id: i64,
+    predicates: Vec<Predicate>,
+    table_fields: Vec<DataField>,
 }
 
 impl ArrowReaderBuilder {
@@ -60,9 +76,24 @@ impl ArrowReaderBuilder {
             file_io,
             schema_manager,
             table_schema_id,
+            predicates: Vec::new(),
+            table_fields: Vec::new(),
         }
     }
 
+    /// Set data predicates used for Parquet row-group pruning and partial
+    /// decode-time filtering.
+    pub(crate) fn with_predicates(mut self, predicates: Vec<Predicate>) -> 
Self {
+        self.predicates = predicates;
+        self
+    }
+
+    /// Set the full table schema fields used for filter-to-file field mapping.
+    pub(crate) fn with_table_fields(mut self, table_fields: Vec<DataField>) -> 
Self {
+        self.table_fields = table_fields;
+        self
+    }
+
     /// Build the ArrowReader with the given read type (logical row type or 
projected subset).
     /// Used to clip Parquet schema to requested columns only.
     pub fn build(self, read_type: Vec<DataField>) -> ArrowReader {
@@ -71,6 +102,8 @@ impl ArrowReaderBuilder {
             file_io: self.file_io,
             schema_manager: self.schema_manager,
             table_schema_id: self.table_schema_id,
+            predicates: self.predicates,
+            table_fields: self.table_fields,
             read_type,
         }
     }
@@ -83,6 +116,8 @@ pub struct ArrowReader {
     file_io: FileIO,
     schema_manager: SchemaManager,
     table_schema_id: i64,
+    predicates: Vec<Predicate>,
+    table_fields: Vec<DataField>,
     read_type: Vec<DataField>,
 }
 
@@ -100,6 +135,8 @@ impl ArrowReader {
         let batch_size = self.batch_size;
         let splits: Vec<DataSplit> = data_splits.to_vec();
         let read_type = self.read_type;
+        let predicates = self.predicates;
+        let table_fields = self.table_fields;
         let schema_manager = self.schema_manager;
         let table_schema_id = self.table_schema_id;
         Ok(try_stream! {
@@ -137,12 +174,16 @@ impl ArrowReader {
 
                     let mut stream = read_single_file_stream(
                         file_io.clone(),
-                        split.clone(),
-                        file_meta,
-                        read_type.clone(),
-                        data_fields,
-                        batch_size,
-                        dv,
+                        SingleFileReadRequest {
+                            split: split.clone(),
+                            file_meta,
+                            read_type: read_type.clone(),
+                            table_fields: table_fields.clone(),
+                            data_fields,
+                            predicates: predicates.clone(),
+                            batch_size,
+                            dv,
+                        },
                     )?;
                     while let Some(batch) = stream.next().await {
                         yield batch?;
@@ -186,8 +227,17 @@ impl ArrowReader {
                         };
 
                         let mut stream = read_single_file_stream(
-                            file_io.clone(), split.clone(), file_meta, 
read_type.clone(),
-                            data_fields, batch_size, None,
+                            file_io.clone(),
+                            SingleFileReadRequest {
+                                split: split.clone(),
+                                file_meta,
+                                read_type: read_type.clone(),
+                                table_fields: table_fields.clone(),
+                                data_fields,
+                                predicates: Vec::new(),
+                                batch_size,
+                                dv: None,
+                            },
                         )?;
                         while let Some(batch) = stream.next().await {
                             yield batch?;
@@ -214,6 +264,17 @@ impl ArrowReader {
     }
 }
 
+struct SingleFileReadRequest {
+    split: DataSplit,
+    file_meta: DataFileMeta,
+    read_type: Vec<DataField>,
+    table_fields: Vec<DataField>,
+    data_fields: Option<Vec<DataField>>,
+    predicates: Vec<Predicate>,
+    batch_size: Option<usize>,
+    dv: Option<Arc<DeletionVector>>,
+}
+
 /// Read a single parquet file from a split, returning a lazy stream of 
batches.
 /// Optionally applies a deletion vector.
 ///
@@ -226,14 +287,21 @@ impl ArrowReader {
 /// Reference: 
[RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
 fn read_single_file_stream(
     file_io: FileIO,
-    split: DataSplit,
-    file_meta: DataFileMeta,
-    read_type: Vec<DataField>,
-    data_fields: Option<Vec<DataField>>,
-    batch_size: Option<usize>,
-    dv: Option<Arc<DeletionVector>>,
+    request: SingleFileReadRequest,
 ) -> crate::Result<ArrowRecordBatchStream> {
+    let SingleFileReadRequest {
+        split,
+        file_meta,
+        read_type,
+        table_fields,
+        data_fields,
+        predicates,
+        batch_size,
+        dv,
+    } = request;
+
     let target_schema = build_target_arrow_schema(&read_type)?;
+    let file_fields = data_fields.clone().unwrap_or_else(|| 
table_fields.clone());
 
     // Compute index mapping and determine which columns to read from the 
parquet file.
     // If data_fields is provided, use field-ID-based mapping; otherwise use 
read_type names directly.
@@ -301,14 +369,33 @@ fn read_single_file_stream(
 
         let mask = ProjectionMask::roots(&parquet_schema, root_indices);
         batch_stream_builder = batch_stream_builder.with_projection(mask);
+        let row_filter =
+            build_parquet_row_filter(&parquet_schema, &predicates, 
&table_fields, &file_fields)?;
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = 
batch_stream_builder.with_row_filter(row_filter);
+        }
+
+        let predicate_row_selection = build_predicate_row_selection(
+            batch_stream_builder.metadata().row_groups(),
+            &predicates,
+            &table_fields,
+            &file_fields,
+        )?;
+        let mut row_selection = predicate_row_selection;
 
         if let Some(ref dv) = dv {
             if !dv.is_empty() {
-                let row_selection =
+                let delete_row_selection =
                     
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
-                batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+                row_selection = intersect_optional_row_selections(
+                    row_selection,
+                    Some(delete_row_selection),
+                );
             }
         }
+        if let Some(row_selection) = row_selection {
+            batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+        }
         if let Some(size) = batch_size {
             batch_stream_builder = batch_stream_builder.with_batch_size(size);
         }
@@ -551,12 +638,16 @@ fn merge_files_by_columns(
 
             let stream = read_single_file_stream(
                 file_io.clone(),
-                split.clone(),
-                data_files[file_idx].clone(),
-                file_read_type,
-                data_fields.clone(),
-                batch_size,
-                None,
+                SingleFileReadRequest {
+                    split: split.clone(),
+                    file_meta: data_files[file_idx].clone(),
+                    read_type: file_read_type,
+                    table_fields: table_fields.clone(),
+                    data_fields: data_fields.clone(),
+                    predicates: Vec::new(),
+                    batch_size,
+                    dv: None,
+                },
             )?;
             file_streams.insert(file_idx, stream);
         }
@@ -647,6 +738,623 @@ fn merge_files_by_columns(
     .boxed())
 }
 
+fn intersect_optional_row_selections(
+    left: Option<RowSelection>,
+    right: Option<RowSelection>,
+) -> Option<RowSelection> {
+    match (left, right) {
+        (Some(left), Some(right)) => Some(left.intersection(&right)),
+        (Some(selection), None) | (None, Some(selection)) => Some(selection),
+        (None, None) => None,
+    }
+}
+
+fn sanitize_filter_mask(mask: BooleanArray) -> BooleanArray {
+    if mask.null_count() == 0 {
+        return mask;
+    }
+
+    boolean_mask_from_predicate(mask.len(), |row_index| {
+        mask.is_valid(row_index) && mask.value(row_index)
+    })
+}
+
/// Evaluates a single leaf predicate exactly against a decoded column,
/// returning a per-row boolean keep-mask with no null slots.
///
/// Fail-open policy: whenever a literal is missing or cannot be represented
/// in the file's physical type, the function returns an all-`true` mask so an
/// unevaluable predicate never drops rows (residual filters above the reader
/// re-check them).
fn evaluate_exact_leaf_predicate(
    array: &ArrayRef,
    data_type: &DataType,
    op: PredicateOperator,
    literals: &[Datum],
) -> Result<BooleanArray, ArrowError> {
    match op {
        // Null tests need no literal; answer directly from the validity mask.
        PredicateOperator::IsNull => Ok(boolean_mask_from_predicate(array.len(), |row_index| {
            array.is_null(row_index)
        })),
        PredicateOperator::IsNotNull => Ok(boolean_mask_from_predicate(array.len(), |row_index| {
            array.is_valid(row_index)
        })),
        // Set membership has its own fold over the literal list.
        PredicateOperator::In | PredicateOperator::NotIn => {
            evaluate_set_membership_predicate(array, data_type, op, literals)
        }
        PredicateOperator::Eq
        | PredicateOperator::NotEq
        | PredicateOperator::Lt
        | PredicateOperator::LtEq
        | PredicateOperator::Gt
        | PredicateOperator::GtEq => {
            // No literal at all: fail open (keep every row).
            let Some(literal) = literals.first() else {
                return Ok(BooleanArray::from(vec![true; array.len()]));
            };
            // Literal not representable in the file's type: fail open too.
            let Some(scalar) = literal_scalar_for_parquet_filter(literal, data_type)
                .map_err(|e| ArrowError::ComputeError(e.to_string()))?
            else {
                return Ok(BooleanArray::from(vec![true; array.len()]));
            };
            let result = evaluate_column_predicate(array, &scalar, op)?;
            // Comparison kernels yield NULL for null rows; coerce to `false`.
            Ok(sanitize_filter_mask(result))
        }
    }
}
+
+fn evaluate_set_membership_predicate(
+    array: &ArrayRef,
+    data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+) -> Result<BooleanArray, ArrowError> {
+    if literals.is_empty() {
+        return Ok(match op {
+            PredicateOperator::In => BooleanArray::from(vec![false; 
array.len()]),
+            PredicateOperator::NotIn => {
+                boolean_mask_from_predicate(array.len(), |row_index| 
array.is_valid(row_index))
+            }
+            PredicateOperator::IsNull
+            | PredicateOperator::IsNotNull
+            | PredicateOperator::Eq
+            | PredicateOperator::NotEq
+            | PredicateOperator::Lt
+            | PredicateOperator::LtEq
+            | PredicateOperator::Gt
+            | PredicateOperator::GtEq => unreachable!(),
+        });
+    }
+
+    let mut combined = match op {
+        PredicateOperator::In => BooleanArray::from(vec![false; array.len()]),
+        PredicateOperator::NotIn => {
+            boolean_mask_from_predicate(array.len(), |row_index| 
array.is_valid(row_index))
+        }
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => unreachable!(),
+    };
+
+    for literal in literals {
+        let Some(scalar) = literal_scalar_for_parquet_filter(literal, 
data_type)
+            .map_err(|e| ArrowError::ComputeError(e.to_string()))?
+        else {
+            return Ok(BooleanArray::from(vec![true; array.len()]));
+        };
+        let comparison_op = match op {
+            PredicateOperator::In => PredicateOperator::Eq,
+            PredicateOperator::NotIn => PredicateOperator::NotEq,
+            PredicateOperator::IsNull
+            | PredicateOperator::IsNotNull
+            | PredicateOperator::Eq
+            | PredicateOperator::NotEq
+            | PredicateOperator::Lt
+            | PredicateOperator::LtEq
+            | PredicateOperator::Gt
+            | PredicateOperator::GtEq => unreachable!(),
+        };
+        let mask = sanitize_filter_mask(evaluate_column_predicate(array, 
&scalar, comparison_op)?);
+        combined = combine_filter_masks(&combined, &mask, matches!(op, 
PredicateOperator::In));
+    }
+
+    Ok(combined)
+}
+
+fn combine_filter_masks(left: &BooleanArray, right: &BooleanArray, use_or: 
bool) -> BooleanArray {
+    debug_assert_eq!(left.len(), right.len());
+    boolean_mask_from_predicate(left.len(), |row_index| {
+        if use_or {
+            left.value(row_index) || right.value(row_index)
+        } else {
+            left.value(row_index) && right.value(row_index)
+        }
+    })
+}
+
+fn boolean_mask_from_predicate(
+    len: usize,
+    mut predicate: impl FnMut(usize) -> bool,
+) -> BooleanArray {
+    BooleanArray::from((0..len).map(&mut predicate).collect::<Vec<_>>())
+}
+
/// Adapts one parquet row-group's metadata to the generic [`StatsAccessor`]
/// interface used by the shared predicate-vs-statistics pruning logic.
struct ParquetRowGroupStats<'a> {
    // Metadata for the row group being pruned.
    row_group: &'a RowGroupMetaData,
    // For each file field, the index of its single top-level column chunk in
    // this row group, or `None` when the field has no usable flat column
    // (missing, or nested with multiple leaves).
    column_indices: &'a [Option<usize>],
}

impl StatsAccessor for ParquetRowGroupStats<'_> {
    /// Total number of rows in the row group.
    fn row_count(&self) -> i64 {
        self.row_group.num_rows()
    }

    fn null_count(&self, index: usize) -> Option<i64> {
        let _ = index;
        // parquet::Statistics::null_count_opt() may return Some(0) even when
        // the null-count statistic is absent, so treating it as authoritative
        // would make IS NULL / IS NOT NULL pruning unsafe. Fail open here.
        None
    }

    /// Exact minimum for the field at `index`, if statistics exist and map
    /// losslessly onto `data_type`; `None` disables pruning on this bound.
    fn min_value(&self, index: usize, data_type: &DataType) -> Option<Datum> {
        let column_index = self.column_indices.get(index).copied().flatten()?;
        parquet_stats_to_datum(
            self.row_group.column(column_index).statistics()?,
            data_type,
            true,
        )
    }

    /// Exact maximum counterpart of [`Self::min_value`].
    fn max_value(&self, index: usize, data_type: &DataType) -> Option<Datum> {
        let column_index = self.column_indices.get(index).copied().flatten()?;
        parquet_stats_to_datum(
            self.row_group.column(column_index).statistics()?,
            data_type,
            false,
        )
    }
}
+
+fn build_predicate_row_selection(
+    row_groups: &[RowGroupMetaData],
+    predicates: &[Predicate],
+    table_fields: &[DataField],
+    file_fields: &[DataField],
+) -> crate::Result<Option<RowSelection>> {
+    if predicates.is_empty() || row_groups.is_empty() {
+        return Ok(None);
+    }
+
+    let field_mapping = build_field_mapping(table_fields, file_fields);
+    let column_indices = 
build_row_group_column_indices(row_groups[0].columns(), file_fields);
+    let mut selectors = Vec::with_capacity(row_groups.len());
+    let mut all_selected = true;
+
+    for row_group in row_groups {
+        let stats = ParquetRowGroupStats {
+            row_group,
+            column_indices: &column_indices,
+        };
+        let may_match =
+            predicates_may_match_with_schema(predicates, &stats, 
&field_mapping, file_fields);
+        if !may_match {
+            all_selected = false;
+        }
+        selectors.push(if may_match {
+            RowSelector::select(row_group.num_rows() as usize)
+        } else {
+            RowSelector::skip(row_group.num_rows() as usize)
+        });
+    }
+
+    if all_selected {
+        Ok(None)
+    } else {
+        Ok(Some(selectors.into()))
+    }
+}
+
+fn build_parquet_row_filter(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    predicates: &[Predicate],
+    table_fields: &[DataField],
+    file_fields: &[DataField],
+) -> crate::Result<Option<RowFilter>> {
+    // Keep decode-time filtering intentionally narrow for the submit-ready
+    // Parquet path. Unsupported predicate shapes or types remain residual
+    // filters above the reader.
+    if predicates.is_empty() {
+        return Ok(None);
+    }
+
+    let field_mapping = build_field_mapping(table_fields, file_fields);
+    let mut filters: Vec<Box<dyn ArrowPredicate>> = Vec::new();
+
+    for predicate in predicates {
+        if let Some(filter) =
+            build_parquet_arrow_predicate(parquet_schema, predicate, 
&field_mapping, file_fields)?
+        {
+            filters.push(filter);
+        }
+    }
+
+    if filters.is_empty() {
+        Ok(None)
+    } else {
+        Ok(Some(RowFilter::new(filters)))
+    }
+}
+
+fn build_parquet_arrow_predicate(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    predicate: &Predicate,
+    field_mapping: &[Option<usize>],
+    file_fields: &[DataField],
+) -> crate::Result<Option<Box<dyn ArrowPredicate>>> {
+    let Predicate::Leaf {
+        index,
+        data_type: _,
+        op,
+        literals,
+        ..
+    } = predicate
+    else {
+        return Ok(None);
+    };
+    if !predicate_supported_for_parquet_row_filter(*op) {
+        return Ok(None);
+    }
+
+    let Some(file_index) = field_mapping.get(*index).copied().flatten() else {
+        return Ok(None);
+    };
+    let Some(file_field) = file_fields.get(file_index) else {
+        return Ok(None);
+    };
+    let Some(root_index) = parquet_root_index(parquet_schema, 
file_field.name()) else {
+        return Ok(None);
+    };
+    if !parquet_row_filter_literals_supported(*op, literals, 
file_field.data_type())? {
+        return Ok(None);
+    }
+
+    let projection = ProjectionMask::roots(parquet_schema, [root_index]);
+    let op = *op;
+    let data_type = file_field.data_type().clone();
+    let literals = literals.to_vec();
+    Ok(Some(Box::new(ArrowPredicateFn::new(
+        projection,
+        move |batch: RecordBatch| {
+            let Some(column) = batch.columns().first() else {
+                return Ok(BooleanArray::new_null(batch.num_rows()));
+            };
+            evaluate_exact_leaf_predicate(column, &data_type, op, &literals)
+        },
+    ))))
+}
+
+fn predicate_supported_for_parquet_row_filter(op: PredicateOperator) -> bool {
+    matches!(
+        op,
+        PredicateOperator::IsNull
+            | PredicateOperator::IsNotNull
+            | PredicateOperator::Eq
+            | PredicateOperator::NotEq
+            | PredicateOperator::Lt
+            | PredicateOperator::LtEq
+            | PredicateOperator::Gt
+            | PredicateOperator::GtEq
+            | PredicateOperator::In
+            | PredicateOperator::NotIn
+    )
+}
+
+fn parquet_row_filter_literals_supported(
+    op: PredicateOperator,
+    literals: &[Datum],
+    file_data_type: &DataType,
+) -> crate::Result<bool> {
+    match op {
+        PredicateOperator::IsNull | PredicateOperator::IsNotNull => Ok(true),
+        PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => {
+            let Some(literal) = literals.first() else {
+                return Ok(false);
+            };
+            Ok(literal_scalar_for_parquet_filter(literal, 
file_data_type)?.is_some())
+        }
+        PredicateOperator::In | PredicateOperator::NotIn => {
+            for literal in literals {
+                if literal_scalar_for_parquet_filter(literal, 
file_data_type)?.is_none() {
+                    return Ok(false);
+                }
+            }
+            Ok(true)
+        }
+    }
+}
+
+fn parquet_root_index(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    root_name: &str,
+) -> Option<usize> {
+    parquet_schema
+        .root_schema()
+        .get_fields()
+        .iter()
+        .position(|field| field.name() == root_name)
+}
+
+fn evaluate_column_predicate(
+    column: &ArrayRef,
+    scalar: &Scalar<ArrayRef>,
+    op: PredicateOperator,
+) -> Result<BooleanArray, ArrowError> {
+    match op {
+        PredicateOperator::Eq => arrow_eq(column, scalar),
+        PredicateOperator::NotEq => arrow_neq(column, scalar),
+        PredicateOperator::Lt => arrow_lt(column, scalar),
+        PredicateOperator::LtEq => arrow_lt_eq(column, scalar),
+        PredicateOperator::Gt => arrow_gt(column, scalar),
+        PredicateOperator::GtEq => arrow_gt_eq(column, scalar),
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::In
+        | PredicateOperator::NotIn => Ok(BooleanArray::new_null(column.len())),
+    }
+}
+
/// Converts a predicate literal into an Arrow comparison [`Scalar`] of the
/// file's physical type, for use by the decode-time row filter.
///
/// Returns `Ok(None)` whenever the literal cannot be represented *exactly* in
/// `file_data_type` (wrong variant, narrowing overflow, lossy float cast,
/// decimal precision/scale mismatch) or the type has no comparison support
/// here (temporal with sub-milli concerns, nested types) — callers treat
/// `None` as "cannot push down" and fail open.
fn literal_scalar_for_parquet_filter(
    literal: &Datum,
    file_data_type: &DataType,
) -> crate::Result<Option<Scalar<ArrayRef>>> {
    let array: ArrayRef = match file_data_type {
        DataType::Boolean(_) => match literal {
            Datum::Bool(value) => Arc::new(BooleanArray::new_scalar(*value).into_inner()),
            _ => return Ok(None),
        },
        // Integer widths: widen the literal to i128, then require a lossless
        // narrowing into the file's width; overflow means "not representable".
        DataType::TinyInt(_) => {
            match integer_literal(literal).and_then(|value| i8::try_from(value).ok()) {
                Some(value) => Arc::new(Int8Array::new_scalar(value).into_inner()),
                None => return Ok(None),
            }
        }
        DataType::SmallInt(_) => {
            match integer_literal(literal).and_then(|value| i16::try_from(value).ok()) {
                Some(value) => Arc::new(Int16Array::new_scalar(value).into_inner()),
                None => return Ok(None),
            }
        }
        DataType::Int(_) => {
            match integer_literal(literal).and_then(|value| i32::try_from(value).ok()) {
                Some(value) => Arc::new(Int32Array::new_scalar(value).into_inner()),
                None => return Ok(None),
            }
        }
        DataType::BigInt(_) => {
            match integer_literal(literal).and_then(|value| i64::try_from(value).ok()) {
                Some(value) => Arc::new(Int64Array::new_scalar(value).into_inner()),
                None => return Ok(None),
            }
        }
        // Floats: f64 -> f32 only when the round-trip is exact.
        DataType::Float(_) => match float32_literal(literal) {
            Some(value) => Arc::new(Float32Array::new_scalar(value).into_inner()),
            None => return Ok(None),
        },
        DataType::Double(_) => match float64_literal(literal) {
            Some(value) => Arc::new(Float64Array::new_scalar(value).into_inner()),
            None => return Ok(None),
        },
        DataType::Char(_) | DataType::VarChar(_) => match literal {
            Datum::String(value) => Arc::new(StringArray::new_scalar(value.as_str()).into_inner()),
            _ => return Ok(None),
        },
        DataType::Binary(_) | DataType::VarBinary(_) => match literal {
            Datum::Bytes(value) => Arc::new(BinaryArray::new_scalar(value.as_slice()).into_inner()),
            _ => return Ok(None),
        },
        DataType::Date(_) => match literal {
            Datum::Date(value) => Arc::new(Date32Array::new_scalar(*value).into_inner()),
            _ => return Ok(None),
        },
        // Decimal: only push down when the literal's scale matches and its
        // precision fits the column, then rescale into decimal128.
        DataType::Decimal(decimal) => match literal {
            Datum::Decimal {
                unscaled,
                precision,
                scale,
            } if *precision <= decimal.precision() && *scale == decimal.scale() => {
                let precision =
                    u8::try_from(decimal.precision()).map_err(|_| Error::Unsupported {
                        message: "Decimal precision exceeds Arrow decimal128 range".to_string(),
                    })?;
                let scale =
                    i8::try_from(decimal.scale() as i32).map_err(|_| Error::Unsupported {
                        message: "Decimal scale exceeds Arrow decimal128 range".to_string(),
                    })?;
                Arc::new(
                    Decimal128Array::new_scalar(*unscaled)
                        .into_inner()
                        .with_precision_and_scale(precision, scale)
                        .map_err(|e| Error::UnexpectedError {
                            message: format!(
                                "Failed to build decimal scalar for parquet row filter: {e}"
                            ),
                            source: Some(Box::new(e)),
                        })?,
                )
            }
            _ => return Ok(None),
        },
        // Temporal and nested types are intentionally not pushed down.
        DataType::Time(_)
        | DataType::Timestamp(_)
        | DataType::LocalZonedTimestamp(_)
        | DataType::Array(_)
        | DataType::Map(_)
        | DataType::Multiset(_)
        | DataType::Row(_) => return Ok(None),
    };

    Ok(Some(Scalar::new(array)))
}
+
+fn integer_literal(literal: &Datum) -> Option<i128> {
+    match literal {
+        Datum::TinyInt(value) => Some(i128::from(*value)),
+        Datum::SmallInt(value) => Some(i128::from(*value)),
+        Datum::Int(value) => Some(i128::from(*value)),
+        Datum::Long(value) => Some(i128::from(*value)),
+        _ => None,
+    }
+}
+
+fn float32_literal(literal: &Datum) -> Option<f32> {
+    match literal {
+        Datum::Float(value) => Some(*value),
+        Datum::Double(value) => {
+            let casted = *value as f32;
+            ((casted as f64) == *value).then_some(casted)
+        }
+        _ => None,
+    }
+}
+
+fn float64_literal(literal: &Datum) -> Option<f64> {
+    match literal {
+        Datum::Float(value) => Some(f64::from(*value)),
+        Datum::Double(value) => Some(*value),
+        _ => None,
+    }
+}
+
+fn build_row_group_column_indices(
+    columns: &[parquet::file::metadata::ColumnChunkMetaData],
+    file_fields: &[DataField],
+) -> Vec<Option<usize>> {
+    let mut by_root_name: HashMap<&str, Option<usize>> = HashMap::new();
+    for (column_index, column) in columns.iter().enumerate() {
+        let Some(root_name) = column.column_path().parts().first() else {
+            continue;
+        };
+        let entry = by_root_name
+            .entry(root_name.as_str())
+            .or_insert(Some(column_index));
+        if entry.is_some() && *entry != Some(column_index) {
+            *entry = None;
+        }
+    }
+
+    file_fields
+        .iter()
+        .map(|field| by_root_name.get(field.name()).copied().flatten())
+        .collect()
+}
+
/// Converts a parquet column statistic (min when `is_min`, otherwise max)
/// into a [`Datum`] of the requested Paimon `data_type`.
///
/// Returns `None` — which disables pruning on that bound — whenever the
/// statistic is absent, marked inexact (e.g. truncated byte-array bounds),
/// or does not map losslessly onto `data_type` (physical/logical type
/// mismatch, narrowing overflow, non-UTF-8 bytes for string types).
fn parquet_stats_to_datum(
    stats: &ParquetStatistics,
    data_type: &DataType,
    is_min: bool,
) -> Option<Datum> {
    // Inexact (truncated/derived) bounds are unsafe for exact range pruning.
    let exact = if is_min {
        stats.min_is_exact()
    } else {
        stats.max_is_exact()
    };
    if !exact {
        return None;
    }

    match (stats, data_type) {
        (ParquetStatistics::Boolean(stats), DataType::Boolean(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Bool)
        }
        // TinyInt/SmallInt are carried in INT32 statistics; reject values
        // outside the narrower logical range instead of truncating.
        (ParquetStatistics::Int32(stats), DataType::TinyInt(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .and_then(|value| i8::try_from(*value).ok())
                .map(Datum::TinyInt)
        }
        (ParquetStatistics::Int32(stats), DataType::SmallInt(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .and_then(|value| i16::try_from(*value).ok())
                .map(Datum::SmallInt)
        }
        (ParquetStatistics::Int32(stats), DataType::Int(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Int)
        }
        (ParquetStatistics::Int32(stats), DataType::Date(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Date)
        }
        (ParquetStatistics::Int32(stats), DataType::Time(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Time)
        }
        (ParquetStatistics::Int64(stats), DataType::BigInt(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Long)
        }
        // Millisecond-or-coarser timestamps (precision <= 3): the INT64 value
        // is taken as millis with zero sub-milli nanos. Finer precisions are
        // skipped because the nanos component would be unknown.
        (ParquetStatistics::Int64(stats), DataType::Timestamp(ts)) if ts.precision() <= 3 => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(|millis| Datum::Timestamp { millis, nanos: 0 })
        }
        (ParquetStatistics::Int64(stats), DataType::LocalZonedTimestamp(ts))
            if ts.precision() <= 3 =>
        {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(|millis| Datum::LocalZonedTimestamp { millis, nanos: 0 })
        }
        (ParquetStatistics::Float(stats), DataType::Float(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Float)
        }
        (ParquetStatistics::Double(stats), DataType::Double(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .copied()
                .map(Datum::Double)
        }
        // String bounds must decode as UTF-8; invalid bytes disable pruning.
        (ParquetStatistics::ByteArray(stats), DataType::Char(_))
        | (ParquetStatistics::ByteArray(stats), DataType::VarChar(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .and_then(|value| std::str::from_utf8(value.data()).ok())
                .map(|value| Datum::String(value.to_string()))
        }
        (ParquetStatistics::ByteArray(stats), DataType::Binary(_))
        | (ParquetStatistics::ByteArray(stats), DataType::VarBinary(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .map(|value| Datum::Bytes(value.data().to_vec()))
        }
        (ParquetStatistics::FixedLenByteArray(stats), DataType::Binary(_))
        | (ParquetStatistics::FixedLenByteArray(stats), DataType::VarBinary(_)) => {
            exact_parquet_value(is_min, stats.min_opt(), stats.max_opt())
                .map(|value| Datum::Bytes(value.data().to_vec()))
        }
        // Any other physical/logical pairing: no safe conversion, fail open.
        _ => None,
    }
}
+
/// Selects the requested bound from a (min, max) statistics pair: the `min`
/// side when `is_min` is set, otherwise the `max` side.
fn exact_parquet_value<'a, T>(
    is_min: bool,
    min: Option<&'a T>,
    max: Option<&'a T>,
) -> Option<&'a T> {
    match is_min {
        true => min,
        false => max,
    }
}
+
 /// Builds a Parquet [RowSelection] from deletion vector.
 /// Only rows not in the deletion vector are selected; deleted rows are 
skipped at read time.
 /// todo: Uses [DeletionVectorIterator] with 
[advance_to](DeletionVectorIterator::advance_to) when skipping row groups 
similar to iceberg-rust
@@ -772,3 +1480,55 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
         })
     }
 }
+
#[cfg(test)]
mod tests {
    use super::build_parquet_row_filter;
    use crate::spec::{DataField, DataType, Datum, IntType, PredicateBuilder};
    use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
    use std::sync::Arc;

    // Two INT fields (ids 0 and 1) matching the parquet schema below by name.
    fn test_fields() -> Vec<DataField> {
        vec![
            DataField::new(0, "id".to_string(), DataType::Int(IntType::new())),
            DataField::new(1, "score".to_string(), DataType::Int(IntType::new())),
        ]
    }

    // Minimal flat parquet schema with the same column names as test_fields().
    fn test_parquet_schema() -> SchemaDescriptor {
        SchemaDescriptor::new(Arc::new(
            parse_message_type(
                "
                message test_schema {
                  OPTIONAL INT32 id;
                  OPTIONAL INT32 score;
                }
                ",
            )
            .expect("test schema should parse"),
        ))
    }

    // IS NULL, IN and NOT IN must all be translatable into decode-time arrow
    // predicates, so the builder is expected to return Some(RowFilter).
    #[test]
    fn test_build_parquet_row_filter_supports_null_and_membership_predicates() {
        let fields = test_fields();
        let builder = PredicateBuilder::new(&fields);
        let predicates = vec![
            builder
                .is_null("id")
                .expect("is null predicate should build"),
            builder
                .is_in("score", vec![Datum::Int(7)])
                .expect("in predicate should build"),
            builder
                .is_not_in("score", vec![Datum::Int(9)])
                .expect("not in predicate should build"),
        ];

        let row_filter =
            build_parquet_row_filter(&test_parquet_schema(), &predicates, &fields, &fields)
                .expect("parquet row filter should build");

        assert!(row_filter.is_some());
    }
}
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index aba327f..4b0ffab 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -54,15 +54,22 @@ impl FileIO {
    /// Infers a [`FileIOBuilder`] from a URL string or a bare filesystem
    /// path, including Windows drive paths such as `C:\data`.
    ///
    /// Otherwise will return parsing error.
    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
        let path = path.as_ref();
        let url = if looks_like_windows_drive_path(path) {
            // `Url::parse` would treat a drive letter (`C:`) as a URL scheme,
            // so route drive paths straight to file-path parsing.
            Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
                message: format!("Input {path} is neither a valid url nor path"),
            })?
        } else {
            // Try a real URL first, then fall back to a local file path.
            Url::parse(path)
                .map_err(|_| Error::ConfigInvalid {
                    message: format!("Invalid URL: {path}"),
                })
                .or_else(|_| {
                    Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
                        message: format!("Input {path} is neither a valid url nor path"),
                    })
                })?
        };
        Ok(FileIOBuilder::new(url.scheme()))
    }
 
@@ -223,6 +230,14 @@ impl FileIO {
     }
 }
 
/// Heuristic for Windows drive paths (`C:\...` or `C:/...`): an ASCII drive
/// letter, a colon, then a path separator.
fn looks_like_windows_drive_path(path: &str) -> bool {
    let mut chars = path.chars();
    matches!(
        (chars.next(), chars.next(), chars.next()),
        (Some(drive), Some(':'), Some('/' | '\\')) if drive.is_ascii_alphabetic()
    )
}
+
 #[derive(Debug)]
 pub struct FileIOBuilder {
     scheme_str: Option<String>,
@@ -385,6 +400,7 @@ impl OutputFile {
 mod file_action_test {
     use std::collections::BTreeSet;
     use std::fs;
+    use tempfile::tempdir;
 
     use super::*;
     use bytes::Bytes;
@@ -397,6 +413,15 @@ mod file_action_test {
         FileIOBuilder::new("file").build().unwrap()
     }
 
+    fn local_file_path(path: &std::path::Path) -> String {
+        let normalized = path.to_string_lossy().replace('\\', "/");
+        if normalized.starts_with('/') {
+            format!("file:{normalized}")
+        } else {
+            format!("file:/{normalized}")
+        }
+    }
+
     async fn common_test_get_status(file_io: &FileIO, path: &str) {
         let output = file_io.new_output(path).unwrap();
         let mut writer = output.writer().await.unwrap();
@@ -593,6 +618,27 @@ mod file_action_test {
         let file_io = setup_fs_file_io();
         common_test_list_status_paths(&file_io, 
"file:/tmp/test_list_status_paths_fs/").await;
     }
+
    // from_path on a bare directory path must infer the local "file" scheme
    // and yield a FileIO that round-trips a write plus an existence check.
    #[test]
    fn test_from_path_detects_local_fs_path() {
        let dir = tempdir().unwrap();
        let file_io = FileIO::from_path(dir.path().to_string_lossy())
            .unwrap()
            .build()
            .unwrap();
        let path = local_file_path(&dir.path().join("from_path_detects_local_fs_path.txt"));

        // Plain #[test] rather than #[tokio::test]; drive the async file I/O
        // with a locally constructed runtime.
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            file_io
                .new_output(&path)
                .unwrap()
                .write(Bytes::from("data"))
                .await
                .unwrap();
            assert!(file_io.exists(&path).await.unwrap());
        });
    }
 }
 
 #[cfg(test)]
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 082f8f9..fe340f3 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -30,6 +30,7 @@ pub mod catalog;
 mod deletion_vector;
 pub mod file_index;
 pub mod io;
+mod predicate_stats;
 pub mod spec;
 pub mod table;
 
diff --git a/crates/paimon/src/predicate_stats.rs 
b/crates/paimon/src/predicate_stats.rs
new file mode 100644
index 0000000..cb44c72
--- /dev/null
+++ b/crates/paimon/src/predicate_stats.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator};
+use std::cmp::Ordering;
+
/// Read-only view over per-file (or per-row-group) column statistics.
///
/// Implementations adapt a concrete stats source so the shared
/// predicate-pruning logic in this module stays storage-agnostic. `index`
/// addresses the column in the stats source's own field list; every
/// accessor returns `None` when the statistic is unknown, which callers
/// treat as "cannot prune".
pub(crate) trait StatsAccessor {
    /// Total number of rows covered by these statistics.
    fn row_count(&self) -> i64;
    /// Null count for the column at `index`; `None` when unknown.
    fn null_count(&self, index: usize) -> Option<i64>;
    /// Minimum value for the column at `index`, decoded as `data_type`;
    /// `None` when absent or undecodable.
    fn min_value(&self, index: usize, data_type: &DataType) -> Option<Datum>;
    /// Maximum value for the column at `index`, decoded as `data_type`;
    /// `None` when absent or undecodable.
    fn max_value(&self, index: usize, data_type: &DataType) -> Option<Datum>;
}
+
+pub(crate) fn predicates_may_match_with_schema<T: StatsAccessor>(
+    predicates: &[Predicate],
+    stats: &T,
+    field_mapping: &[Option<usize>],
+    file_fields: &[DataField],
+) -> bool {
+    predicates.iter().all(|predicate| {
+        predicate_may_match_with_schema(predicate, stats, field_mapping, 
file_fields)
+    })
+}
+
/// Decide whether a single leaf predicate may match any row, given the
/// column statistics in `stats`.
///
/// Returns `true` ("keep the file") whenever the statistics are missing or
/// inconclusive; `false` is returned only when the stats *prove* no row can
/// satisfy the predicate, so this is safe for pruning.
///
/// `index` addresses the column in `stats`; `stats_data_type` is the type
/// the stats were written with, while `predicate_data_type` is the type the
/// literal was built against — the two may differ after schema evolution.
pub(crate) fn data_leaf_may_match<T: StatsAccessor>(
    index: usize,
    stats_data_type: &DataType,
    predicate_data_type: &DataType,
    op: PredicateOperator,
    literals: &[Datum],
    stats: &T,
) -> bool {
    // An empty (or invalid) file can never produce a matching row.
    let row_count = stats.row_count();
    if row_count <= 0 {
        return false;
    }

    let null_count = stats.null_count(index);
    // Some(true) => provably all-null column; None => null count unknown.
    let all_null = null_count.map(|count| count == row_count);

    match op {
        PredicateOperator::IsNull => {
            // Keep unless we know for sure there are zero nulls.
            return null_count.is_none_or(|count| count > 0);
        }
        PredicateOperator::IsNotNull => {
            // Prune only when the column is provably all-null.
            return all_null != Some(true);
        }
        PredicateOperator::In | PredicateOperator::NotIn => {
            // Set membership is not pruned via min/max stats; keep.
            return true;
        }
        PredicateOperator::Eq
        | PredicateOperator::NotEq
        | PredicateOperator::Lt
        | PredicateOperator::LtEq
        | PredicateOperator::Gt
        | PredicateOperator::GtEq => {}
    }

    // Comparison operators never match null, so an all-null column prunes.
    if all_null == Some(true) {
        return false;
    }

    let literal = match literals.first() {
        Some(literal) => literal,
        None => return true,
    };

    // Decode min/max and coerce them to the predicate's type; if either step
    // fails the stats are unusable and the file is conservatively kept.
    let min_value = match stats
        .min_value(index, stats_data_type)
        .and_then(|datum| coerce_stats_datum_for_predicate(datum, predicate_data_type))
    {
        Some(value) => value,
        None => return true,
    };
    let max_value = match stats
        .max_value(index, stats_data_type)
        .and_then(|datum| coerce_stats_datum_for_predicate(datum, predicate_data_type))
    {
        Some(value) => value,
        None => return true,
    };

    // Prune only when the [min, max] range makes a match impossible.
    // Incomparable values (partial_cmp == None) conservatively keep the file.
    match op {
        PredicateOperator::Eq => {
            !matches!(literal.partial_cmp(&min_value), Some(Ordering::Less))
                && !matches!(literal.partial_cmp(&max_value), Some(Ordering::Greater))
        }
        // Only prunable when the column is constant and equal to the literal.
        PredicateOperator::NotEq => !(min_value == *literal && max_value == *literal),
        PredicateOperator::Lt => !matches!(
            min_value.partial_cmp(literal),
            Some(Ordering::Greater | Ordering::Equal)
        ),
        PredicateOperator::LtEq => {
            !matches!(min_value.partial_cmp(literal), Some(Ordering::Greater))
        }
        PredicateOperator::Gt => !matches!(
            max_value.partial_cmp(literal),
            Some(Ordering::Less | Ordering::Equal)
        ),
        PredicateOperator::GtEq => !matches!(max_value.partial_cmp(literal), Some(Ordering::Less)),
        PredicateOperator::IsNull
        | PredicateOperator::IsNotNull
        | PredicateOperator::In
        | PredicateOperator::NotIn => true,
    }
}
+
+pub(crate) fn missing_field_may_match(op: PredicateOperator, row_count: i64) 
-> bool {
+    if row_count <= 0 {
+        return false;
+    }
+
+    matches!(op, PredicateOperator::IsNull)
+}
+
/// Evaluate a (possibly composite) predicate against file statistics,
/// translating table-schema field indices into file-schema indices via
/// `field_mapping` (schema evolution).
///
/// `Or`/`Not` are never pruned here: their stats semantics would require
/// range union/negation, so they conservatively keep the file.
fn predicate_may_match_with_schema<T: StatsAccessor>(
    predicate: &Predicate,
    stats: &T,
    field_mapping: &[Option<usize>],
    file_fields: &[DataField],
) -> bool {
    match predicate {
        Predicate::AlwaysTrue => true,
        Predicate::AlwaysFalse => false,
        // AND prunes as soon as any conjunct proves a miss.
        Predicate::And(children) => children
            .iter()
            .all(|child| predicate_may_match_with_schema(child, stats, field_mapping, file_fields)),
        Predicate::Or(_) | Predicate::Not(_) => true,
        Predicate::Leaf {
            index,
            data_type,
            op,
            literals,
            ..
        } => match field_mapping.get(*index).copied().flatten() {
            Some(file_index) => {
                // Mapping points beyond the known file fields: keep the file.
                let Some(file_field) = file_fields.get(file_index) else {
                    return true;
                };
                data_leaf_may_match(
                    file_index,
                    file_field.data_type(),
                    data_type,
                    *op,
                    literals,
                    stats,
                )
            }
            // Field absent from this file: it reads as null for every row.
            None => missing_field_may_match(*op, stats.row_count()),
        },
    }
}
+
/// Align a stats datum (decoded with the file's type) with the type the
/// predicate literal was built against.
///
/// Exact type matches pass through unchanged; otherwise only lossless
/// numeric widenings (e.g. INT stats compared against a BIGINT literal
/// after schema evolution) are performed. Any other combination yields
/// `None`, which callers treat as "stats unusable — keep the file".
fn coerce_stats_datum_for_predicate(datum: Datum, predicate_data_type: &DataType) -> Option<Datum> {
    match (datum, predicate_data_type) {
        // Identity cases: the datum already matches the predicate's type.
        (datum @ Datum::Bool(_), DataType::Boolean(_))
        | (datum @ Datum::TinyInt(_), DataType::TinyInt(_))
        | (datum @ Datum::SmallInt(_), DataType::SmallInt(_))
        | (datum @ Datum::Int(_), DataType::Int(_))
        | (datum @ Datum::Long(_), DataType::BigInt(_))
        | (datum @ Datum::Float(_), DataType::Float(_))
        | (datum @ Datum::Double(_), DataType::Double(_))
        | (datum @ Datum::String(_), DataType::VarChar(_))
        | (datum @ Datum::String(_), DataType::Char(_))
        | (datum @ Datum::Bytes(_), DataType::Binary(_))
        | (datum @ Datum::Bytes(_), DataType::VarBinary(_))
        | (datum @ Datum::Date(_), DataType::Date(_))
        | (datum @ Datum::Time(_), DataType::Time(_))
        | (datum @ Datum::Timestamp { .. }, DataType::Timestamp(_))
        | (datum @ Datum::LocalZonedTimestamp { .. }, DataType::LocalZonedTimestamp(_))
        | (datum @ Datum::Decimal { .. }, DataType::Decimal(_)) => Some(datum),
        // Lossless integer widenings.
        (Datum::TinyInt(value), DataType::SmallInt(_)) => Some(Datum::SmallInt(value as i16)),
        (Datum::TinyInt(value), DataType::Int(_)) => Some(Datum::Int(value as i32)),
        (Datum::TinyInt(value), DataType::BigInt(_)) => Some(Datum::Long(value as i64)),
        (Datum::SmallInt(value), DataType::Int(_)) => Some(Datum::Int(value as i32)),
        (Datum::SmallInt(value), DataType::BigInt(_)) => Some(Datum::Long(value as i64)),
        (Datum::Int(value), DataType::BigInt(_)) => Some(Datum::Long(value as i64)),
        // f32 -> f64 is exact, so this widening is lossless too.
        (Datum::Float(value), DataType::Double(_)) => Some(Datum::Double(value as f64)),
        _ => None,
    }
}
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index d55ecc1..4a405ea 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -20,13 +20,81 @@
 //! Reference: [Java 
ReadBuilder.withProjection](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java)
 //! and 
[TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java).
 
+use super::bucket_filter::{extract_predicate_for_keys, 
split_partition_and_data_predicates};
 use super::{ArrowRecordBatchStream, Table, TableScan};
+use crate::arrow::filtering::reader_pruning_predicates;
 use crate::arrow::ArrowReaderBuilder;
 use crate::spec::{CoreOptions, DataField, Predicate};
 use crate::Result;
 use crate::{DataSplit, Error};
 use std::collections::{HashMap, HashSet};
 
/// A user filter split into the parts each scan/read stage can use.
#[derive(Debug, Clone, Default)]
struct NormalizedFilter {
    // Conjuncts referencing only partition columns; used to skip partitions
    // during scan planning.
    partition_predicate: Option<Predicate>,
    // Remaining (non-partition) conjuncts; used for per-file stats pruning
    // and for conservative read-side pushdown.
    data_predicates: Vec<Predicate>,
    // Predicate over the bucket key, when one can be extracted; used to
    // skip buckets under the default bucket function.
    bucket_predicate: Option<Predicate>,
}
+
+fn split_scan_predicates(table: &Table, filter: Predicate) -> 
(Option<Predicate>, Vec<Predicate>) {
+    let partition_keys = table.schema().partition_keys();
+    if partition_keys.is_empty() {
+        (None, filter.split_and())
+    } else {
+        split_partition_and_data_predicates(filter, table.schema().fields(), 
partition_keys)
+    }
+}
+
+fn bucket_predicate(table: &Table, filter: &Predicate) -> Option<Predicate> {
+    let core_options = CoreOptions::new(table.schema().options());
+    if !core_options.is_default_bucket_function() {
+        return None;
+    }
+
+    let bucket_keys = core_options.bucket_key().unwrap_or_else(|| {
+        if table.schema().primary_keys().is_empty() {
+            Vec::new()
+        } else {
+            table
+                .schema()
+                .primary_keys()
+                .iter()
+                .map(|key| key.to_string())
+                .collect()
+        }
+    });
+    if bucket_keys.is_empty() {
+        return None;
+    }
+
+    let has_all_bucket_fields = bucket_keys.iter().all(|key| {
+        table
+            .schema()
+            .fields()
+            .iter()
+            .any(|field| field.name() == key)
+    });
+    if !has_all_bucket_fields {
+        return None;
+    }
+
+    extract_predicate_for_keys(filter, table.schema().fields(), &bucket_keys)
+}
+
+fn normalize_filter(table: &Table, filter: Predicate) -> NormalizedFilter {
+    let (partition_predicate, data_predicates) = split_scan_predicates(table, 
filter.clone());
+    NormalizedFilter {
+        partition_predicate,
+        data_predicates,
+        bucket_predicate: bucket_predicate(table, &filter),
+    }
+}
+
+fn read_data_predicates(table: &Table, filter: Predicate) -> Vec<Predicate> {
+    let (_, data_predicates) = split_scan_predicates(table, filter);
+    reader_pruning_predicates(data_predicates)
+}
+
 /// Builder for table scan and table read (new_scan, new_read).
 ///
 /// Rust keeps a names-based projection API for ergonomics, while aligning the
@@ -35,7 +103,7 @@ use std::collections::{HashMap, HashSet};
 pub struct ReadBuilder<'a> {
     table: &'a Table,
     projected_fields: Option<Vec<String>>,
-    filter: Option<Predicate>,
+    filter: NormalizedFilter,
     limit: Option<usize>,
 }
 
@@ -44,7 +112,7 @@ impl<'a> ReadBuilder<'a> {
         Self {
             table,
             projected_fields: None,
-            filter: None,
+            filter: NormalizedFilter::default(),
             limit: None,
         }
     }
@@ -57,7 +125,7 @@ impl<'a> ReadBuilder<'a> {
         self
     }
 
-    /// Set a filter predicate for scan planning.
+    /// Set a filter predicate for scan planning and conservative read pruning.
     ///
     /// The predicate should use table schema field indices (as produced by
     /// [`PredicateBuilder`]). During [`TableScan::plan`], partition-only
@@ -67,10 +135,13 @@ impl<'a> ReadBuilder<'a> {
     /// Stats pruning is per file. Files with a different `schema_id`,
     /// incompatible stats layout, or inconclusive stats are kept.
     ///
-    /// [`TableRead`] does not evaluate row-level filters; callers must apply
-    /// any remaining predicates themselves.
+    /// [`TableRead`] may use supported non-partition data predicates only on
+    /// the regular Parquet read path for conservative row-group pruning and
+    /// native Parquet row filtering. Unsupported predicates, non-Parquet
+    /// reads, and data-evolution reads remain residual and should still be
+    /// applied by the caller if exact filtering semantics are required.
     pub fn with_filter(&mut self, filter: Predicate) -> &mut Self {
-        self.filter = Some(filter);
+        self.filter = normalize_filter(self.table, filter);
         self
     }
 
@@ -89,7 +160,13 @@ impl<'a> ReadBuilder<'a> {
 
     /// Create a table scan. Call [TableScan::plan] to get splits.
     pub fn new_scan(&self) -> TableScan<'a> {
-        TableScan::new(self.table, self.filter.clone(), self.limit)
+        TableScan::new(
+            self.table,
+            self.filter.partition_predicate.clone(),
+            self.filter.data_predicates.clone(),
+            self.filter.bucket_predicate.clone(),
+            self.limit,
+        )
     }
 
     /// Create a table read for consuming splits (e.g. from a scan plan).
@@ -99,7 +176,11 @@ impl<'a> ReadBuilder<'a> {
             Some(projected) => self.resolve_projected_fields(projected)?,
         };
 
-        Ok(TableRead::new(self.table, read_type))
+        Ok(TableRead::new(
+            self.table,
+            read_type,
+            reader_pruning_predicates(self.filter.data_predicates.clone()),
+        ))
     }
 
     fn resolve_projected_fields(&self, projected_fields: &[String]) -> 
Result<Vec<DataField>> {
@@ -146,12 +227,21 @@ impl<'a> ReadBuilder<'a> {
 pub struct TableRead<'a> {
     table: &'a Table,
     read_type: Vec<DataField>,
+    data_predicates: Vec<Predicate>,
 }
 
 impl<'a> TableRead<'a> {
     /// Create a new TableRead with a specific read type (projected fields).
-    pub fn new(table: &'a Table, read_type: Vec<DataField>) -> Self {
-        Self { table, read_type }
+    pub fn new(
+        table: &'a Table,
+        read_type: Vec<DataField>,
+        data_predicates: Vec<Predicate>,
+    ) -> Self {
+        Self {
+            table,
+            read_type,
+            data_predicates,
+        }
     }
 
     /// Schema (fields) that this read will produce.
@@ -164,6 +254,19 @@ impl<'a> TableRead<'a> {
         self.table
     }
 
+    /// Set a filter predicate for conservative read-side pruning.
+    ///
+    /// This is the direct-`TableRead` equivalent of 
[`ReadBuilder::with_filter`].
+    /// Supported non-partition data predicates may be used only on the regular
+    /// Parquet read path for row-group pruning and native Parquet row
+    /// filtering. Callers should still keep residual filtering at the query
+    /// layer for unsupported predicates, non-Parquet files, and data-evolution
+    /// reads.
+    pub fn with_filter(mut self, filter: Predicate) -> Self {
+        self.data_predicates = read_data_predicates(self.table, filter);
+        self
+    }
+
     /// Returns an [`ArrowRecordBatchStream`].
     pub fn to_arrow(&self, data_splits: &[DataSplit]) -> 
crate::Result<ArrowRecordBatchStream> {
         // todo: consider get read batch size from table
@@ -186,6 +289,8 @@ impl<'a> TableRead<'a> {
             self.table.schema_manager().clone(),
             self.table.schema().id(),
         )
+        .with_predicates(self.data_predicates.clone())
+        .with_table_fields(self.table.schema.fields().to_vec())
         .build(self.read_type().to_vec());
 
         if data_evolution {
@@ -195,3 +300,273 @@ impl<'a> TableRead<'a> {
         }
     }
 }
+
#[cfg(test)]
mod tests {
    use super::TableRead;
    mod test_utils {
        include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../test_utils.rs"));
    }

    use crate::catalog::Identifier;
    use crate::io::FileIOBuilder;
    use crate::spec::{
        BinaryRow, DataType, IntType, Predicate, PredicateBuilder, Schema, TableSchema, VarCharType,
    };
    use crate::table::{DataSplitBuilder, Table};
    use arrow_array::{Int32Array, RecordBatch};
    use futures::TryStreamExt;
    use std::fs;
    use tempfile::tempdir;
    use test_utils::{local_file_path, test_data_file, write_int_parquet_file};

    /// Flatten the named Int32 column of every batch into a single Vec<i32>.
    fn collect_int_column(batches: &[RecordBatch], column_name: &str) -> Vec<i32> {
        batches
            .iter()
            .flat_map(|batch| {
                let column_index = batch.schema().index_of(column_name).unwrap();
                let array = batch.column(column_index);
                let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
                (0..values.len())
                    .map(|index| values.value(index))
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Build an unpartitioned test table with two INT columns, `id` and
    /// `value`, backed by the local filesystem.
    fn two_int_column_table(table_path: String) -> Table {
        let file_io = FileIOBuilder::new("file").build().unwrap();
        let table_schema = TableSchema::new(
            0,
            &Schema::builder()
                .column("id", DataType::Int(IntType::new()))
                .column("value", DataType::Int(IntType::new()))
                .build()
                .unwrap(),
        );
        Table::new(
            file_io,
            Identifier::new("default", "t"),
            table_path,
            table_schema,
        )
    }

    /// Build a raw-convertible split over the single 4-row `data.parquet`
    /// file inside `bucket_dir`.
    fn single_file_split(bucket_dir: &std::path::Path, partition: BinaryRow) -> crate::DataSplit {
        DataSplitBuilder::new()
            .with_snapshot(1)
            .with_partition(partition)
            .with_bucket(0)
            .with_bucket_path(local_file_path(bucket_dir))
            .with_total_buckets(1)
            .with_data_files(vec![test_data_file("data.parquet", 4)])
            .with_raw_convertible(true)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_new_read_pushes_filter_to_reader_when_filter_column_not_projected() {
        let tempdir = tempdir().unwrap();
        let table_path = local_file_path(tempdir.path());
        let bucket_dir = tempdir.path().join("bucket-0");
        fs::create_dir_all(&bucket_dir).unwrap();

        write_int_parquet_file(
            &bucket_dir.join("data.parquet"),
            vec![("id", vec![1, 2, 3, 4]), ("value", vec![1, 2, 20, 30])],
            Some(2),
        );

        let table = two_int_column_table(table_path);
        let split = single_file_split(&bucket_dir, BinaryRow::new(0));

        // Filter on a column that is NOT in the projection; pushdown must
        // still apply it before projecting.
        let predicate = PredicateBuilder::new(table.schema().fields())
            .greater_or_equal("value", crate::spec::Datum::Int(10))
            .unwrap();

        let mut builder = table.new_read_builder();
        builder.with_projection(&["id"]).with_filter(predicate);
        let read = builder.new_read().unwrap();
        let batches = read
            .to_arrow(&[split])
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(collect_int_column(&batches, "id"), vec![3, 4]);
    }

    #[tokio::test]
    async fn test_direct_table_read_with_filter_pushes_filter_to_reader() {
        let tempdir = tempdir().unwrap();
        let table_path = local_file_path(tempdir.path());
        let bucket_dir = tempdir.path().join("bucket-0");
        fs::create_dir_all(&bucket_dir).unwrap();

        write_int_parquet_file(
            &bucket_dir.join("data.parquet"),
            vec![("id", vec![1, 2, 3, 4]), ("value", vec![1, 2, 20, 30])],
            Some(2),
        );

        let table = two_int_column_table(table_path);
        let split = single_file_split(&bucket_dir, BinaryRow::new(0));

        let predicate = PredicateBuilder::new(table.schema().fields())
            .greater_or_equal("value", crate::spec::Datum::Int(10))
            .unwrap();
        // Exercise `TableRead::with_filter` directly instead of going
        // through `ReadBuilder`.
        let read = TableRead::new(&table, vec![table.schema().fields()[0].clone()], Vec::new())
            .with_filter(predicate);
        let batches = read
            .to_arrow(&[split])
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(collect_int_column(&batches, "id"), vec![3, 4]);
    }

    #[tokio::test]
    async fn test_new_read_row_filter_filters_rows_within_matching_row_group() {
        let tempdir = tempdir().unwrap();
        let table_path = local_file_path(tempdir.path());
        let bucket_dir = tempdir.path().join("bucket-0");
        fs::create_dir_all(&bucket_dir).unwrap();

        // The first row group holds values [5, 20]: its min/max straddle the
        // filter boundary so it cannot be pruned by stats — only native
        // Parquet row filtering can drop the value=5 row.
        write_int_parquet_file(
            &bucket_dir.join("data.parquet"),
            vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
            Some(2),
        );

        let table = two_int_column_table(table_path);
        let split = single_file_split(&bucket_dir, BinaryRow::new(0));

        let predicate = PredicateBuilder::new(table.schema().fields())
            .greater_or_equal("value", crate::spec::Datum::Int(10))
            .unwrap();

        let mut builder = table.new_read_builder();
        builder.with_projection(&["id"]).with_filter(predicate);
        let read = builder.new_read().unwrap();
        let batches = read
            .to_arrow(&[split])
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(collect_int_column(&batches, "id"), vec![2, 3, 4]);
    }

    #[tokio::test]
    async fn test_reader_pruning_ignores_partition_conjuncts() {
        let tempdir = tempdir().unwrap();
        let table_path = local_file_path(tempdir.path());
        let bucket_dir = tempdir.path().join("dt=2024-01-01").join("bucket-0");
        fs::create_dir_all(&bucket_dir).unwrap();

        write_int_parquet_file(
            &bucket_dir.join("data.parquet"),
            vec![("id", vec![1, 2, 3, 4]), ("value", vec![1, 2, 20, 30])],
            Some(2),
        );

        // Partitioned table: the `dt` conjunct must be stripped before read
        // pushdown, since data files do not store partition columns.
        let file_io = FileIOBuilder::new("file").build().unwrap();
        let table_schema = TableSchema::new(
            0,
            &Schema::builder()
                .column("dt", DataType::VarChar(VarCharType::string_type()))
                .column("id", DataType::Int(IntType::new()))
                .column("value", DataType::Int(IntType::new()))
                .partition_keys(["dt"])
                .build()
                .unwrap(),
        );
        let table = Table::new(
            file_io,
            Identifier::new("default", "t"),
            table_path,
            table_schema,
        );

        let split = single_file_split(&bucket_dir, BinaryRow::new(1));

        let predicate = Predicate::and(vec![
            PredicateBuilder::new(table.schema().fields())
                .equal("dt", crate::spec::Datum::String("2024-01-01".to_string()))
                .unwrap(),
            PredicateBuilder::new(table.schema().fields())
                .greater_or_equal("value", crate::spec::Datum::Int(10))
                .unwrap(),
        ]);

        let mut builder = table.new_read_builder();
        builder.with_projection(&["id"]).with_filter(predicate);
        let read = builder.new_read().unwrap();
        let batches = read
            .to_arrow(&[split])
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(collect_int_column(&batches, "id"), vec![3, 4]);
    }
}
diff --git a/crates/paimon/src/table/stats_filter.rs 
b/crates/paimon/src/table/stats_filter.rs
index 171b00d..9796bf4 100644
--- a/crates/paimon/src/table/stats_filter.rs
+++ b/crates/paimon/src/table/stats_filter.rs
@@ -19,11 +19,10 @@
 
 use super::Table;
 use crate::arrow::schema_evolution::create_index_mapping;
-use crate::spec::{
-    extract_datum, BinaryRow, DataField, DataFileMeta, DataType, Datum, 
Predicate,
-    PredicateOperator,
+use crate::predicate_stats::{
+    data_leaf_may_match, missing_field_may_match, 
predicates_may_match_with_schema, StatsAccessor,
 };
-use std::cmp::Ordering;
+use crate::spec::{extract_datum, BinaryRow, DataField, DataFileMeta, DataType, 
Datum, Predicate};
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -97,11 +96,36 @@ impl FileStatsRows {
         }
     }
 
-    fn null_count(&self, stats_index: usize) -> Option<i64> {
+    fn stats_null_count(&self, stats_index: usize) -> Option<i64> {
         self.null_counts.get(stats_index).copied().flatten()
     }
 }
 
// Adapter exposing manifest-level file statistics through the shared
// `StatsAccessor` pruning interface. `index` here is a schema-field index;
// each accessor first translates it into this file's stats-column index and
// reports "unknown" (`None`) whenever the field has no stats entry, which
// the pruning logic treats as "cannot prune".
impl StatsAccessor for FileStatsRows {
    fn row_count(&self) -> i64 {
        self.row_count
    }

    fn null_count(&self, index: usize) -> Option<i64> {
        // `?` propagates "no stats column for this field" as unknown.
        let stats_index = self.stats_index(index)?;
        self.stats_null_count(stats_index)
    }

    fn min_value(&self, index: usize, data_type: &DataType) -> Option<Datum> {
        let stats_index = self.stats_index(index)?;
        self.min_values
            .as_ref()
            .and_then(|row| extract_stats_datum(row, stats_index, data_type))
    }

    fn max_value(&self, index: usize, data_type: &DataType) -> Option<Datum> {
        let stats_index = self.stats_index(index)?;
        self.max_values
            .as_ref()
            .and_then(|row| extract_stats_datum(row, stats_index, data_type))
    }
}
+
 #[derive(Debug)]
 pub(super) struct ResolvedStatsSchema {
     file_fields: Vec<DataField>,
@@ -156,10 +180,8 @@ pub(super) fn data_file_matches_predicates(
     }
 
     let stats = FileStatsRows::from_data_file(file, schema_fields);
-
-    predicates
-        .iter()
-        .all(|predicate| data_predicate_may_match(predicate, &stats))
+    let field_mapping = identity_field_mapping(schema_fields.len());
+    predicates_may_match_with_schema(predicates, &stats, &field_mapping, 
schema_fields)
 }
 
 async fn resolve_stats_schema(
@@ -218,203 +240,12 @@ pub(super) async fn 
data_file_matches_predicates_for_table(
     };
 
     let stats = FileStatsRows::from_data_file(file, &resolved.file_fields);
-
-    predicates.iter().all(|predicate| {
-        data_predicate_may_match_with_schema(
-            predicate,
-            &stats,
-            &resolved.field_mapping,
-            &resolved.file_fields,
-        )
-    })
-}
-
-fn data_predicate_may_match(predicate: &Predicate, stats: &FileStatsRows) -> 
bool {
-    match predicate {
-        Predicate::AlwaysTrue => true,
-        Predicate::AlwaysFalse => false,
-        Predicate::And(children) => children
-            .iter()
-            .all(|child| data_predicate_may_match(child, stats)),
-        Predicate::Or(_) | Predicate::Not(_) => true,
-        Predicate::Leaf {
-            index,
-            data_type,
-            op,
-            literals,
-            ..
-        } => {
-            let Some(stats_idx) = stats.stats_index(*index) else {
-                return true;
-            };
-            data_leaf_may_match(stats_idx, data_type, data_type, *op, 
literals, stats)
-        }
-    }
-}
-
-fn data_predicate_may_match_with_schema(
-    predicate: &Predicate,
-    stats: &FileStatsRows,
-    field_mapping: &[Option<usize>],
-    file_fields: &[DataField],
-) -> bool {
-    match predicate {
-        Predicate::AlwaysTrue => true,
-        Predicate::AlwaysFalse => false,
-        Predicate::And(children) => children.iter().all(|child| {
-            data_predicate_may_match_with_schema(child, stats, field_mapping, 
file_fields)
-        }),
-        Predicate::Or(_) | Predicate::Not(_) => true,
-        Predicate::Leaf {
-            index,
-            data_type,
-            op,
-            literals,
-            ..
-        } => match field_mapping.get(*index).copied().flatten() {
-            Some(file_index) => {
-                let Some(file_field) = file_fields.get(file_index) else {
-                    return true;
-                };
-                let Some(stats_idx) = stats.stats_index(file_index) else {
-                    return true;
-                };
-                data_leaf_may_match(
-                    stats_idx,
-                    file_field.data_type(),
-                    data_type,
-                    *op,
-                    literals,
-                    stats,
-                )
-            }
-            None => missing_field_may_match(*op, stats.row_count),
-        },
-    }
-}
-
-pub(super) fn data_leaf_may_match(
-    index: usize,
-    stats_data_type: &DataType,
-    predicate_data_type: &DataType,
-    op: PredicateOperator,
-    literals: &[Datum],
-    stats: &FileStatsRows,
-) -> bool {
-    let row_count = stats.row_count;
-    if row_count <= 0 {
-        return false;
-    }
-
-    let null_count = stats.null_count(index);
-    let all_null = null_count.map(|count| count == row_count);
-
-    match op {
-        PredicateOperator::IsNull => {
-            return null_count.is_none_or(|count| count > 0);
-        }
-        PredicateOperator::IsNotNull => {
-            return all_null != Some(true);
-        }
-        PredicateOperator::In | PredicateOperator::NotIn => {
-            return true;
-        }
-        PredicateOperator::Eq
-        | PredicateOperator::NotEq
-        | PredicateOperator::Lt
-        | PredicateOperator::LtEq
-        | PredicateOperator::Gt
-        | PredicateOperator::GtEq => {}
-    }
-
-    if all_null == Some(true) {
-        return false;
-    }
-
-    let literal = match literals.first() {
-        Some(literal) => literal,
-        None => return true,
-    };
-
-    let min_value = match stats
-        .min_values
-        .as_ref()
-        .and_then(|row| extract_stats_datum(row, index, stats_data_type))
-        .and_then(|datum| coerce_stats_datum_for_predicate(datum, 
predicate_data_type))
-    {
-        Some(value) => value,
-        None => return true,
-    };
-    let max_value = match stats
-        .max_values
-        .as_ref()
-        .and_then(|row| extract_stats_datum(row, index, stats_data_type))
-        .and_then(|datum| coerce_stats_datum_for_predicate(datum, 
predicate_data_type))
-    {
-        Some(value) => value,
-        None => return true,
-    };
-
-    match op {
-        PredicateOperator::Eq => {
-            !matches!(literal.partial_cmp(&min_value), Some(Ordering::Less))
-                && !matches!(literal.partial_cmp(&max_value), 
Some(Ordering::Greater))
-        }
-        PredicateOperator::NotEq => !(min_value == *literal && max_value == 
*literal),
-        PredicateOperator::Lt => !matches!(
-            min_value.partial_cmp(literal),
-            Some(Ordering::Greater | Ordering::Equal)
-        ),
-        PredicateOperator::LtEq => {
-            !matches!(min_value.partial_cmp(literal), Some(Ordering::Greater))
-        }
-        PredicateOperator::Gt => !matches!(
-            max_value.partial_cmp(literal),
-            Some(Ordering::Less | Ordering::Equal)
-        ),
-        PredicateOperator::GtEq => !matches!(max_value.partial_cmp(literal), 
Some(Ordering::Less)),
-        PredicateOperator::IsNull
-        | PredicateOperator::IsNotNull
-        | PredicateOperator::In
-        | PredicateOperator::NotIn => true,
-    }
-}
-
-fn missing_field_may_match(op: PredicateOperator, row_count: i64) -> bool {
-    if row_count <= 0 {
-        return false;
-    }
-
-    matches!(op, PredicateOperator::IsNull)
-}
-
-fn coerce_stats_datum_for_predicate(datum: Datum, predicate_data_type: 
&DataType) -> Option<Datum> {
-    match (datum, predicate_data_type) {
-        (datum @ Datum::Bool(_), DataType::Boolean(_))
-        | (datum @ Datum::TinyInt(_), DataType::TinyInt(_))
-        | (datum @ Datum::SmallInt(_), DataType::SmallInt(_))
-        | (datum @ Datum::Int(_), DataType::Int(_))
-        | (datum @ Datum::Long(_), DataType::BigInt(_))
-        | (datum @ Datum::Float(_), DataType::Float(_))
-        | (datum @ Datum::Double(_), DataType::Double(_))
-        | (datum @ Datum::String(_), DataType::VarChar(_))
-        | (datum @ Datum::String(_), DataType::Char(_))
-        | (datum @ Datum::Bytes(_), DataType::Binary(_))
-        | (datum @ Datum::Bytes(_), DataType::VarBinary(_))
-        | (datum @ Datum::Date(_), DataType::Date(_))
-        | (datum @ Datum::Time(_), DataType::Time(_))
-        | (datum @ Datum::Timestamp { .. }, DataType::Timestamp(_))
-        | (datum @ Datum::LocalZonedTimestamp { .. }, 
DataType::LocalZonedTimestamp(_))
-        | (datum @ Datum::Decimal { .. }, DataType::Decimal(_)) => Some(datum),
-        (Datum::TinyInt(value), DataType::SmallInt(_)) => 
Some(Datum::SmallInt(value as i16)),
-        (Datum::TinyInt(value), DataType::Int(_)) => Some(Datum::Int(value as 
i32)),
-        (Datum::TinyInt(value), DataType::BigInt(_)) => Some(Datum::Long(value 
as i64)),
-        (Datum::SmallInt(value), DataType::Int(_)) => Some(Datum::Int(value as 
i32)),
-        (Datum::SmallInt(value), DataType::BigInt(_)) => 
Some(Datum::Long(value as i64)),
-        (Datum::Int(value), DataType::BigInt(_)) => Some(Datum::Long(value as 
i64)),
-        (Datum::Float(value), DataType::Double(_)) => Some(Datum::Double(value 
as f64)),
-        _ => None,
-    }
+    predicates_may_match_with_schema(
+        predicates,
+        &stats,
+        &resolved.field_mapping,
+        &resolved.file_fields,
+    )
 }
 
 fn extract_stats_datum(row: &BinaryRow, index: usize, data_type: &DataType) -> 
Option<Datum> {
@@ -463,16 +294,18 @@ pub(super) fn data_evolution_group_matches_predicates(
     let mut sorted_files: Vec<&DataFileMeta> = group.iter().collect();
     sorted_files.sort_by(|a, b| 
b.max_sequence_number.cmp(&a.max_sequence_number));
 
-    // For each table field, find which file (index in sorted_files) provides 
it,
-    // and the field's offset within that file's stats.
+    // For each table field, find which file (index in sorted_files) provides 
it.
+    // The field index remains a table-field index so FileStatsRows can resolve
+    // it through its own schema-to-stats mapping.
     let field_sources: Vec<Option<(usize, usize)>> = table_fields
         .iter()
-        .map(|field| {
+        .enumerate()
+        .map(|(field_idx, field)| {
             for (file_idx, file) in sorted_files.iter().enumerate() {
                 let file_columns = file_stats_columns(file, table_fields);
-                for (stats_idx, col_name) in file_columns.iter().enumerate() {
+                for col_name in &file_columns {
                     if *col_name == field.name() {
-                        return Some((file_idx, stats_idx));
+                        return Some((file_idx, field_idx));
                     }
                 }
             }
@@ -544,13 +377,20 @@ fn data_evolution_predicate_may_match(
             let Some(source) = field_sources.get(*index).copied().flatten() 
else {
                 return missing_field_may_match(*op, row_count);
             };
-            let (file_idx, stats_idx) = source;
+            let (file_idx, field_index) = source;
             let stats = &file_stats[file_idx];
             let stats_data_type = table_fields
                 .get(*index)
                 .map(|f| f.data_type())
                 .unwrap_or(data_type);
-            data_leaf_may_match(stats_idx, stats_data_type, data_type, *op, 
literals, stats)
+            data_leaf_may_match(
+                field_index,
+                stats_data_type,
+                data_type,
+                *op,
+                literals,
+                stats,
+            )
         }
     }
 }
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 9f2ebf9..da2dc24 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -20,16 +20,15 @@
 //! Reference: 
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py)
 //! and 
[FullStartingScanner](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/scanner/full_starting_scanner.py).
 
-use super::bucket_filter::{
-    compute_target_buckets, extract_predicate_for_keys, 
split_partition_and_data_predicates,
-};
+use super::bucket_filter::compute_target_buckets;
 use super::stats_filter::{
     data_evolution_group_matches_predicates, data_file_matches_predicates,
-    data_file_matches_predicates_for_table, data_leaf_may_match, 
group_by_overlapping_row_id,
-    FileStatsRows, ResolvedStatsSchema,
+    data_file_matches_predicates_for_table, group_by_overlapping_row_id, 
FileStatsRows,
+    ResolvedStatsSchema,
 };
 use super::Table;
 use crate::io::FileIO;
+use crate::predicate_stats::data_leaf_may_match;
 use crate::spec::{
     eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind, 
IndexManifest,
     ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
@@ -299,17 +298,27 @@ fn partition_matches_predicate(
 #[derive(Debug, Clone)]
 pub struct TableScan<'a> {
     table: &'a Table,
-    filter: Option<Predicate>,
+    partition_predicate: Option<Predicate>,
+    data_predicates: Vec<Predicate>,
+    bucket_predicate: Option<Predicate>,
     /// Optional limit on the number of rows to return.
     /// When set, the scan will try to return only enough splits to satisfy 
the limit.
     limit: Option<usize>,
 }
 
 impl<'a> TableScan<'a> {
-    pub fn new(table: &'a Table, filter: Option<Predicate>, limit: 
Option<usize>) -> Self {
+    pub fn new(
+        table: &'a Table,
+        partition_predicate: Option<Predicate>,
+        data_predicates: Vec<Predicate>,
+        bucket_predicate: Option<Predicate>,
+        limit: Option<usize>,
+    ) -> Self {
         Self {
             table,
-            filter,
+            partition_predicate,
+            data_predicates,
+            bucket_predicate,
             limit,
         }
     }
@@ -416,24 +425,12 @@ impl<'a> TableScan<'a> {
         let target_split_size = core_options.source_split_target_size();
         let open_file_cost = core_options.source_split_open_file_cost();
 
-        // Compute predicates before reading manifests so they can be pushed 
down.
-        let partition_keys = self.table.schema().partition_keys();
-        let (partition_predicate, data_predicates) = if let Some(filter) = 
self.filter.clone() {
-            if partition_keys.is_empty() {
-                (None, filter.split_and())
-            } else {
-                split_partition_and_data_predicates(
-                    filter,
-                    self.table.schema().fields(),
-                    partition_keys,
-                )
-            }
-        } else {
-            (None, Vec::new())
-        };
-
         // Resolve partition fields for manifest-file-level stats pruning.
-        let partition_fields: Vec<DataField> = partition_keys
+        let partition_keys = self.table.schema().partition_keys();
+        let partition_fields: Vec<DataField> = self
+            .table
+            .schema()
+            .partition_keys()
             .iter()
             .filter_map(|key| {
                 self.table
@@ -449,17 +446,17 @@ impl<'a> TableScan<'a> {
         let pushdown_data_predicates = if data_evolution_enabled {
             &[][..]
         } else {
-            &data_predicates
+            self.data_predicates.as_slice()
         };
 
         let has_primary_keys = !self.table.schema().primary_keys().is_empty();
 
         // Compute bucket predicate and key fields for per-entry bucket 
pruning.
         // Only supported for the default bucket function (MurmurHash3-based).
-        let (bucket_predicate, bucket_key_fields): (Option<Predicate>, 
Vec<DataField>) =
-            if !core_options.is_default_bucket_function() {
-                (None, Vec::new())
-            } else if let Some(filter) = &self.filter {
+        let bucket_key_fields: Vec<DataField> =
+            if self.bucket_predicate.is_none() || 
!core_options.is_default_bucket_function() {
+                Vec::new()
+            } else {
                 let bucket_keys = core_options.bucket_key().unwrap_or_else(|| {
                     if has_primary_keys {
                         self.table
@@ -472,33 +469,17 @@ impl<'a> TableScan<'a> {
                         Vec::new()
                     }
                 });
-                if bucket_keys.is_empty() {
-                    (None, Vec::new())
-                } else {
-                    let fields: Vec<DataField> = bucket_keys
-                        .iter()
-                        .filter_map(|key| {
-                            self.table
-                                .schema()
-                                .fields()
-                                .iter()
-                                .find(|f| f.name() == key)
-                                .cloned()
-                        })
-                        .collect();
-                    if fields.len() == bucket_keys.len() {
-                        let pred = extract_predicate_for_keys(
-                            filter,
-                            self.table.schema().fields(),
-                            &bucket_keys,
-                        );
-                        (pred, fields)
-                    } else {
-                        (None, Vec::new())
-                    }
-                }
-            } else {
-                (None, Vec::new())
+                bucket_keys
+                    .iter()
+                    .filter_map(|key| {
+                        self.table
+                            .schema()
+                            .fields()
+                            .iter()
+                            .find(|f| f.name() == key)
+                            .cloned()
+                    })
+                    .collect::<Vec<_>>()
             };
 
         let entries = read_all_manifest_entries(
@@ -507,12 +488,12 @@ impl<'a> TableScan<'a> {
             &snapshot,
             deletion_vectors_enabled,
             has_primary_keys,
-            partition_predicate.as_ref(),
+            self.partition_predicate.as_ref(),
             &partition_fields,
             pushdown_data_predicates,
             self.table.schema().id(),
             self.table.schema().fields(),
-            bucket_predicate.as_ref(),
+            self.bucket_predicate.as_ref(),
             &bucket_key_fields,
         )
         .await?;
@@ -523,7 +504,7 @@ impl<'a> TableScan<'a> {
 
         // For non-data-evolution tables, cross-schema files were kept 
(fail-open)
         // by the pushdown. Apply the full schema-aware filter for those files.
-        let entries = if data_predicates.is_empty() || data_evolution_enabled {
+        let entries = if self.data_predicates.is_empty() || 
data_evolution_enabled {
             entries
         } else {
             let current_schema_id = self.table.schema().id();
@@ -541,7 +522,7 @@ impl<'a> TableScan<'a> {
                         || data_file_matches_predicates_for_table(
                             self.table,
                             entry.file(),
-                            &data_predicates,
+                            &self.data_predicates,
                             &mut schema_cache,
                         )
                         .await
@@ -625,7 +606,7 @@ impl<'a> TableScan<'a> {
                 let row_id_groups = group_by_overlapping_row_id(data_files);
 
                 // Filter groups by merged stats before splitting.
-                let row_id_groups: Vec<Vec<DataFileMeta>> = if 
data_predicates.is_empty() {
+                let row_id_groups: Vec<Vec<DataFileMeta>> = if 
self.data_predicates.is_empty() {
                     row_id_groups
                 } else {
                     row_id_groups
@@ -633,7 +614,7 @@ impl<'a> TableScan<'a> {
                         .filter(|group| {
                             data_evolution_group_matches_predicates(
                                 group,
-                                &data_predicates,
+                                &self.data_predicates,
                                 self.table.schema().fields(),
                             )
                         })
@@ -688,7 +669,7 @@ impl<'a> TableScan<'a> {
         // Apply limit pushdown only when there are no data predicates.
         // With data predicates, merged_row_count() reflects pre-filter row 
counts,
         // so stopping early could return fewer rows than the limit after 
filtering.
-        let splits = if data_predicates.is_empty() {
+        let splits = if self.data_predicates.is_empty() {
             self.apply_limit_pushdown(splits)
         } else {
             splits
diff --git a/crates/test_utils.rs b/crates/test_utils.rs
new file mode 100644
index 0000000..48f6fa2
--- /dev/null
+++ b/crates/test_utils.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+use std::path::Path;
+use std::sync::Arc;
+
+use arrow_array::{Array, Int32Array, RecordBatch};
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as 
ArrowSchema};
+use chrono::Utc;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use serde::de::DeserializeOwned;
+
+pub(crate) fn write_int_parquet_file(
+    path: &Path,
+    columns: Vec<(&str, Vec<i32>)>,
+    max_row_group_size: Option<usize>,
+) {
+    let schema = Arc::new(ArrowSchema::new(
+        columns
+            .iter()
+            .map(|(name, _)| ArrowField::new(*name, ArrowDataType::Int32, 
false))
+            .collect::<Vec<_>>(),
+    ));
+    let arrays: Vec<Arc<dyn Array>> = columns
+        .iter()
+        .map(|(_, values)| Arc::new(Int32Array::from(values.clone())) as 
Arc<dyn Array>)
+        .collect();
+    let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+
+    let props = max_row_group_size.map(|size| {
+        WriterProperties::builder()
+            .set_max_row_group_size(size)
+            .build()
+    });
+    let file = File::create(path).unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, props).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+}
+
/// Converts a filesystem path into a `file:` URI-style string.
///
/// Backslashes are normalized to forward slashes first (so Windows-style
/// paths come out slash-separated); exactly one slash then separates the
/// `file:` scheme from the path, whether or not the input was absolute.
pub(crate) fn local_file_path(path: &Path) -> String {
    let slashed = path.to_string_lossy().replace('\\', "/");
    // `strip_prefix` removes at most one leading slash, which we re-add
    // uniformly; a relative path simply gains the separating slash.
    match slashed.strip_prefix('/') {
        Some(rest) => format!("file:/{rest}"),
        None => format!("file:/{slashed}"),
    }
}
+
+/// Builds a data-file metadata value for tests by deserializing a minimal
+/// JSON manifest entry carrying only the given file name and row count.
+/// Generic over any `DeserializeOwned` target — presumably deserialized
+/// into `DataFileMeta` by callers (the keys match its serde field names);
+/// confirm against call sites.
+///
+/// Panics (via `unwrap`) if the JSON does not match `T`'s schema.
+pub(crate) fn test_data_file<T>(file_name: &str, row_count: i64) -> T
+where
+    T: DeserializeOwned,
+{
+    serde_json::from_value(serde_json::json!({
+        "_FILE_NAME": file_name,
+        "_FILE_SIZE": 0,
+        "_ROW_COUNT": row_count,
+        // Empty key bounds and stats: tests exercising stats-based pruning
+        // must populate these themselves.
+        "_MIN_KEY": [],
+        "_MAX_KEY": [],
+        "_KEY_STATS": {
+            "_MIN_VALUES": [],
+            "_MAX_VALUES": [],
+            "_NULL_COUNTS": []
+        },
+        "_VALUE_STATS": {
+            "_MIN_VALUES": [],
+            "_MAX_VALUES": [],
+            "_NULL_COUNTS": []
+        },
+        "_MIN_SEQUENCE_NUMBER": 0,
+        "_MAX_SEQUENCE_NUMBER": 0,
+        "_SCHEMA_ID": 0,
+        "_LEVEL": 1,
+        "_EXTRA_FILES": [],
+        // Creation time is "now" so each call yields a distinct timestamp.
+        "_CREATION_TIME": Utc::now().timestamp_millis(),
+        // Optional fields are left null/absent for the simplest valid file.
+        "_DELETE_ROW_COUNT": null,
+        "_EMBEDDED_FILE_INDEX": null,
+        "_FILE_SOURCE": null,
+        "_VALUE_STATS_COLS": null,
+        "_FIRST_ROW_ID": null,
+        "_WRITE_COLS": null,
+        "_EXTERNAL_PATH": null
+    }))
+    .unwrap()
+}

Reply via email to