This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0cb1a83 feat(table): support partition predicate pruning in TableScan
(#167)
0cb1a83 is described below
commit 0cb1a8390131f95ded9519bdc7083f1432d4fed1
Author: Zach <[email protected]>
AuthorDate: Wed Apr 1 22:57:20 2026 +0800
feat(table): support partition predicate pruning in TableScan (#167)
---
crates/integration_tests/tests/read_tables.rs | 309 +++++++++-
crates/paimon/src/spec/mod.rs | 1 +
crates/paimon/src/spec/predicate.rs | 822 +++++++++++++++++++++++---
crates/paimon/src/table/read_builder.rs | 23 +-
crates/paimon/src/table/table_scan.rs | 162 ++++-
5 files changed, 1228 insertions(+), 89 deletions(-)
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 1b0032a..ec96653 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -22,7 +22,7 @@ use futures::TryStreamExt;
use paimon::api::ConfigResponse;
use paimon::catalog::{Identifier, RESTCatalog};
use paimon::common::Options;
-use paimon::spec::{DataType, IntType, Schema, VarCharType};
+use paimon::spec::{DataType, IntType, Predicate, Schema, VarCharType};
use paimon::{Catalog, Error, FileSystemCatalog, Plan};
use std::collections::{HashMap, HashSet};
@@ -88,6 +88,27 @@ async fn scan_and_read_with_fs_catalog(
scan_and_read(&catalog, table_name, projection).await
}
+async fn scan_and_read_with_filter(
+ table: &paimon::Table,
+ filter: Predicate,
+) -> (Plan, Vec<RecordBatch>) {
+ let mut read_builder = table.new_read_builder();
+ read_builder.with_filter(filter);
+ let scan = read_builder.new_scan();
+ let plan = scan.plan().await.expect("Failed to plan scan");
+
+ let read = read_builder.new_read().expect("Failed to create read");
+ let stream = read
+ .to_arrow(plan.splits())
+ .expect("Failed to create arrow stream");
+ let batches: Vec<_> = stream
+ .try_collect()
+ .await
+ .expect("Failed to collect batches");
+
+ (plan, batches)
+}
+
fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
let mut rows = Vec::new();
for batch in batches {
@@ -107,6 +128,55 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32,
String)> {
rows
}
+fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> {
+ let mut rows = Vec::new();
+ for batch in batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let dt = batch
+ .column_by_name("dt")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("dt");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), name.value(i).into(), dt.value(i).into()));
+ }
+ }
+ rows.sort_by_key(|(id, _, _)| *id);
+ rows
+}
+
+fn extract_plan_partitions(plan: &Plan) -> HashSet<String> {
+ plan.splits()
+ .iter()
+ .map(|split| {
+ split
+ .partition()
+ .get_string(0)
+ .expect("Failed to decode dt")
+ .to_string()
+ })
+ .collect()
+}
+
+fn extract_plan_multi_partitions(plan: &Plan) -> HashSet<(String, i32)> {
+ plan.splits()
+ .iter()
+ .map(|split| {
+ let partition = split.partition();
+ (
+ partition.get_string(0).expect("dt").to_string(),
+ partition.get_int(1).expect("hr"),
+ )
+ })
+ .collect()
+}
+
#[tokio::test]
async fn test_read_log_table() {
let (plan, batches) = scan_and_read_with_fs_catalog("simple_log_table",
None).await;
@@ -397,6 +467,7 @@ async fn test_read_projection_empty() {
);
}
}
+
#[tokio::test]
async fn test_read_projection_unknown_column() {
let catalog = create_file_system_catalog();
@@ -460,6 +531,242 @@ async fn test_read_projection_duplicate_column() {
);
}
+// ---------------------------------------------------------------------------
+// Partition filter integration tests
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn test_read_partitioned_table_with_filter() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ // Build a filter: dt = '2024-01-01'
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+ let filter = pb
+ .equal("dt", Datum::String("2024-01-01".into()))
+ .expect("Failed to build predicate");
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let seen_partitions = extract_plan_partitions(&plan);
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-01".into()]),
+ "Only the filtered partition should be in the plan"
+ );
+
+ let rows = extract_id_name_dt(&batches);
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice".into(), "2024-01-01".into()),
+ (2, "bob".into(), "2024-01-01".into()),
+ ]
+ );
+}
+
+#[tokio::test]
+async fn test_read_multi_partitioned_table_with_filter() {
+ use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"multi_partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ // Filter: dt = '2024-01-01' AND hr = 10
+ let filter = Predicate::and(vec![
+ pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+ pb.equal("hr", Datum::Int(10)).unwrap(),
+ ]);
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let partitions = extract_plan_multi_partitions(&plan);
+ assert_eq!(
+ partitions,
+ HashSet::from([("2024-01-01".into(), 10)]),
+ "Only dt=2024-01-01, hr=10 should survive"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![(1, "alice".to_string()), (2, "bob".to_string()),],
+ "Only rows from dt=2024-01-01, hr=10 should be returned"
+ );
+}
+
+#[tokio::test]
+async fn
test_read_partitioned_table_data_only_filter_preserves_all_partitions() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ // Data-only filter: id > 10 — should NOT prune any partitions,
+ // and is still ignored at read level in Phase 2.
+ let filter = pb
+ .greater_than("id", Datum::Int(10))
+ .expect("Failed to build predicate");
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let seen_partitions = extract_plan_partitions(&plan);
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
+ "Data-only filter should not prune any partitions"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![
+ (1, "alice".to_string()),
+ (2, "bob".to_string()),
+ (3, "carol".to_string()),
+ ],
+ "Data predicate is not applied at read level; all rows are still
returned"
+ );
+}
+
+/// Mixed AND: partition predicate prunes partitions, but data predicate is
+/// silently ignored — all rows from the matching partition are returned.
+#[tokio::test]
+async fn test_read_partitioned_table_mixed_and_filter() {
+ use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ // dt = '2024-01-01' AND id > 10
+ // Partition conjunct (dt) is applied; data conjunct (id) is NOT.
+ let filter = Predicate::and(vec![
+ pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+ pb.greater_than("id", Datum::Int(10)).unwrap(),
+ ]);
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let seen_partitions = extract_plan_partitions(&plan);
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-01".into()]),
+ "Only dt=2024-01-01 should survive"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![(1, "alice".to_string()), (2, "bob".to_string())],
+ "Data predicate (id > 10) is NOT applied — all rows from matching
partition returned"
+ );
+}
+
+/// Mixed OR: `dt = '...' OR id > 10` cannot be split into a pure partition
+/// predicate, so no partitions should be pruned.
+#[tokio::test]
+async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
+ use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ // dt = '2024-01-01' OR id > 10 — mixed OR is not safely splittable.
+ let filter = Predicate::or(vec![
+ pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+ pb.greater_than("id", Datum::Int(10)).unwrap(),
+ ]);
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let seen_partitions = extract_plan_partitions(&plan);
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
+ "Mixed OR should not prune any partitions"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![
+ (1, "alice".to_string()),
+ (2, "bob".to_string()),
+ (3, "carol".to_string()),
+ ],
+ "All rows should be returned when pruning is not possible"
+ );
+}
+
+/// Filter that matches no existing partition — all entries pruned, 0 splits.
+#[tokio::test]
+async fn test_read_partitioned_table_filter_matches_no_partition() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ // dt = '9999-12-31' matches no partition.
+ let filter = pb
+ .equal("dt", Datum::String("9999-12-31".into()))
+ .expect("Failed to build predicate");
+
+ let mut read_builder = table.new_read_builder();
+ read_builder.with_filter(filter);
+ let scan = read_builder.new_scan();
+ let plan = scan.plan().await.expect("Failed to plan scan");
+
+ assert!(
+ plan.splits().is_empty(),
+ "No splits should survive when filter matches no partition"
+ );
+}
+
+#[tokio::test]
+async fn test_read_partitioned_table_eval_row_error_fails_plan() {
+ use paimon::spec::{ArrayType, DataType, Datum, IntType, PredicateOperator};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ let dt_index = table
+ .schema()
+ .fields()
+ .iter()
+ .position(|f| f.name() == "dt")
+ .expect("dt partition column should exist");
+
+ // Use an unsupported DataType in a partition leaf so remapping succeeds
+ // but `eval_row` fails during partition pruning.
+ let filter = Predicate::Leaf {
+ column: "dt".into(),
+ index: dt_index,
+ data_type:
DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
+ op: PredicateOperator::Eq,
+ literals: vec![Datum::Int(42)],
+ };
+
+ let mut read_builder = table.new_read_builder();
+ read_builder.with_filter(filter);
+
+ let err = read_builder
+ .new_scan()
+ .plan()
+ .await
+ .expect_err("eval_row error should fail-fast during planning");
+
+ assert!(
+ matches!(&err, Error::Unsupported { message } if
message.contains("extract_datum")),
+ "Expected extract_datum unsupported error, got: {err:?}"
+ );
+}
+
// ======================= REST Catalog read tests
===============================
/// Build a simple test schema matching the Spark-provisioned tables (id INT,
name VARCHAR).
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 3e82002..970fb5e 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -56,6 +56,7 @@ pub use types::*;
mod partition_utils;
pub(crate) use partition_utils::PartitionComputer;
mod predicate;
+pub(crate) use predicate::eval_row;
pub use predicate::{
field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder,
PredicateOperator,
};
diff --git a/crates/paimon/src/spec/predicate.rs
b/crates/paimon/src/spec/predicate.rs
index 67954a1..4c89820 100644
--- a/crates/paimon/src/spec/predicate.rs
+++ b/crates/paimon/src/spec/predicate.rs
@@ -24,8 +24,10 @@
//! - Java `PredicateBuilder` / `LeafPredicate` / `CompoundPredicate`
use crate::error::*;
+use crate::spec::data_file::BinaryRow;
use crate::spec::types::DataType;
use crate::spec::DataField;
+use std::cmp::Ordering;
use std::fmt;
// ---------------------------------------------------------------------------
@@ -43,6 +45,7 @@ use std::fmt;
/// rather than `BigDecimal.equals` which is scale-sensitive),
/// e.g. `Decimal(10, scale=1)` == `Decimal(100, scale=2)`
/// because both represent `1.0`.
+///
#[derive(Debug, Clone)]
pub enum Datum {
Bool(bool),
@@ -100,55 +103,66 @@ impl fmt::Display for Datum {
impl PartialEq for Datum {
fn eq(&self, other: &Self) -> bool {
- match (self, other) {
- (Self::Bool(a), Self::Bool(b)) => a == b,
- (Self::TinyInt(a), Self::TinyInt(b)) => a == b,
- (Self::SmallInt(a), Self::SmallInt(b)) => a == b,
- (Self::Int(a), Self::Int(b)) => a == b,
- (Self::Long(a), Self::Long(b)) => a == b,
- (Self::Float(a), Self::Float(b)) => a == b,
- (Self::Double(a), Self::Double(b)) => a == b,
- (Self::String(a), Self::String(b)) => a == b,
- (Self::Date(a), Self::Date(b)) => a == b,
- (Self::Time(a), Self::Time(b)) => a == b,
- (
- Self::Timestamp {
- millis: ma,
- nanos: na,
- },
- Self::Timestamp {
- millis: mb,
- nanos: nb,
- },
- ) => ma == mb && na == nb,
- (
- Self::LocalZonedTimestamp {
- millis: ma,
- nanos: na,
- },
- Self::LocalZonedTimestamp {
- millis: mb,
- nanos: nb,
- },
- ) => ma == mb && na == nb,
- // Decimal: mathematical equivalence — normalize to common scale
- // before comparing. Matches Java Paimon's Decimal which uses
- // compareTo() == 0 (not BigDecimal.equals which is
scale-sensitive).
- (
- Self::Decimal {
- unscaled: ua,
- scale: sa,
- ..
- },
- Self::Decimal {
- unscaled: ub,
- scale: sb,
- ..
- },
- ) => decimal_eq(*ua, *sa, *ub, *sb),
- (Self::Bytes(a), Self::Bytes(b)) => a == b,
- _ => false,
- }
+ datum_eq(self, other)
+ }
+}
+
+impl PartialOrd for Datum {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ datum_cmp(self, other)
+ }
+}
+
+fn datum_eq(lhs: &Datum, rhs: &Datum) -> bool {
+ datum_cmp(lhs, rhs) == Some(Ordering::Equal)
+}
+
+fn datum_cmp(lhs: &Datum, rhs: &Datum) -> Option<Ordering> {
+ match (lhs, rhs) {
+ (Datum::Bool(a), Datum::Bool(b)) => a.partial_cmp(b),
+ (Datum::TinyInt(a), Datum::TinyInt(b)) => a.partial_cmp(b),
+ (Datum::SmallInt(a), Datum::SmallInt(b)) => a.partial_cmp(b),
+ (Datum::Int(a), Datum::Int(b)) => a.partial_cmp(b),
+ (Datum::Long(a), Datum::Long(b)) => a.partial_cmp(b),
+ (Datum::Float(a), Datum::Float(b)) => a.partial_cmp(b),
+ (Datum::Double(a), Datum::Double(b)) => a.partial_cmp(b),
+ (Datum::String(a), Datum::String(b)) => a.partial_cmp(b),
+ (Datum::Date(a), Datum::Date(b)) => a.partial_cmp(b),
+ (Datum::Time(a), Datum::Time(b)) => a.partial_cmp(b),
+ (
+ Datum::Timestamp {
+ millis: ma,
+ nanos: na,
+ },
+ Datum::Timestamp {
+ millis: mb,
+ nanos: nb,
+ },
+ ) => (ma, na).partial_cmp(&(mb, nb)),
+ (
+ Datum::LocalZonedTimestamp {
+ millis: ma,
+ nanos: na,
+ },
+ Datum::LocalZonedTimestamp {
+ millis: mb,
+ nanos: nb,
+ },
+ ) => (ma, na).partial_cmp(&(mb, nb)),
+ (
+ Datum::Decimal {
+ unscaled: ua,
+ scale: sa,
+ ..
+ },
+ Datum::Decimal {
+ unscaled: ub,
+ scale: sb,
+ ..
+ },
+ ) => decimal_cmp(*ua, *sa, *ub, *sb),
+ (Datum::Bytes(a), Datum::Bytes(b)) => Some(java_bytes_cmp(a, b)),
+ _ => None,
}
}
@@ -156,23 +170,28 @@ impl PartialEq for Datum {
///
/// Normalizes both to the larger scale, then compares unscaled values.
/// E.g. `(10, scale=1)` vs `(100, scale=2)` → both represent 1.0 → equal.
-fn decimal_eq(ua: i128, sa: u32, ub: i128, sb: u32) -> bool {
+fn decimal_cmp(ua: i128, sa: u32, ub: i128, sb: u32) -> Option<Ordering> {
if sa == sb {
- return ua == ub;
+ return ua.partial_cmp(&ub);
}
- // Scale up the side with the smaller scale.
let (na, nb) = if sa < sb {
- match ua.checked_mul(pow10_i128(sb - sa)) {
- Some(scaled) => (scaled, ub),
- None => return false,
- }
+ (ua.checked_mul(pow10_i128(sb - sa))?, ub)
} else {
- match ub.checked_mul(pow10_i128(sa - sb)) {
- Some(scaled) => (ua, scaled),
- None => return false,
- }
+ (ua, ub.checked_mul(pow10_i128(sa - sb))?)
};
- na == nb
+ na.partial_cmp(&nb)
+}
+
+/// Match Java `CompareUtils.compare(byte[], byte[])`, which compares signed
+/// bytes lexicographically.
+fn java_bytes_cmp(a: &[u8], b: &[u8]) -> Ordering {
+ for (&lhs, &rhs) in a.iter().zip(b.iter()) {
+ let cmp = (lhs as i8).cmp(&(rhs as i8));
+ if cmp != Ordering::Equal {
+ return cmp;
+ }
+ }
+ a.len().cmp(&b.len())
}
/// 10^exp as i128. Returns i128::MAX for exponents that would overflow.
@@ -262,23 +281,16 @@ pub enum Predicate {
}
impl Predicate {
- /// Combine predicates with AND, with flattening and constant absorption.
+ /// Combine predicates with AND, with recursive flattening and constant
absorption.
///
/// - `AND(p, AlwaysTrue)` → `p` (identity element filtered out)
/// - `AND(p, AlwaysFalse)` → `AlwaysFalse` (annihilator short-circuits)
- /// - Nested `And` nodes are flattened
+ /// - Nested `And` nodes are recursively flattened
/// - Empty input → `AlwaysTrue`
/// - Single element → unwrapped
pub fn and(predicates: Vec<Predicate>) -> Predicate {
let mut flat = Vec::with_capacity(predicates.len());
- for p in predicates {
- match p {
- Predicate::AlwaysTrue => {}
- Predicate::AlwaysFalse => return Predicate::AlwaysFalse,
- Predicate::And(children) => flat.extend(children),
- other => flat.push(other),
- }
- }
+ Self::flatten_and(predicates, &mut flat);
match flat.len() {
0 => Predicate::AlwaysTrue,
1 => flat.into_iter().next().unwrap(),
@@ -286,23 +298,36 @@ impl Predicate {
}
}
- /// Combine predicates with OR, with flattening and constant absorption.
+ /// Recursively collect non-And children, absorbing constants.
+ fn flatten_and(predicates: Vec<Predicate>, out: &mut Vec<Predicate>) {
+ for p in predicates {
+ match p {
+ Predicate::AlwaysTrue => {}
+ Predicate::AlwaysFalse => {
+ out.clear();
+ out.push(Predicate::AlwaysFalse);
+ return;
+ }
+ Predicate::And(children) => Self::flatten_and(children, out),
+ other => out.push(other),
+ }
+ // Check if a nested flatten hit AlwaysFalse
+ if out.first() == Some(&Predicate::AlwaysFalse) {
+ return;
+ }
+ }
+ }
+
+ /// Combine predicates with OR, with recursive flattening and constant
absorption.
///
/// - `OR(p, AlwaysFalse)` → `p` (identity element filtered out)
/// - `OR(p, AlwaysTrue)` → `AlwaysTrue` (annihilator short-circuits)
- /// - Nested `Or` nodes are flattened
+ /// - Nested `Or` nodes are recursively flattened
/// - Empty input → `AlwaysFalse`
/// - Single element → unwrapped
pub fn or(predicates: Vec<Predicate>) -> Predicate {
let mut flat = Vec::with_capacity(predicates.len());
- for p in predicates {
- match p {
- Predicate::AlwaysFalse => {}
- Predicate::AlwaysTrue => return Predicate::AlwaysTrue,
- Predicate::Or(children) => flat.extend(children),
- other => flat.push(other),
- }
- }
+ Self::flatten_or(predicates, &mut flat);
match flat.len() {
0 => Predicate::AlwaysFalse,
1 => flat.into_iter().next().unwrap(),
@@ -310,6 +335,25 @@ impl Predicate {
}
}
+ /// Recursively collect non-Or children, absorbing constants.
+ fn flatten_or(predicates: Vec<Predicate>, out: &mut Vec<Predicate>) {
+ for p in predicates {
+ match p {
+ Predicate::AlwaysFalse => {}
+ Predicate::AlwaysTrue => {
+ out.clear();
+ out.push(Predicate::AlwaysTrue);
+ return;
+ }
+ Predicate::Or(children) => Self::flatten_or(children, out),
+ other => out.push(other),
+ }
+ if out.first() == Some(&Predicate::AlwaysTrue) {
+ return;
+ }
+ }
+ }
+
/// Negate a predicate with simplification.
///
/// - `NOT(NOT(p))` → `p` (double negation elimination)
@@ -323,6 +367,70 @@ impl Predicate {
other => Predicate::Not(Box::new(other)),
}
}
+
+ /// Split a predicate at AND boundaries into conjuncts (recursive).
+ ///
+ /// Unlike a simple one-level unwrap, this recursively flattens nested
+ /// `And` nodes — necessary because `Predicate` is a public enum and
+ /// callers may construct `And(vec![And(...), ...])` directly without
+ /// going through `Predicate::and()` which auto-flattens.
+ ///
+ /// Reference: Java `PredicateBuilder.splitAnd` which recursively
+ /// splits `CompoundPredicate(And, children)`.
+ pub(crate) fn split_and(self) -> Vec<Predicate> {
+ match self {
+ Predicate::And(children) => children.into_iter().flat_map(|c|
c.split_and()).collect(),
+ other => vec![other],
+ }
+ }
+
+ /// Remap leaf field indices from table schema space to partition row
space.
+ ///
+ /// Returns `Some(remapped)` if *all* leaf nodes in this subtree reference
+ /// partition columns; `None` otherwise. This guarantees safety under
NOT/OR:
+ /// a mixed predicate is never partially remapped.
+ ///
+ /// `mapping` is the output of `field_idx_to_partition_idx`.
+ pub(crate) fn remap_field_index(self, mapping: &[Option<usize>]) ->
Option<Predicate> {
+ match self {
+ Predicate::Leaf {
+ column,
+ index,
+ data_type,
+ op,
+ literals,
+ } => {
+ let new_index = (*mapping.get(index)?)?;
+ Some(Predicate::Leaf {
+ column,
+ index: new_index,
+ data_type,
+ op,
+ literals,
+ })
+ }
+ Predicate::And(children) => {
+ let remapped: Option<Vec<_>> = children
+ .into_iter()
+ .map(|c| c.remap_field_index(mapping))
+ .collect();
+ Some(Predicate::and(remapped?))
+ }
+ Predicate::Or(children) => {
+ let remapped: Option<Vec<_>> = children
+ .into_iter()
+ .map(|c| c.remap_field_index(mapping))
+ .collect();
+ Some(Predicate::or(remapped?))
+ }
+ Predicate::Not(inner) => {
+ let remapped = inner.remap_field_index(mapping)?;
+ Some(Predicate::negate(remapped))
+ }
+ Predicate::AlwaysTrue => Some(Predicate::AlwaysTrue),
+ Predicate::AlwaysFalse => Some(Predicate::AlwaysFalse),
+ }
+ }
}
impl fmt::Display for Predicate {
@@ -591,6 +699,145 @@ pub fn field_idx_to_partition_idx(
.collect()
}
+// ---------------------------------------------------------------------------
+// extract_datum
+// ---------------------------------------------------------------------------
+
+/// Extract a typed `Datum` from a `BinaryRow` field based on `DataType`.
+///
+/// Returns `Ok(None)` if the field is null, `Ok(Some(datum))` on success,
+/// or `Err` if the binary data is malformed.
+pub(crate) fn extract_datum(
+ row: &BinaryRow,
+ pos: usize,
+ data_type: &DataType,
+) -> Result<Option<Datum>> {
+ if row.is_null_at(pos) {
+ return Ok(None);
+ }
+ let datum = match data_type {
+ DataType::Boolean(_) => Datum::Bool(row.get_boolean(pos)?),
+ DataType::TinyInt(_) => Datum::TinyInt(row.get_byte(pos)?),
+ DataType::SmallInt(_) => Datum::SmallInt(row.get_short(pos)?),
+ DataType::Int(_) => Datum::Int(row.get_int(pos)?),
+ DataType::BigInt(_) => Datum::Long(row.get_long(pos)?),
+ DataType::Float(_) => Datum::Float(row.get_float(pos)?),
+ DataType::Double(_) => Datum::Double(row.get_double(pos)?),
+ DataType::Char(_) | DataType::VarChar(_) =>
Datum::String(row.get_string(pos)?.to_string()),
+ DataType::Date(_) => Datum::Date(row.get_int(pos)?),
+ DataType::Time(_) => Datum::Time(row.get_int(pos)?),
+ DataType::Timestamp(ts) => {
+ let (millis, nanos) = row.get_timestamp_raw(pos, ts.precision())?;
+ Datum::Timestamp { millis, nanos }
+ }
+ DataType::LocalZonedTimestamp(ts) => {
+ let (millis, nanos) = row.get_timestamp_raw(pos, ts.precision())?;
+ Datum::LocalZonedTimestamp { millis, nanos }
+ }
+ DataType::Decimal(dec) => {
+ let precision = dec.precision();
+ let scale = dec.scale();
+ let unscaled = row.get_decimal_unscaled(pos, precision)?;
+ Datum::Decimal {
+ unscaled,
+ precision,
+ scale,
+ }
+ }
+ DataType::Binary(_) | DataType::VarBinary(_) =>
Datum::Bytes(row.get_binary(pos)?.to_vec()),
+ other => {
+ return Err(Error::Unsupported {
+ message: format!("extract_datum: unsupported DataType
{other:?}"),
+ });
+ }
+ };
+ Ok(Some(datum))
+}
+
+// ---------------------------------------------------------------------------
+// eval_row
+// ---------------------------------------------------------------------------
+
+/// Evaluate a predicate tree against a `BinaryRow`.
+///
+/// Each `Leaf` carries its own `data_type` (preserved through
`remap_field_index`),
+/// so no external type list is needed.
+///
+/// SQL null semantics: null compared to any value yields `false`.
+pub(crate) fn eval_row(predicate: &Predicate, row: &BinaryRow) -> Result<bool>
{
+ match predicate {
+ Predicate::AlwaysTrue => Ok(true),
+ Predicate::AlwaysFalse => Ok(false),
+ Predicate::And(children) => {
+ for child in children {
+ if !eval_row(child, row)? {
+ return Ok(false);
+ }
+ }
+ Ok(true)
+ }
+ Predicate::Or(children) => {
+ for child in children {
+ if eval_row(child, row)? {
+ return Ok(true);
+ }
+ }
+ Ok(false)
+ }
+ Predicate::Not(inner) => Ok(!eval_row(inner, row)?),
+ Predicate::Leaf {
+ index,
+ data_type,
+ op,
+ literals,
+ ..
+ } => {
+ let datum = extract_datum(row, *index, data_type)?;
+ Ok(eval_leaf(*op, datum.as_ref(), literals))
+ }
+ }
+}
+
+/// Evaluate a single leaf predicate.
+///
+/// This function is infallible: all type decoding happens in `extract_datum`
+/// before this point, and the operator match is exhaustive.
+fn eval_leaf(op: PredicateOperator, datum: Option<&Datum>, literals: &[Datum])
-> bool {
+ match op {
+ PredicateOperator::IsNull => datum.is_none(),
+ PredicateOperator::IsNotNull => datum.is_some(),
+ _ => {
+ // SQL null semantics: NULL op value → false
+ let val = match datum {
+ Some(v) => v,
+ None => return false,
+ };
+ match op {
+ PredicateOperator::Eq => literals.first().is_some_and(|lit|
datum_eq(val, lit)),
+ PredicateOperator::NotEq => literals.first().is_some_and(|lit|
!datum_eq(val, lit)),
+ PredicateOperator::Lt => {
+ literals.first().and_then(|lit| datum_cmp(val, lit)) ==
Some(Ordering::Less)
+ }
+ PredicateOperator::LtEq => matches!(
+ literals.first().and_then(|lit| datum_cmp(val, lit)),
+ Some(Ordering::Less | Ordering::Equal)
+ ),
+ PredicateOperator::Gt => {
+ literals.first().and_then(|lit| datum_cmp(val, lit)) ==
Some(Ordering::Greater)
+ }
+ PredicateOperator::GtEq => matches!(
+ literals.first().and_then(|lit| datum_cmp(val, lit)),
+ Some(Ordering::Greater | Ordering::Equal)
+ ),
+ PredicateOperator::In => literals.iter().any(|lit|
datum_eq(val, lit)),
+ PredicateOperator::NotIn => !literals.iter().any(|lit|
datum_eq(val, lit)),
+ // IsNull/IsNotNull are handled in the outer match above.
+ PredicateOperator::IsNull | PredicateOperator::IsNotNull =>
unreachable!(),
+ }
+ }
+ }
+}
+
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
@@ -775,6 +1022,34 @@ mod tests {
}
}
+ #[test]
+ fn test_and_flattens_deep_nesting() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+ let p2 = pb.equal("id", Datum::Int(2)).unwrap();
+ let p3 = pb.equal("id", Datum::Int(3)).unwrap();
+ let p4 = pb.equal("id", Datum::Int(4)).unwrap();
+
+ // Directly construct nested And via enum (bypassing Predicate::and
flatten).
+ let deep = Predicate::And(vec![Predicate::And(vec![
+ Predicate::And(vec![p1.clone(), p2.clone()]),
+ p3.clone(),
+ ])]);
+ // Now flatten through Predicate::and.
+ let flat = Predicate::and(vec![deep, p4.clone()]);
+
+ match &flat {
+ Predicate::And(children) => {
+ assert_eq!(children.len(), 4);
+ assert_eq!(children[0], p1);
+ assert_eq!(children[1], p2);
+ assert_eq!(children[2], p3);
+ assert_eq!(children[3], p4);
+ }
+ other => panic!("expected And with 4 children, got {other:?}"),
+ }
+ }
+
#[test]
fn test_or_empty() {
assert_eq!(Predicate::or(vec![]), Predicate::AlwaysFalse);
@@ -806,6 +1081,29 @@ mod tests {
}
}
+ #[test]
+ fn test_or_flattens_deep_nesting() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+ let p2 = pb.equal("id", Datum::Int(2)).unwrap();
+ let p3 = pb.equal("id", Datum::Int(3)).unwrap();
+ let p4 = pb.equal("id", Datum::Int(4)).unwrap();
+
+ // Directly construct nested Or via enum (bypassing Predicate::or
flatten).
+ let deep = Predicate::Or(vec![Predicate::Or(vec![
+ Predicate::Or(vec![p1.clone(), p2.clone()]),
+ p3.clone(),
+ ])]);
+ let flat = Predicate::or(vec![deep, p4.clone()]);
+
+ match &flat {
+ Predicate::Or(children) => {
+ assert_eq!(children.len(), 4);
+ }
+ other => panic!("expected Or with 4 children, got {other:?}"),
+ }
+ }
+
#[test]
fn test_not() {
let pb = PredicateBuilder::new(&test_fields());
@@ -917,7 +1215,7 @@ mod tests {
assert!(result.is_err());
}
- // ======================== Empty IN handling ========================
+ // ======================== Empty IN / NOT IN handling
========================
#[test]
fn test_in_empty_returns_always_false() {
@@ -1055,4 +1353,364 @@ mod tests {
};
assert_eq!(a, b);
}
+
+ // ======================== PartialOrd ========================
+
+ #[test]
+ fn test_datum_partial_ord_int() {
+ assert!(Datum::Int(1) < Datum::Int(2));
+ assert!(Datum::Int(2) > Datum::Int(1));
+ assert!(Datum::Int(1) <= Datum::Int(1));
+ assert!(Datum::Int(1) >= Datum::Int(1));
+ }
+
+ #[test]
+ fn test_datum_partial_ord_string() {
+ assert!(Datum::String("a".into()) < Datum::String("b".into()));
+ assert!(Datum::String("b".into()) > Datum::String("a".into()));
+ }
+
+ #[test]
+ fn test_datum_partial_ord_decimal_cross_scale() {
+ // 10 / 10^1 = 1.0 < 200 / 10^2 = 2.0
+ let a = Datum::Decimal {
+ unscaled: 10,
+ precision: 10,
+ scale: 1,
+ };
+ let b = Datum::Decimal {
+ unscaled: 200,
+ precision: 10,
+ scale: 2,
+ };
+ assert!(a < b);
+ }
+
+ #[test]
+ fn test_datum_partial_ord_bytes_matches_java_signed_byte_order() {
+ assert!(Datum::Bytes(vec![0xFF]) < Datum::Bytes(vec![0x00]));
+ }
+
+ #[test]
+ fn test_datum_partial_ord_cross_variant_is_none() {
+ assert_eq!(Datum::Int(1).partial_cmp(&Datum::Long(1)), None);
+ }
+
+ // ======================== eval_row ========================
+
+ /// Minimal BinaryRow builder for predicate evaluation tests.
+ struct TestBinaryRowBuilder {
+ arity: i32,
+ null_bits_size: usize,
+ data: Vec<u8>,
+ }
+
+ impl TestBinaryRowBuilder {
+ fn new(arity: i32) -> Self {
+ let null_bits_size = BinaryRow::cal_bit_set_width_in_bytes(arity)
as usize;
+ let fixed_part_size = null_bits_size + (arity as usize) * 8;
+ Self {
+ arity,
+ null_bits_size,
+ data: vec![0u8; fixed_part_size],
+ }
+ }
+
+ fn field_offset(&self, pos: usize) -> usize {
+ self.null_bits_size + pos * 8
+ }
+
+ fn set_null_at(&mut self, pos: usize) {
+ let bit_index = pos + BinaryRow::HEADER_SIZE_IN_BYTES as usize;
+ let byte_index = bit_index / 8;
+ let bit_offset = bit_index % 8;
+ self.data[byte_index] |= 1 << bit_offset;
+ let offset = self.field_offset(pos);
+ self.data[offset..offset + 8].fill(0);
+ }
+
+ fn write_int(&mut self, pos: usize, value: i32) {
+ let offset = self.field_offset(pos);
+ self.data[offset..offset +
4].copy_from_slice(&value.to_le_bytes());
+ }
+
+ fn build(self) -> BinaryRow {
+ BinaryRow::from_bytes(self.arity, self.data)
+ }
+ }
+
+ fn make_leaf(col: &str, idx: usize, op: PredicateOperator, literals:
Vec<Datum>) -> Predicate {
+ Predicate::Leaf {
+ column: col.into(),
+ index: idx,
+ data_type: DataType::Int(IntType::new()),
+ op,
+ literals,
+ }
+ }
+
+ #[test]
+ fn test_eval_leaf_operators() {
+ // row: [x=10]
+ let mut b = TestBinaryRowBuilder::new(1);
+ b.write_int(0, 10);
+ let row = b.build();
+
+ // Eq
+ assert!(eval_row(
+ &make_leaf("x", 0, PredicateOperator::Eq, vec![Datum::Int(10)]),
+ &row
+ )
+ .unwrap());
+ assert!(!eval_row(
+ &make_leaf("x", 0, PredicateOperator::Eq, vec![Datum::Int(99)]),
+ &row
+ )
+ .unwrap());
+ // NotEq
+ assert!(eval_row(
+ &make_leaf("x", 0, PredicateOperator::NotEq, vec![Datum::Int(99)]),
+ &row
+ )
+ .unwrap());
+ // Lt / LtEq / Gt / GtEq
+ assert!(eval_row(
+ &make_leaf("x", 0, PredicateOperator::Lt, vec![Datum::Int(20)]),
+ &row
+ )
+ .unwrap());
+ assert!(!eval_row(
+ &make_leaf("x", 0, PredicateOperator::Gt, vec![Datum::Int(20)]),
+ &row
+ )
+ .unwrap());
+ assert!(eval_row(
+ &make_leaf("x", 0, PredicateOperator::LtEq, vec![Datum::Int(10)]),
+ &row
+ )
+ .unwrap());
+ assert!(eval_row(
+ &make_leaf("x", 0, PredicateOperator::GtEq, vec![Datum::Int(10)]),
+ &row
+ )
+ .unwrap());
+ // In / NotIn
+ assert!(eval_row(
+ &make_leaf(
+ "x",
+ 0,
+ PredicateOperator::In,
+ vec![Datum::Int(1), Datum::Int(10)]
+ ),
+ &row
+ )
+ .unwrap());
+ assert!(!eval_row(
+ &make_leaf(
+ "x",
+ 0,
+ PredicateOperator::In,
+ vec![Datum::Int(1), Datum::Int(2)]
+ ),
+ &row
+ )
+ .unwrap());
+ // NotIn: 10 not in {1, 2} → true; 10 not in {10, 20} → false
+ assert!(eval_row(
+ &make_leaf(
+ "x",
+ 0,
+ PredicateOperator::NotIn,
+ vec![Datum::Int(1), Datum::Int(2)]
+ ),
+ &row
+ )
+ .unwrap());
+ assert!(!eval_row(
+ &make_leaf(
+ "x",
+ 0,
+ PredicateOperator::NotIn,
+ vec![Datum::Int(10), Datum::Int(20)]
+ ),
+ &row
+ )
+ .unwrap());
+ }
+
+ #[test]
+ fn test_eval_null_semantics() {
+ let mut b = TestBinaryRowBuilder::new(1);
+ b.set_null_at(0);
+ let row = b.build();
+
+ // NULL compared to any value → false (SQL null semantics)
+ assert!(!eval_row(
+ &make_leaf("x", 0, PredicateOperator::Eq, vec![Datum::Int(42)]),
+ &row
+ )
+ .unwrap());
+ // IsNull / IsNotNull
+ assert!(eval_row(&make_leaf("x", 0, PredicateOperator::IsNull,
vec![]), &row).unwrap());
+ assert!(!eval_row(
+ &make_leaf("x", 0, PredicateOperator::IsNotNull, vec![]),
+ &row
+ )
+ .unwrap());
+ }
+
+ #[test]
+ fn test_eval_compound_and_constants() {
+ let mut b = TestBinaryRowBuilder::new(2);
+ b.write_int(0, 10);
+ b.write_int(1, 20);
+ let row = b.build();
+
+ let p_true = make_leaf("a", 0, PredicateOperator::Eq,
vec![Datum::Int(10)]);
+ let p_false = make_leaf("b", 1, PredicateOperator::Eq,
vec![Datum::Int(99)]);
+
+ assert!(!eval_row(&Predicate::and(vec![p_true.clone(),
p_false.clone()]), &row).unwrap());
+ assert!(eval_row(&Predicate::or(vec![p_true.clone(),
p_false.clone()]), &row).unwrap());
+ assert!(!eval_row(&Predicate::negate(p_true), &row).unwrap());
+
+ // Constants
+ let empty_row = TestBinaryRowBuilder::new(0).build();
+ assert!(eval_row(&Predicate::AlwaysTrue, &empty_row).unwrap());
+ assert!(!eval_row(&Predicate::AlwaysFalse, &empty_row).unwrap());
+ }
+
+ // ======================== split_and ========================
+
+ #[test]
+ fn test_split_and() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+ let p2 = pb.equal("dt", Datum::Date(19723)).unwrap();
+
+ // AND → children
+ let parts = Predicate::and(vec![p1.clone(), p2.clone()]).split_and();
+ assert_eq!(parts, vec![p1.clone(), p2]);
+ // Non-AND → single-element vec
+ assert_eq!(p1.clone().split_and(), vec![p1]);
+ }
+
+ #[test]
+ fn test_split_and_recursive_nested() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+ let p2 = pb.equal("dt", Datum::Date(19723)).unwrap();
+ let p3 = pb.equal("hr", Datum::Int(10)).unwrap();
+
+ // Manually construct nested And (bypassing Predicate::and which
flattens).
+ // And(And(p1, p2), p3) should still flatten to [p1, p2, p3].
+ let inner = Predicate::And(vec![p1.clone(), p2.clone()]);
+ let outer = Predicate::And(vec![inner, p3.clone()]);
+ let parts = outer.split_and();
+ assert_eq!(parts, vec![p1, p2, p3]);
+ }
+
+ // ======================== remap_field_index ========================
+
+ #[test]
+ fn test_remap_pure_partition_leaf() {
+ let pb = PredicateBuilder::new(&test_fields()); // [id(0), name(1),
dt(2), hr(3)]
+ let p = pb.equal("dt", Datum::Date(19723)).unwrap(); // index=2
+ let mapping = vec![None, None, Some(0), Some(1)]; // dt→0, hr→1
+
+ let remapped = p.remap_field_index(&mapping).unwrap();
+ match &remapped {
+ Predicate::Leaf { index, column, .. } => {
+ assert_eq!(column, "dt");
+ assert_eq!(*index, 0); // remapped to partition index
+ }
+ other => panic!("expected Leaf, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn test_remap_non_partition_leaf_returns_none() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p = pb.equal("id", Datum::Int(1)).unwrap(); // index=0, not a
partition key
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ assert!(p.remap_field_index(&mapping).is_none());
+ }
+
+ #[test]
+ fn test_remap_and_all_partition() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p1 = pb.equal("dt", Datum::Date(19723)).unwrap();
+ let p2 = pb.equal("hr", Datum::Int(10)).unwrap();
+ let combined = Predicate::and(vec![p1, p2]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ let remapped = combined.remap_field_index(&mapping).unwrap();
+ match &remapped {
+ Predicate::And(children) => {
+ assert_eq!(children.len(), 2);
+ }
+ other => panic!("expected And, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn test_remap_or_with_mixed_returns_none() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p_partition = pb.equal("dt", Datum::Date(19723)).unwrap();
+ let p_data = pb.equal("id", Datum::Int(1)).unwrap();
+ let combined = Predicate::or(vec![p_partition, p_data]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ // OR with mixed columns → cannot safely extract → None
+ assert!(combined.remap_field_index(&mapping).is_none());
+ }
+
+ /// Regression test: `eval_row` must propagate errors from `extract_datum`
+ /// as `Err` (fail-fast), not swallow them into `Ok(true)` (fail-open).
+ ///
+ /// This guards the invariant at `table_scan.rs` partition pruning where
+ /// `eval_row(pred, &row)?` was intentionally changed from fail-open to
+ /// fail-fast. An unsupported DataType in a leaf triggers `Err` from
+ /// `extract_datum`; we verify it surfaces through `eval_row`.
+ #[test]
+ fn test_eval_row_propagates_extract_error() {
+ let mut b = TestBinaryRowBuilder::new(1);
+ b.write_int(0, 42);
+ let row = b.build();
+
+ // Leaf with unsupported DataType → extract_datum returns Err.
+ let unsupported_leaf = Predicate::Leaf {
+ column: "arr".into(),
+ index: 0,
+ data_type:
DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
+ op: PredicateOperator::Eq,
+ literals: vec![Datum::Int(42)],
+ };
+
+ // Must be Err, not Ok(true).
+ assert!(eval_row(&unsupported_leaf, &row).is_err());
+
+ // Also verify error propagates through compound predicates
(And/Or/Not).
+ let and_pred = Predicate::And(vec![Predicate::AlwaysTrue,
unsupported_leaf.clone()]);
+ assert!(eval_row(&and_pred, &row).is_err());
+
+ let or_pred = Predicate::Or(vec![Predicate::AlwaysFalse,
unsupported_leaf.clone()]);
+ assert!(eval_row(&or_pred, &row).is_err());
+
+ let not_pred = Predicate::Not(Box::new(unsupported_leaf));
+        assert!(eval_row(&not_pred, &row).is_err());
+ }
+
+ #[test]
+ fn test_remap_not_with_mixed_returns_none() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p_partition = pb.equal("dt", Datum::Date(19723)).unwrap();
+ let p_data = pb.greater_than("id", Datum::Int(10)).unwrap();
+ let inner = Predicate::and(vec![p_partition, p_data]);
+ let negated = Predicate::negate(inner);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ // NOT(partition AND data) → mixed under NOT → None
+ assert!(negated.remap_field_index(&mapping).is_none());
+ }
}
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index 69e1674..e0aae28 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -22,7 +22,7 @@
use super::{ArrowRecordBatchStream, Table, TableScan};
use crate::arrow::ArrowReaderBuilder;
-use crate::spec::{CoreOptions, DataField};
+use crate::spec::{CoreOptions, DataField, Predicate};
use crate::Result;
use crate::{DataSplit, Error};
use std::collections::{HashMap, HashSet};
@@ -35,6 +35,7 @@ use std::collections::{HashMap, HashSet};
pub struct ReadBuilder<'a> {
table: &'a Table,
projected_fields: Option<Vec<String>>,
+ filter: Option<Predicate>,
}
impl<'a> ReadBuilder<'a> {
@@ -42,6 +43,7 @@ impl<'a> ReadBuilder<'a> {
Self {
table,
projected_fields: None,
+ filter: None,
}
}
@@ -53,9 +55,26 @@ impl<'a> ReadBuilder<'a> {
self
}
+ /// Set a filter predicate for scan planning.
+ ///
+ /// The predicate should use table schema field indices (as produced by
+ /// [`PredicateBuilder`]). During [`TableScan::plan`] the filter is
+ /// decomposed at AND boundaries: partition-only conjuncts are extracted
+ /// and used to prune partitions; all other conjuncts are currently
+ /// **ignored** (neither `TableScan` nor `TableRead` applies data-level
+ /// predicates yet).
+ ///
+ /// This means rows returned by `TableRead` may **not** satisfy the full
+ /// filter — callers must apply remaining predicates themselves until
+ /// data-level pushdown is implemented.
+ pub fn with_filter(&mut self, filter: Predicate) -> &mut Self {
+ self.filter = Some(filter);
+ self
+ }
+
/// Create a table scan. Call [TableScan::plan] to get splits.
pub fn new_scan(&self) -> TableScan<'a> {
- TableScan::new(self.table)
+ TableScan::new(self.table, self.filter.clone())
}
/// Create a table read for consuming splits (e.g. from a scan plan).
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 1c3454a..031d15f 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -23,7 +23,8 @@
use super::Table;
use crate::io::FileIO;
use crate::spec::{
- BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry,
PartitionComputer, Snapshot,
+ eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions, FileKind,
IndexManifest,
+ ManifestEntry, PartitionComputer, Predicate, Snapshot,
};
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket,
Plan};
@@ -151,17 +152,32 @@ fn merge_manifest_entries(entries: Vec<ManifestEntry>) ->
Vec<ManifestEntry> {
.collect()
}
+/// Evaluate a partition predicate against serialized manifest partition bytes.
+///
+/// - `BinaryRow::from_serialized_bytes` failure → fail-open (`Ok(true)`)
+/// - `eval_row` failure → fail-fast (`Err(_)`)
+fn partition_matches_predicate(
+ serialized_partition: &[u8],
+ predicate: &Predicate,
+) -> crate::Result<bool> {
+ match BinaryRow::from_serialized_bytes(serialized_partition) {
+ Ok(row) => eval_row(predicate, &row),
+ Err(_) => Ok(true),
+ }
+}
+
/// TableScan for full table scan (no incremental, no predicate).
///
/// Reference:
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
#[derive(Debug, Clone)]
pub struct TableScan<'a> {
table: &'a Table,
+ filter: Option<Predicate>,
}
impl<'a> TableScan<'a> {
- pub fn new(table: &'a Table) -> Self {
- Self { table }
+ pub fn new(table: &'a Table, filter: Option<Predicate>) -> Self {
+ Self { table, filter }
}
/// Plan the full scan: read latest snapshot, manifest list, manifest
entries, then build DataSplits using bin packing.
@@ -191,6 +207,58 @@ impl<'a> TableScan<'a> {
return Ok(Plan::new(Vec::new()));
}
+ // --- Partition predicate extraction ---
+ let partition_keys = self.table.schema().partition_keys();
+ let partition_predicate = if !partition_keys.is_empty() {
+ self.filter.clone().and_then(|filter| {
+ let mapping =
+ field_idx_to_partition_idx(self.table.schema().fields(),
partition_keys);
+ let conjuncts = filter.split_and();
+ let remapped: Vec<Predicate> = conjuncts
+ .into_iter()
+ .filter_map(|c| c.remap_field_index(&mapping))
+ .collect();
+ if remapped.is_empty() {
+ None
+ } else {
+ Some(Predicate::and(remapped))
+ }
+ })
+ } else {
+ None
+ };
+
+ // --- Partition pruning: filter manifest entries before grouping ---
+ //
+ // Note: split construction later still requires a decodable BinaryRow
+ // and will fail on corrupt partition bytes. Pruning is intentionally
+ // best-effort; split construction is mandatory.
+ let entries = if let Some(ref pred) = partition_predicate {
+ let mut kept = Vec::with_capacity(entries.len());
+ // Cache: partition bytes → accept/reject to avoid re-decoding.
+ let mut cache: HashMap<Vec<u8>, bool> = HashMap::new();
+ for e in entries {
+ let accept = match cache.get(e.partition()) {
+ Some(&cached) => cached,
+ None => {
+ let partition_bytes = e.partition();
+ let accept =
partition_matches_predicate(partition_bytes, pred)?;
+ cache.insert(partition_bytes.to_vec(), accept);
+ accept
+ }
+ };
+ if accept {
+ kept.push(e);
+ }
+ }
+ kept
+ } else {
+ entries
+ };
+ if entries.is_empty() {
+ return Ok(Plan::new(Vec::new()));
+ }
+
// Group by (partition, bucket). Key = (partition_bytes, bucket).
let mut groups: HashMap<(Vec<u8>, i32), Vec<ManifestEntry>> =
HashMap::new();
for e in entries {
@@ -202,7 +270,6 @@ impl<'a> TableScan<'a> {
let base_path = table_path.trim_end_matches('/');
let mut splits = Vec::new();
- let partition_keys = self.table.schema().partition_keys();
let partition_computer = if !partition_keys.is_empty() {
Some(PartitionComputer::new(
partition_keys,
@@ -279,3 +346,90 @@ impl<'a> TableScan<'a> {
Ok(Plan::new(splits))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::partition_matches_predicate;
+ use crate::spec::{
+ ArrayType, DataField, DataType, Datum, IntType, Predicate,
PredicateBuilder,
+ PredicateOperator, VarCharType,
+ };
+ use crate::Error;
+
+ struct SerializedBinaryRowBuilder {
+ arity: i32,
+ null_bits_size: usize,
+ data: Vec<u8>,
+ }
+
+ impl SerializedBinaryRowBuilder {
+ fn new(arity: i32) -> Self {
+ let null_bits_size =
crate::spec::BinaryRow::cal_bit_set_width_in_bytes(arity) as usize;
+ let fixed_part_size = null_bits_size + (arity as usize) * 8;
+ Self {
+ arity,
+ null_bits_size,
+ data: vec![0u8; fixed_part_size],
+ }
+ }
+
+ fn field_offset(&self, pos: usize) -> usize {
+ self.null_bits_size + pos * 8
+ }
+
+ fn write_string(&mut self, pos: usize, value: &str) {
+ let var_offset = self.data.len();
+ self.data.extend_from_slice(value.as_bytes());
+ let encoded = ((var_offset as u64) << 32) | (value.len() as u64);
+ let offset = self.field_offset(pos);
+ self.data[offset..offset +
8].copy_from_slice(&encoded.to_le_bytes());
+ }
+
+ fn build_serialized(self) -> Vec<u8> {
+ let mut serialized = Vec::with_capacity(4 + self.data.len());
+ serialized.extend_from_slice(&self.arity.to_be_bytes());
+ serialized.extend_from_slice(&self.data);
+ serialized
+ }
+ }
+
+ fn partition_string_field() -> Vec<DataField> {
+ vec![DataField::new(
+ 0,
+ "dt".to_string(),
+ DataType::VarChar(VarCharType::default()),
+ )]
+ }
+
+ #[test]
+ fn test_partition_matches_predicate_decode_failure_fails_open() {
+ let predicate = PredicateBuilder::new(&partition_string_field())
+ .equal("dt", Datum::String("2024-01-01".into()))
+ .unwrap();
+
+ assert!(partition_matches_predicate(&[0xFF, 0x00],
&predicate).unwrap());
+ }
+
+ #[test]
+ fn test_partition_matches_predicate_eval_error_fails_fast() {
+ let mut builder = SerializedBinaryRowBuilder::new(1);
+ builder.write_string(0, "2024-01-01");
+ let serialized = builder.build_serialized();
+
+ let predicate = Predicate::Leaf {
+ column: "dt".into(),
+ index: 0,
+ data_type:
DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
+ op: PredicateOperator::Eq,
+ literals: vec![Datum::Int(42)],
+ };
+
+ let err = partition_matches_predicate(&serialized, &predicate)
+ .expect_err("eval_row error should propagate");
+
+ assert!(
+ matches!(&err, Error::Unsupported { message } if
message.contains("extract_datum")),
+ "Expected extract_datum unsupported error, got: {err:?}"
+ );
+ }
+}