This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new ae09718 feat: support IN and NOT IN operators in partition filter
(#566)
ae09718 is described below
commit ae097181d4052db1ead599cc8a6ac252ae903ed2
Author: Yunchi Pang <[email protected]>
AuthorDate: Sun Mar 29 23:01:02 2026 -0400
feat: support IN and NOT IN operators in partition filter (#566)
---------
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/core/src/config/table.rs | 5 +
crates/core/src/expr/filter.rs | 267 ++++++++++++++++++---
crates/core/src/expr/mod.rs | 22 +-
crates/core/src/keygen/mod.rs | 28 +++
crates/core/src/keygen/timestamp_based.rs | 378 ++++++++++++++++++++----------
crates/core/src/table/file_pruner.rs | 44 +++-
crates/core/src/table/mod.rs | 14 +-
crates/core/src/table/partition.rs | 171 +++++++-------
crates/datafusion/src/util/expr.rs | 129 +++++++++-
9 files changed, 799 insertions(+), 259 deletions(-)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 6ff9169..83c1c70 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -78,6 +78,9 @@ pub enum HudiTableConfig {
/// Key Generator class property for the hoodie table
KeyGeneratorClass,
+ /// Key Generator type property for the hoodie table (v8+)
+ KeyGeneratorType,
+
/// Fields used to partition the table. Concatenated values of these
fields are used as
/// the partition path, by invoking toString().
/// These fields also include the partition type which is used by custom
key generators
@@ -156,6 +159,7 @@ impl AsRef<str> for HudiTableConfig {
Self::IsHiveStylePartitioning =>
"hoodie.datasource.write.hive_style_partitioning",
Self::IsPartitionPathUrlencoded =>
"hoodie.datasource.write.partitionpath.urlencode",
Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
+ Self::KeyGeneratorType => "hoodie.table.keygenerator.type",
Self::PartitionFields => "hoodie.table.partition.fields",
Self::PrecombineField => "hoodie.table.precombine.field",
Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
@@ -245,6 +249,7 @@ impl ConfigParser for HudiTableConfig {
})
.map(HudiConfigValue::Boolean),
Self::KeyGeneratorClass => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::KeyGeneratorType => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::PartitionFields => get_result
.map(|v|
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Self::PrecombineField => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 060841f..791a556 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -20,6 +20,7 @@
use crate::Result;
use crate::error::CoreError;
use crate::expr::ExprOperator;
+use arrow_arith::boolean;
use arrow_array::{ArrayRef, BooleanArray, Datum, Scalar, StringArray};
use arrow_cast::{CastOptions, cast_with_options};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -30,7 +31,29 @@ use std::str::FromStr;
pub struct Filter {
pub field_name: String,
pub operator: ExprOperator,
- pub field_value: String,
+ pub values: Vec<String>,
+}
+
+impl Filter {
+ pub fn new(field_name: String, operator: ExprOperator, values:
Vec<String>) -> Result<Self> {
+ if operator.is_multi_value() {
+ if values.is_empty() {
+ return Err(CoreError::Schema(format!(
+ "{operator} operator requires at least one value for field
'{field_name}'"
+ )));
+ }
+ } else if values.len() != 1 {
+ return Err(CoreError::Schema(format!(
+ "Operator {operator} requires exactly one value for field
'{field_name}', got {}",
+ values.len()
+ )));
+ }
+ Ok(Self {
+ field_name,
+ operator,
+ values,
+ })
+ }
}
impl Filter {
@@ -44,11 +67,8 @@ impl Filter {
impl From<Filter> for (String, String, String) {
fn from(filter: Filter) -> Self {
- (
- filter.field_name,
- filter.operator.to_string(),
- filter.field_value,
- )
+ let value_str = filter.values.join(",");
+ (filter.field_name, filter.operator.to_string(), value_str)
}
}
@@ -57,18 +77,18 @@ impl TryFrom<(&str, &str, &str)> for Filter {
fn try_from(binary_expr_tuple: (&str, &str, &str)) -> Result<Self,
Self::Error> {
let (field_name, operator_str, field_value) = binary_expr_tuple;
-
let field_name = field_name.to_string();
-
let operator = ExprOperator::from_str(operator_str)?;
-
- let field_value = field_value.to_string();
-
- Ok(Filter {
- field_name,
- operator,
- field_value,
- })
+ let values = if operator.is_multi_value() {
+ field_value
+ .split(',')
+ .map(|s| s.trim().to_string())
+ .filter(|s| !s.is_empty())
+ .collect()
+ } else {
+ vec![field_value.to_string()]
+ };
+ Filter::new(field_name, operator, values)
}
}
@@ -100,7 +120,7 @@ impl FilterField {
Filter {
field_name: self.name.clone(),
operator: ExprOperator::Eq,
- field_value: value.into(),
+ values: vec![value.into()],
}
}
@@ -108,7 +128,7 @@ impl FilterField {
Filter {
field_name: self.name.clone(),
operator: ExprOperator::Ne,
- field_value: value.into(),
+ values: vec![value.into()],
}
}
@@ -116,7 +136,7 @@ impl FilterField {
Filter {
field_name: self.name.clone(),
operator: ExprOperator::Lt,
- field_value: value.into(),
+ values: vec![value.into()],
}
}
@@ -124,7 +144,7 @@ impl FilterField {
Filter {
field_name: self.name.clone(),
operator: ExprOperator::Lte,
- field_value: value.into(),
+ values: vec![value.into()],
}
}
@@ -132,7 +152,7 @@ impl FilterField {
Filter {
field_name: self.name.clone(),
operator: ExprOperator::Gt,
- field_value: value.into(),
+ values: vec![value.into()],
}
}
@@ -140,7 +160,31 @@ impl FilterField {
Filter {
field_name: self.name.clone(),
operator: ExprOperator::Gte,
- field_value: value.into(),
+ values: vec![value.into()],
+ }
+ }
+
+ pub fn in_list<I, S>(&self, values: I) -> Filter
+ where
+ I: IntoIterator<Item = S>,
+ S: Into<String>,
+ {
+ Filter {
+ field_name: self.name.clone(),
+ operator: ExprOperator::In,
+ values: values.into_iter().map(|v| v.into()).collect(),
+ }
+ }
+
+ pub fn not_in_list<I, S>(&self, values: I) -> Filter
+ where
+ I: IntoIterator<Item = S>,
+ S: Into<String>,
+ {
+ Filter {
+ field_name: self.name.clone(),
+ operator: ExprOperator::NotIn,
+ values: values.into_iter().map(|v| v.into()).collect(),
}
}
}
@@ -153,7 +197,7 @@ pub fn col(name: impl Into<String>) -> FilterField {
pub struct SchemableFilter {
pub field: Field,
pub operator: ExprOperator,
- pub value: Scalar<ArrayRef>,
+ pub values: Vec<Scalar<ArrayRef>>,
}
impl TryFrom<(Filter, &Schema)> for SchemableFilter {
@@ -166,14 +210,19 @@ impl TryFrom<(Filter, &Schema)> for SchemableFilter {
})?;
let operator = filter.operator;
- let value = &[filter.field_value.as_str()];
- let value = Self::cast_value(value, field.data_type())?;
+
+ let values: Result<Vec<_>> = filter
+ .values
+ .iter()
+ .map(|v| Self::cast_value(&[v.as_str()], field.data_type()))
+ .collect();
+ let values = values?;
let field = field.clone();
Ok(SchemableFilter {
field,
operator,
- value,
+ values,
})
}
}
@@ -195,12 +244,30 @@ impl SchemableFilter {
pub fn apply_comparison(&self, value: &dyn Datum) -> Result<BooleanArray> {
match self.operator {
- ExprOperator::Eq => eq(value, &self.value),
- ExprOperator::Ne => neq(value, &self.value),
- ExprOperator::Lt => lt(value, &self.value),
- ExprOperator::Lte => lt_eq(value, &self.value),
- ExprOperator::Gt => gt(value, &self.value),
- ExprOperator::Gte => gt_eq(value, &self.value),
+ ExprOperator::Eq => eq(value, &self.values[0]),
+ ExprOperator::Ne => neq(value, &self.values[0]),
+ ExprOperator::Lt => lt(value, &self.values[0]),
+ ExprOperator::Lte => lt_eq(value, &self.values[0]),
+ ExprOperator::Gt => gt(value, &self.values[0]),
+ ExprOperator::Gte => gt_eq(value, &self.values[0]),
+ ExprOperator::In => {
+ // IN: value == values[0] OR value == values[1] OR ...
+ let mut result = eq(value, &self.values[0])?;
+ for filter_value in &self.values[1..] {
+ let comparison = eq(value, filter_value)?;
+ result = boolean::or(&result, &comparison)?;
+ }
+ Ok(result)
+ }
+ ExprOperator::NotIn => {
+ // NOT IN: value != values[0] AND value != values[1] AND ...
+ let mut result = neq(value, &self.values[0])?;
+ for filter_value in &self.values[1..] {
+ let comparison = neq(value, filter_value)?;
+ result = boolean::and(&result, &comparison)?;
+ }
+ Ok(result)
+ }
}
.map_err(|e| e.into())
}
@@ -227,7 +294,7 @@ mod tests {
let string_filter = Filter {
field_name: "string_col".to_string(),
operator: ExprOperator::Eq,
- field_value: "test_value".to_string(),
+ values: vec!["test_value".to_string()],
};
let schemable = SchemableFilter::try_from((string_filter, &schema))?;
@@ -239,7 +306,7 @@ mod tests {
let int_filter = Filter {
field_name: "int_col".to_string(),
operator: ExprOperator::Gt,
- field_value: "42".to_string(),
+ values: vec!["42".to_string()],
};
let schemable = SchemableFilter::try_from((int_filter, &schema))?;
@@ -251,7 +318,7 @@ mod tests {
let invalid_filter = Filter {
field_name: "non_existent".to_string(),
operator: ExprOperator::Eq,
- field_value: "value".to_string(),
+ values: vec!["value".to_string()],
};
assert!(SchemableFilter::try_from((invalid_filter, &schema)).is_err());
@@ -259,6 +326,31 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_filter_in_empty_values_error() -> Result<()> {
+ // IN operator with empty values should error at construction
+ let result = Filter::new("int_col".to_string(), ExprOperator::In,
vec![]);
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("at least one value")
+ );
+
+ // NOT IN operator with empty values should also error at construction
+ let result = Filter::new("int_col".to_string(), ExprOperator::NotIn,
vec![]);
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("at least one value")
+ );
+
+ Ok(())
+ }
+
#[test]
fn test_schemable_filter_cast_value() -> Result<()> {
// Test casting to string
@@ -284,7 +376,7 @@ mod tests {
let eq_filter = Filter {
field_name: "string_col".to_string(),
operator: ExprOperator::Eq,
- field_value: "test".to_string(),
+ values: vec!["test".to_string()],
};
let schemable = SchemableFilter::try_from((eq_filter, &schema))?;
@@ -296,7 +388,7 @@ mod tests {
let gt_filter = Filter {
field_name: "int_col".to_string(),
operator: ExprOperator::Gt,
- field_value: "50".to_string(),
+ values: vec!["50".to_string()],
};
let schemable = SchemableFilter::try_from((gt_filter, &schema))?;
@@ -325,7 +417,7 @@ mod tests {
let filter = Filter {
field_name: "int_col".to_string(),
operator,
- field_value: value.to_string(),
+ values: vec![value.to_string()],
};
let schemable = SchemableFilter::try_from((filter, &schema))?;
@@ -339,4 +431,103 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_schemable_filter_in() -> Result<()> {
+ let schema = create_test_schema();
+
+ let in_filter = Filter::new(
+ "string_col".to_string(),
+ ExprOperator::In,
+ vec!["foo".to_string(), "bar".to_string()],
+ )
+ .unwrap();
+
+ let schemable = SchemableFilter::try_from((in_filter, &schema))?;
+ let test_array = StringArray::from(vec!["foo", "baz", "bar", "qux"]);
+ let result = schemable.apply_comparison(&test_array)?;
+ assert_eq!(result, BooleanArray::from(vec![true, false, true, false]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_schemable_filter_not_in() -> Result<()> {
+ let schema = create_test_schema();
+
+ let not_in_filter = Filter::new(
+ "string_col".to_string(),
+ ExprOperator::NotIn,
+ vec!["foo".to_string(), "bar".to_string()],
+ )
+ .unwrap();
+
+ let schemable = SchemableFilter::try_from((not_in_filter, &schema))?;
+ let test_array = StringArray::from(vec!["foo", "baz", "bar", "qux"]);
+ let result = schemable.apply_comparison(&test_array)?;
+ assert_eq!(result, BooleanArray::from(vec![false, true, false, true]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_schemable_filter_in_with_integers() -> Result<()> {
+ let schema = create_test_schema();
+
+ let in_filter = Filter::new(
+ "int_col".to_string(),
+ ExprOperator::In,
+ vec!["40".to_string(), "60".to_string()],
+ )
+ .unwrap();
+
+ let schemable = SchemableFilter::try_from((in_filter, &schema))?;
+ let test_array = Int64Array::from(vec![40, 50, 60]);
+ let result = schemable.apply_comparison(&test_array)?;
+ assert_eq!(result, BooleanArray::from(vec![true, false, true]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_filter_try_from_tuple_in_operator() -> Result<()> {
+ // TryFrom should parse comma-separated values for IN
+ let filter = Filter::try_from(("col", "IN", "a,b,c"))?;
+ assert_eq!(filter.operator, ExprOperator::In);
+ assert_eq!(filter.values, vec!["a", "b", "c"]);
+
+ // TryFrom should parse comma-separated values for NOT IN
+ let filter = Filter::try_from(("col", "NOT IN", "x, y"))?;
+ assert_eq!(filter.operator, ExprOperator::NotIn);
+ assert_eq!(filter.values, vec!["x", "y"]);
+
+ // Empty value string for IN should error
+ let result = Filter::try_from(("col", "IN", ""));
+ assert!(result.is_err());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_filter_roundtrip_in_operator() -> Result<()> {
+ // Round-trip: Filter -> (String, String, String) -> Filter
+ let original = Filter::new(
+ "col".to_string(),
+ ExprOperator::In,
+ vec!["a".to_string(), "b".to_string()],
+ )
+ .unwrap();
+
+ let tuple: (String, String, String) = original.into();
+ assert_eq!(
+ tuple,
+ ("col".to_string(), "IN".to_string(), "a,b".to_string())
+ );
+
+ let restored = Filter::try_from((tuple.0.as_str(), tuple.1.as_str(),
tuple.2.as_str()))?;
+ assert_eq!(restored.operator, ExprOperator::In);
+ assert_eq!(restored.values, vec!["a", "b"]);
+
+ Ok(())
+ }
}
diff --git a/crates/core/src/expr/mod.rs b/crates/core/src/expr/mod.rs
index d98b6a7..cc7c84c 100644
--- a/crates/core/src/expr/mod.rs
+++ b/crates/core/src/expr/mod.rs
@@ -35,6 +35,8 @@ pub enum ExprOperator {
Lte,
Gt,
Gte,
+ In,
+ NotIn,
}
impl Display for ExprOperator {
@@ -47,20 +49,29 @@ impl Display for ExprOperator {
ExprOperator::Lte => write!(f, "<="),
ExprOperator::Gt => write!(f, ">"),
ExprOperator::Gte => write!(f, ">="),
+ ExprOperator::In => write!(f, "IN"),
+ ExprOperator::NotIn => write!(f, "NOT IN"),
}
}
}
impl ExprOperator {
- pub const TOKEN_OP_PAIRS: [(&'static str, ExprOperator); 6] = [
+ pub const TOKEN_OP_PAIRS: [(&'static str, ExprOperator); 8] = [
("=", ExprOperator::Eq),
("!=", ExprOperator::Ne),
("<", ExprOperator::Lt),
("<=", ExprOperator::Lte),
(">", ExprOperator::Gt),
(">=", ExprOperator::Gte),
+ ("IN", ExprOperator::In),
+ ("NOT IN", ExprOperator::NotIn),
];
+ /// Returns true if the operator expects multiple values (IN, NOT IN).
+ pub fn is_multi_value(&self) -> bool {
+ matches!(self, ExprOperator::In | ExprOperator::NotIn)
+ }
+
/// Negates the operator.
pub fn negate(&self) -> Option<ExprOperator> {
match self {
@@ -70,6 +81,8 @@ impl ExprOperator {
ExprOperator::Lte => Some(ExprOperator::Gt),
ExprOperator::Gt => Some(ExprOperator::Lte),
ExprOperator::Gte => Some(ExprOperator::Lt),
+ ExprOperator::In => Some(ExprOperator::NotIn),
+ ExprOperator::NotIn => Some(ExprOperator::In),
}
}
}
@@ -103,6 +116,11 @@ mod tests {
assert_eq!(ExprOperator::from_str("<=").unwrap(), ExprOperator::Lte);
assert_eq!(ExprOperator::from_str(">").unwrap(), ExprOperator::Gt);
assert_eq!(ExprOperator::from_str(">=").unwrap(), ExprOperator::Gte);
+ assert_eq!(ExprOperator::from_str("IN").unwrap(), ExprOperator::In);
+ assert_eq!(
+ ExprOperator::from_str("NOT IN").unwrap(),
+ ExprOperator::NotIn
+ );
assert!(ExprOperator::from_str("??").is_err());
}
@@ -114,5 +132,7 @@ mod tests {
assert_eq!(ExprOperator::Lte.to_string(), "<=");
assert_eq!(ExprOperator::Gt.to_string(), ">");
assert_eq!(ExprOperator::Gte.to_string(), ">=");
+ assert_eq!(ExprOperator::In.to_string(), "IN");
+ assert_eq!(ExprOperator::NotIn.to_string(), "NOT IN");
}
}
diff --git a/crates/core/src/keygen/mod.rs b/crates/core/src/keygen/mod.rs
index 029ed95..350b053 100644
--- a/crates/core/src/keygen/mod.rs
+++ b/crates/core/src/keygen/mod.rs
@@ -22,8 +22,36 @@
pub mod timestamp_based;
use crate::Result;
+use crate::config::HudiConfigs;
+use crate::config::table::HudiTableConfig::{KeyGeneratorClass,
KeyGeneratorType};
use crate::expr::filter::Filter;
+/// Returns true if the table uses a timestamp-based key generator,
+/// checking both `hoodie.table.keygenerator.class` and
+/// `hoodie.table.keygenerator.type`.
+pub fn is_timestamp_based_keygen(hudi_configs: &HudiConfigs) -> bool {
+ let by_class: bool = hudi_configs
+ .try_get(KeyGeneratorClass)
+ .map(|v| {
+ let s: String = v.into();
+ s.contains("TimestampBasedKeyGenerator")
+ })
+ .unwrap_or(false);
+
+ if by_class {
+ return true;
+ }
+
+ hudi_configs
+ .try_get(KeyGeneratorType)
+ .map(|v| {
+ let s: String = v.into();
+ let upper = s.to_uppercase();
+ upper == "TIMESTAMP" || upper == "TIMESTAMP_AVRO"
+ })
+ .unwrap_or(false)
+}
+
/// Trait for key generators that can transform user filters on data columns
/// to filters on partition path columns.
pub trait KeyGeneratorFilterTransformer {
diff --git a/crates/core/src/keygen/timestamp_based.rs
b/crates/core/src/keygen/timestamp_based.rs
index 3869204..3524739 100644
--- a/crates/core/src/keygen/timestamp_based.rs
+++ b/crates/core/src/keygen/timestamp_based.rs
@@ -25,6 +25,7 @@ use crate::error::CoreError;
use crate::expr::ExprOperator;
use crate::expr::filter::Filter;
use crate::keygen::KeyGeneratorFilterTransformer;
+use crate::metadata::meta_field::MetaField;
use chrono::{DateTime, Datelike, NaiveDateTime, TimeZone, Timelike, Utc};
use chrono_tz::Tz;
use std::collections::HashMap;
@@ -57,7 +58,6 @@ pub struct TimestampBasedKeyGenerator {
output_timezone: Tz,
/// Whether partitions use Hive-style naming (e.g., year=2024 vs 2024)
- #[allow(dead_code)]
is_hive_style: bool,
/// Partition field names derived from output format (e.g., ["year",
"month", "day", "hour"])
@@ -418,29 +418,81 @@ impl TimestampBasedKeyGenerator {
.replace("'T'", "T")
}
- /// Extracts partition values from a datetime based on output format,
- /// applying the configured output timezone.
- fn extract_partition_values(&self, dt: &DateTime<Utc>) -> HashMap<String,
String> {
+ /// Formats a datetime into the full partition path string.
+ ///
+ /// For non-hive-style with format `yyyy/MM/dd`: `"2023/04/15"`
+ /// For hive-style with format `yyyy/MM/dd`: `"year=2023/month=04/day=15"`
+ fn format_partition_path(&self, dt: &DateTime<Utc>) -> String {
let local_dt = dt.with_timezone(&self.output_timezone);
- let mut values = HashMap::new();
-
- let segments: Vec<&str> = self.output_dateformat.split('/').collect();
-
- for (i, segment) in segments.iter().enumerate() {
- let field_name = &self.partition_fields[i];
- let value = match *segment {
- "yyyy" => format!("{:04}", local_dt.year()),
- "MM" => format!("{:02}", local_dt.month()),
- "dd" => format!("{:02}", local_dt.day()),
- "HH" => format!("{:02}", local_dt.hour()),
- "mm" => format!("{:02}", local_dt.minute()),
- "ss" => format!("{:02}", local_dt.second()),
- _ => segment.to_string(),
- };
- values.insert(field_name.clone(), value);
+
+ let segments: Vec<String> = self
+ .output_dateformat
+ .split('/')
+ .enumerate()
+ .map(|(i, segment)| {
+ let value = Self::format_segment_value(segment, &local_dt);
+ if self.is_hive_style {
+ let field_name = &self.partition_fields[i];
+ format!("{field_name}={value}")
+ } else {
+ value
+ }
+ })
+ .collect();
+
+ segments.join("/")
+ }
+
+ /// Formats a single date segment into its value string.
+ ///
+ /// Supports both simple tokens like `"yyyy"` and compound patterns like
+ /// `"yyyyMMdd"` or `"yyyy-MM-dd"` by progressively replacing known tokens.
+ fn format_segment_value<T: Datelike + Timelike>(segment: &str, dt: &T) ->
String {
+ // Order matters: longer tokens must not be partially consumed by
shorter ones.
+ // "mm" (minute) vs "MM" (month) are case-sensitive so they don't
conflict.
+ segment
+ .replace("yyyy", &format!("{:04}", dt.year()))
+ .replace("MM", &format!("{:02}", dt.month()))
+ .replace("dd", &format!("{:02}", dt.day()))
+ .replace("HH", &format!("{:02}", dt.hour()))
+ .replace("mm", &format!("{:02}", dt.minute()))
+ .replace("ss", &format!("{:02}", dt.second()))
+ }
+
+ /// Returns true if the output date format produces lexicographically
sortable
+ /// partition paths (i.e., string comparison preserves chronological
order).
+ ///
+ /// This requires date tokens across the full format to appear in strict
+ /// descending significance: year > month > day > hour > minute > second.
+ /// Supports both `/`-separated formats like `yyyy/MM/dd` and compound
+ /// formats like `yyyyMMdd` or `yyyy-MM-dd`.
+ fn is_lex_sortable_format(&self) -> bool {
+ // Known tokens in search order (longest first to avoid partial
matches)
+ const TOKENS: &[(&str, u8)] = &[
+ ("yyyy", 6),
+ ("MM", 5),
+ ("dd", 4),
+ ("HH", 3),
+ ("mm", 2),
+ ("ss", 1),
+ ];
+
+ // Extract all token ranks in order of appearance
+ let mut ranks: Vec<u8> = Vec::new();
+ let mut remaining = self.output_dateformat.as_str();
+
+ while !remaining.is_empty() {
+ if let Some((token, rank)) = TOKENS.iter().find(|(t, _)|
remaining.starts_with(t)) {
+ ranks.push(*rank);
+ remaining = &remaining[token.len()..];
+ } else {
+ // Skip non-token characters (separators like '/', '-', etc.)
+ remaining = &remaining[1..];
+ }
}
- values
+ // Must have at least one token and be in strictly descending order
+ !ranks.is_empty() && ranks.windows(2).all(|w| w[0] > w[1])
}
}
@@ -454,57 +506,53 @@ impl KeyGeneratorFilterTransformer for
TimestampBasedKeyGenerator {
return Ok(vec![filter.clone()]);
}
- let dt = self.parse_timestamp(&filter.field_value)?;
- let partition_values = self.extract_partition_values(&dt);
-
- let mut filters = Vec::new();
+ let field_name = MetaField::PartitionPath.as_ref().to_string();
match filter.operator {
- ExprOperator::Eq => {
- for field_name in &self.partition_fields {
- if let Some(value) = partition_values.get(field_name) {
- filters.push(Filter {
- field_name: field_name.clone(),
- operator: ExprOperator::Eq,
- field_value: value.clone(),
- });
- }
- }
+ ExprOperator::Eq | ExprOperator::Ne => {
+ let dt = self.parse_timestamp(&filter.values[0])?;
+ let path = self.format_partition_path(&dt);
+ Ok(vec![Filter {
+ field_name,
+ operator: filter.operator,
+ values: vec![path],
+ }])
}
- ExprOperator::Gte | ExprOperator::Gt => {
- // Only compare the first partition field for simplicity.
- // May scan more partitions than necessary but avoids complex
multi-field range logic.
- if let Some(first_field) = self.partition_fields.first() {
- if let Some(value) = partition_values.get(first_field) {
- filters.push(Filter {
- field_name: first_field.clone(),
- operator: ExprOperator::Gte,
- field_value: value.clone(),
- });
- }
- }
+ ExprOperator::In | ExprOperator::NotIn => {
+ let paths: Result<Vec<String>> = filter
+ .values
+ .iter()
+ .map(|v| {
+ let dt = self.parse_timestamp(v)?;
+ Ok(self.format_partition_path(&dt))
+ })
+ .collect();
+ Ok(vec![Filter {
+ field_name,
+ operator: filter.operator,
+ values: paths?,
+ }])
}
- ExprOperator::Lte | ExprOperator::Lt => {
- if let Some(first_field) = self.partition_fields.first() {
- if let Some(value) = partition_values.get(first_field) {
- filters.push(Filter {
- field_name: first_field.clone(),
- operator: ExprOperator::Lte,
- field_value: value.clone(),
- });
- }
+ ExprOperator::Gt | ExprOperator::Gte | ExprOperator::Lt |
ExprOperator::Lte => {
+ if !self.is_lex_sortable_format() {
+ // Not safe for string comparison; skip pruning
+ return Ok(vec![]);
}
- }
- ExprOperator::Ne => {
- return Err(CoreError::Config(ConfigError::InvalidValue(format!(
- "Not-equal (!=) operator is not supported for
timestamp-based partition \
- pruning on field '{}'. Rewrite the query without != on
partition columns.",
- filter.field_name
- ))));
+ let dt = self.parse_timestamp(&filter.values[0])?;
+ let path = self.format_partition_path(&dt);
+ // Widen Gt→Gte and Lt→Lte for safe partition boundary
approximation
+ let op = match filter.operator {
+ ExprOperator::Gt => ExprOperator::Gte,
+ ExprOperator::Lt => ExprOperator::Lte,
+ other => other,
+ };
+ Ok(vec![Filter {
+ field_name,
+ operator: op,
+ values: vec![path],
+ }])
}
}
-
- Ok(filters)
}
}
@@ -772,7 +820,7 @@ mod tests {
}
#[test]
- fn test_timezone_config_and_partition_values() {
+ fn test_timezone_config_and_partition_path() {
// output.timezone shifts date components
let configs = HudiConfigs::new([
("hoodie.table.partition.fields", "ts"),
@@ -786,10 +834,12 @@ mod tests {
]);
let keygen =
TimestampBasedKeyGenerator::from_configs(&configs).unwrap();
- // 2024-01-25 03:00:00 UTC = 2024-01-24 22:00:00 EST → day=24
+ // 2024-01-25 03:00:00 UTC = 2024-01-24 22:00:00 EST
let dt = keygen.parse_timestamp("1706151600").unwrap();
- let values = keygen.extract_partition_values(&dt);
- assert_eq!(values.get("day"), Some(&"24".to_string()));
+ assert_eq!(
+ keygen.format_partition_path(&dt),
+ "year=2024/month=01/day=24"
+ );
// Fallback: hoodie.keygen.timebased.timezone used when
output.timezone absent
let configs = HudiConfigs::new([
@@ -802,10 +852,12 @@ mod tests {
let keygen =
TimestampBasedKeyGenerator::from_configs(&configs).unwrap();
assert_eq!(keygen.output_timezone, chrono_tz::Asia::Tokyo);
- // 2024-01-25 20:00:00 UTC = 2024-01-26 05:00:00 JST → day=26
+ // 2024-01-25 20:00:00 UTC = 2024-01-26 05:00:00 JST
let dt = keygen.parse_timestamp("1706212800").unwrap();
- let values = keygen.extract_partition_values(&dt);
- assert_eq!(values.get("day"), Some(&"26".to_string()));
+ assert_eq!(
+ keygen.format_partition_path(&dt),
+ "year=2024/month=01/day=26"
+ );
// Precedence: deprecated shared `timezone` wins over specific
`output.timezone`
let configs = HudiConfigs::new([
@@ -825,47 +877,55 @@ mod tests {
#[test]
fn test_transform_filter() {
- // Equality: DATE_STRING → expands to all partition fields
+ // Equality: DATE_STRING → single _hoodie_partition_path filter with
full path
let keygen =
TimestampBasedKeyGenerator::from_configs(&create_test_configs_date_string()).unwrap();
let filter = Filter {
field_name: "ts_str".to_string(),
operator: ExprOperator::Eq,
- field_value: "2023-04-01T12:01:00.123Z".to_string(),
+ values: vec!["2023-04-01T12:01:00.123Z".to_string()],
};
let transformed = keygen.transform_filter(&filter).unwrap();
- assert_eq!(transformed.len(), 4);
- assert_eq!(
- (
- transformed[0].field_name.as_str(),
- transformed[0].field_value.as_str()
- ),
- ("year", "2023")
- );
- assert_eq!(
- (
- transformed[1].field_name.as_str(),
- transformed[1].field_value.as_str()
- ),
- ("month", "04")
- );
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].field_name, "_hoodie_partition_path");
+ assert_eq!(transformed[0].operator, ExprOperator::Eq);
assert_eq!(
- (
- transformed[2].field_name.as_str(),
- transformed[2].field_value.as_str()
- ),
- ("day", "01")
+ transformed[0].values[0],
+ "year=2023/month=04/day=01/hour=12"
);
+
+ // Non-hive-style equality
+ let keygen =
+
TimestampBasedKeyGenerator::from_configs(&create_test_configs_unix_timestamp())
+ .unwrap();
+ // 2024-01-25 00:00:00 UTC = 1706140800 seconds
+ let filter = Filter {
+ field_name: "event_timestamp".to_string(),
+ operator: ExprOperator::Eq,
+ values: vec!["1706140800".to_string()],
+ };
+ let transformed = keygen.transform_filter(&filter).unwrap();
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].values[0], "2024/01/25");
+
+ // Not-equal: now supported
+ let keygen =
+
TimestampBasedKeyGenerator::from_configs(&create_test_configs_date_string()).unwrap();
+ let filter = Filter {
+ field_name: "ts_str".to_string(),
+ operator: ExprOperator::Ne,
+ values: vec!["2023-04-01T12:01:00.123Z".to_string()],
+ };
+ let transformed = keygen.transform_filter(&filter).unwrap();
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].operator, ExprOperator::Ne);
assert_eq!(
- (
- transformed[3].field_name.as_str(),
- transformed[3].field_value.as_str()
- ),
- ("hour", "12")
+ transformed[0].values[0],
+ "year=2023/month=04/day=01/hour=12"
);
- // Range operators: Gt/Gte → Gte, Lt/Lte → Lte (safe widening for
partition boundaries)
+ // Range operators: Gt/Gte → Gte, Lt/Lte → Lte (safe widening)
let keygen =
TimestampBasedKeyGenerator::from_configs(&create_test_configs_unix_timestamp())
.unwrap();
@@ -878,11 +938,12 @@ mod tests {
let filter = Filter {
field_name: "event_timestamp".to_string(),
operator: input_op,
- field_value: "1706140800".to_string(),
+ values: vec!["1706140800".to_string()],
};
let transformed = keygen.transform_filter(&filter).unwrap();
assert_eq!(transformed.len(), 1, "Expected 1 filter for
{input_op:?}");
- assert_eq!(transformed[0].field_name, "yyyy");
+ assert_eq!(transformed[0].field_name, "_hoodie_partition_path");
+ assert_eq!(transformed[0].values[0], "2024/01/25");
assert_eq!(
transformed[0].operator, expected_op,
"{input_op:?} should coerce to {expected_op:?}"
@@ -895,26 +956,12 @@ mod tests {
let filter = Filter {
field_name: "other_field".to_string(),
operator: ExprOperator::Eq,
- field_value: "value".to_string(),
+ values: vec!["value".to_string()],
};
let transformed = keygen.transform_filter(&filter).unwrap();
assert_eq!(transformed.len(), 1);
assert_eq!(transformed[0].field_name, "other_field");
- // Not-equal operator is rejected
- let filter = Filter {
- field_name: "ts_str".to_string(),
- operator: ExprOperator::Ne,
- field_value: "2023-04-01T12:01:00.123Z".to_string(),
- };
- assert!(
- keygen
- .transform_filter(&filter)
- .unwrap_err()
- .to_string()
- .contains("Not-equal (!=) operator is not supported")
- );
-
// Invalid timestamp value produces error, not panic
let unix_keygen =
TimestampBasedKeyGenerator::from_configs(&create_test_configs_unix_timestamp())
@@ -922,8 +969,103 @@ mod tests {
let filter = Filter {
field_name: "event_timestamp".to_string(),
operator: ExprOperator::Eq,
- field_value: "not_a_number".to_string(),
+ values: vec!["not_a_number".to_string()],
};
assert!(unix_keygen.transform_filter(&filter).is_err());
}
+
+ #[test]
+ fn test_transform_filter_in_not_in() {
+ let keygen =
+
TimestampBasedKeyGenerator::from_configs(&create_test_configs_date_string()).unwrap();
+
+ // IN: multiple timestamps → multiple formatted paths in one filter
+ let filter = Filter::new(
+ "ts_str".to_string(),
+ ExprOperator::In,
+ vec![
+ "2023-04-01T12:00:00.000Z".to_string(),
+ "2023-06-15T08:00:00.000Z".to_string(),
+ ],
+ )
+ .unwrap();
+ let transformed = keygen.transform_filter(&filter).unwrap();
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].operator, ExprOperator::In);
+ assert_eq!(
+ transformed[0].values,
+ vec![
+ "year=2023/month=04/day=01/hour=12",
+ "year=2023/month=06/day=15/hour=08"
+ ]
+ );
+
+ // NOT IN
+ let filter = Filter::new(
+ "ts_str".to_string(),
+ ExprOperator::NotIn,
+ vec!["2023-04-01T12:00:00.000Z".to_string()],
+ )
+ .unwrap();
+ let transformed = keygen.transform_filter(&filter).unwrap();
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].operator, ExprOperator::NotIn);
+ }
+
+ #[test]
+ fn test_transform_filter_non_lex_sortable_skips_range() {
+ // Format MM/dd/yyyy is not lex-sortable → range ops return empty
+ let configs = HudiConfigs::new([
+ ("hoodie.table.partition.fields", "ts"),
+ ("hoodie.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP"),
+ ("hoodie.keygen.timebased.output.dateformat", "MM/dd/yyyy"),
+ ("hoodie.datasource.write.hive_style_partitioning", "false"),
+ ]);
+ let keygen =
TimestampBasedKeyGenerator::from_configs(&configs).unwrap();
+
+ // Range ops should return empty (skip pruning)
+ let filter = Filter {
+ field_name: "ts".to_string(),
+ operator: ExprOperator::Gte,
+ values: vec!["1706140800".to_string()],
+ };
+ assert!(keygen.transform_filter(&filter).unwrap().is_empty());
+
+ // But Eq still works
+ let filter = Filter {
+ field_name: "ts".to_string(),
+ operator: ExprOperator::Eq,
+ values: vec!["1706140800".to_string()],
+ };
+ let transformed = keygen.transform_filter(&filter).unwrap();
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].operator, ExprOperator::Eq);
+ }
+
+ #[test]
+ fn test_is_lex_sortable_format() {
+ let make_keygen = |fmt: &str| -> TimestampBasedKeyGenerator {
+ let configs = HudiConfigs::new([
+ ("hoodie.table.partition.fields", "ts"),
+ ("hoodie.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP"),
+ ("hoodie.keygen.timebased.output.dateformat", fmt),
+ ("hoodie.datasource.write.hive_style_partitioning", "false"),
+ ]);
+ TimestampBasedKeyGenerator::from_configs(&configs).unwrap()
+ };
+
+ // Lex-sortable formats
+ assert!(make_keygen("yyyy/MM/dd").is_lex_sortable_format());
+ assert!(make_keygen("yyyy/MM/dd/HH").is_lex_sortable_format());
+ assert!(make_keygen("yyyy/MM").is_lex_sortable_format());
+ assert!(make_keygen("yyyy").is_lex_sortable_format());
+ assert!(make_keygen("yyyyMMdd").is_lex_sortable_format());
+ assert!(make_keygen("yyyyMMddHH").is_lex_sortable_format());
+ assert!(make_keygen("yyyy-MM-dd").is_lex_sortable_format());
+
+ // Not lex-sortable
+ assert!(!make_keygen("MM/dd/yyyy").is_lex_sortable_format());
+ assert!(!make_keygen("dd/MM/yyyy").is_lex_sortable_format());
+ assert!(!make_keygen("ddMMyyyy").is_lex_sortable_format());
+ }
}
diff --git a/crates/core/src/table/file_pruner.rs
b/crates/core/src/table/file_pruner.rs
index 500a2a3..50ef0a8 100644
--- a/crates/core/src/table/file_pruner.rs
+++ b/crates/core/src/table/file_pruner.rs
@@ -121,6 +121,12 @@ impl FilePruner {
///
/// Returns `true` if the file can definitely be pruned (no rows can match).
fn can_prune_by_filter(&self, filter: &SchemableFilter, col_stats: &ColumnStatistics) -> bool {
+ // Multi-value operators not yet supported for file-level pruning
+ // TODO: support IN/NOT IN by checking if all values are outside the min/max range
+ if filter.operator.is_multi_value() {
+ return false;
+ }
+
// Get the filter value as an ArrayRef
let filter_array = self.extract_filter_array(filter);
let Some(filter_value) = filter_array else {
@@ -155,6 +161,9 @@ impl FilePruner {
// Prune if: max < value
self.can_prune_gte(&filter_value, max)
}
+ ExprOperator::In | ExprOperator::NotIn => {
+ unreachable!("Multi-value operators are short-circuited above")
+ }
}
}
@@ -250,7 +259,7 @@ impl FilePruner {
/// Extracts the filter value as an ArrayRef.
fn extract_filter_array(&self, filter: &SchemableFilter) -> Option<ArrayRef> {
- let (array, is_scalar) = filter.value.get();
+ let (array, is_scalar) = filter.values[0].get();
if array.is_empty() {
return None;
}
@@ -531,6 +540,39 @@ mod tests {
assert!(pruner.should_include(&stats2));
}
+ #[test]
+ fn test_in_not_in_operators_no_prune() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ // IN operator should not prune (conservative approach)
+ let filters = vec![
+ Filter::new(
+ "id".to_string(),
+ ExprOperator::In,
+ vec!["5".to_string(), "10".to_string()],
+ )
+ .unwrap(),
+ ];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Even though 5 and 10 are below min=50, conservative approach includes file
+ let stats = create_stats_with_int_range("id", 50, 100);
+ assert!(pruner.should_include(&stats));
+
+ // NOT IN operator should also not prune
+ let filters = vec![
+ Filter::new(
+ "id".to_string(),
+ ExprOperator::NotIn,
+ vec!["5".to_string(), "10".to_string()],
+ )
+ .unwrap(),
+ ];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+ assert!(pruner.should_include(&stats));
+ }
+
#[test]
fn test_missing_column_stats_includes_file() {
let table_schema = create_test_schema();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e861ed1..72d6b9d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -105,7 +105,9 @@ use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::expr::filter::{Filter, from_str_tuples};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
+use crate::keygen::is_timestamp_based_keygen;
use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
+use crate::metadata::meta_field::MetaField;
use crate::schema::resolver::{
resolve_avro_schema, resolve_avro_schema_with_meta_fields,
resolve_data_schema, resolve_schema,
};
@@ -271,6 +273,16 @@ impl Table {
)]));
}
+ // Timestamp-based keygen: the source field is transformed into partition path
+ // strings, so use a single _hoodie_partition_path field.
+ if is_timestamp_based_keygen(&self.hudi_configs) {
+ return Ok(Schema::new(vec![Field::new(
+ MetaField::PartitionPath.as_ref(),
+ arrow_schema::DataType::Utf8,
+ false,
+ )]));
+ }
+
let partition_fields: HashSet<String> = {
let fields: Vec<String> = self.hudi_configs.get_or_default(PartitionFields).into();
fields.into_iter().collect()
@@ -1001,7 +1013,7 @@ mod tests {
let schema = hudi_table.get_partition_schema().await;
assert!(schema.is_ok());
let schema = schema.unwrap();
- assert_arrow_field_names_eq!(schema, ["ts_str"]);
+ assert_arrow_field_names_eq!(schema, [MetaField::PartitionPath.as_ref()]);
}
#[tokio::test]
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 37699d7..3bce503 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -27,7 +27,9 @@ use crate::keygen::timestamp_based::TimestampBasedKeyGenerator;
use arrow_array::{ArrayRef, Scalar};
use arrow_schema::Schema;
-use crate::config::table::HudiTableConfig::{KeyGeneratorClass, PartitionFields};
+use crate::config::table::HudiTableConfig::{KeyGeneratorClass, KeyGeneratorType, PartitionFields};
+use crate::keygen::is_timestamp_based_keygen;
+use crate::metadata::meta_field::MetaField;
use std::collections::HashMap;
use std::sync::Arc;
@@ -48,12 +50,11 @@ pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool {
})
.unwrap_or(false);
- // v8+: also check hoodie.table.keygenerator.type for NON_PARTITION variants
let uses_non_partitioned_type = hudi_configs
- .as_options()
- .get("hoodie.table.keygenerator.type")
+ .try_get(KeyGeneratorType)
.map(|v| {
- let upper = v.to_uppercase();
+ let s: String = v.into();
+ let upper = s.to_uppercase();
upper == "NON_PARTITION" || upper == "NON_PARTITION_AVRO"
})
.unwrap_or(false);
@@ -144,34 +145,6 @@ impl PartitionPruner {
})
}
- /// Returns true if the table uses a timestamp-based key generator,
- /// checking both `hoodie.table.keygenerator.class` (v6) and
- /// `hoodie.table.keygenerator.type` (v8+).
- fn is_timestamp_based_keygen(hudi_configs: &HudiConfigs) -> bool {
- // v6: hoodie.table.keygenerator.class contains "TimestampBasedKeyGenerator"
- let by_class: bool = hudi_configs
- .try_get(KeyGeneratorClass)
- .map(|v| {
- let s: String = v.into();
- s.contains("TimestampBasedKeyGenerator")
- })
- .unwrap_or(false);
-
- if by_class {
- return true;
- }
-
- // v8+: hoodie.table.keygenerator.type = "TIMESTAMP" or "TIMESTAMP_AVRO"
- let options = hudi_configs.as_options();
- options
- .get("hoodie.table.keygenerator.type")
- .map(|v| {
- let upper = v.to_uppercase();
- upper == "TIMESTAMP" || upper == "TIMESTAMP_AVRO"
- })
- .unwrap_or(false)
- }
-
/// Transforms user filters on data columns to filters on partition path columns
/// based on the configured key generator.
fn transform_filters_for_keygen(
@@ -179,7 +152,7 @@ impl PartitionPruner {
_partition_schema: &Schema,
hudi_configs: &HudiConfigs,
) -> Result<Vec<Filter>> {
- if Self::is_timestamp_based_keygen(hudi_configs) {
+ if is_timestamp_based_keygen(hudi_configs) {
match TimestampBasedKeyGenerator::from_configs(hudi_configs) {
Ok(transformer) => {
return Self::apply_transformer_to_filters(filters, &transformer);
@@ -217,6 +190,20 @@ impl PartitionPruner {
partition_path.to_string()
};
+ // Special case: single _hoodie_partition_path field uses the raw path as-is
+ if self.schema.fields().len() == 1
+ && self.schema.field(0).name() == MetaField::PartitionPath.as_ref()
+ {
+ let scalar = SchemableFilter::cast_value(
+ &[partition_path.as_str()],
+ &arrow_schema::DataType::Utf8,
+ )?;
+ return Ok(HashMap::from([(
+ MetaField::PartitionPath.as_ref().to_string(),
+ scalar,
+ )]));
+ }
+
let parts: Vec<&str> = partition_path.split('/').collect();
if parts.len() != self.schema.fields().len() {
@@ -392,14 +379,14 @@ mod tests {
let filter = Filter {
field_name: "date".to_string(),
operator: ExprOperator::Eq,
- field_value: "2023-01-01".to_string(),
+ values: vec!["2023-01-01".to_string()],
};
let partition_filter = SchemableFilter::try_from((filter, &schema)).unwrap();
assert_eq!(partition_filter.field.name(), "date");
assert_eq!(partition_filter.operator, ExprOperator::Eq);
- let value_inner = partition_filter.value.into_inner();
+ let value_inner = partition_filter.values[0].clone().into_inner();
let date_array = value_inner.as_any().downcast_ref::<Date32Array>().unwrap();
@@ -413,7 +400,7 @@ mod tests {
let filter = Filter {
field_name: "invalid_field".to_string(),
operator: ExprOperator::Eq,
- field_value: "2023-01-01".to_string(),
+ values: vec!["2023-01-01".to_string()],
};
let result = SchemableFilter::try_from((filter, &schema));
assert!(result.is_err());
@@ -431,7 +418,7 @@ mod tests {
let filter = Filter {
field_name: "count".to_string(),
operator: ExprOperator::Eq,
- field_value: "not_a_number".to_string(),
+ values: vec!["not_a_number".to_string()],
};
let result = SchemableFilter::try_from((filter, &schema));
assert!(result.is_err());
@@ -440,12 +427,8 @@ mod tests {
#[test]
fn test_partition_filter_try_from_all_operators() {
let schema = create_test_schema();
- for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
- let filter = Filter {
- field_name: "count".to_string(),
- operator: ExprOperator::from_str(op).unwrap(),
- field_value: "5".to_string(),
- };
+ for (op, op_enum) in ExprOperator::TOKEN_OP_PAIRS {
+ let filter = Filter::new("count".to_string(), op_enum, vec!["5".to_string()]).unwrap();
let partition_filter = SchemableFilter::try_from((filter, &schema));
let filter = partition_filter.unwrap();
assert_eq!(filter.field.name(), "count");
@@ -455,7 +438,13 @@ mod tests {
#[test]
fn test_transform_filters_for_keygen_timestamp_based() {
- // Range filter: DATE_STRING Gte → year Gte
+ let partition_schema = Schema::new(vec![Field::new(
+ MetaField::PartitionPath.as_ref(),
+ DataType::Utf8,
+ false,
+ )]);
+
+ // Range filter: DATE_STRING Gte → single _hoodie_partition_path Gte
let configs = HudiConfigs::new([
("hoodie.table.partition.fields", "ts_str"),
(
@@ -471,16 +460,10 @@ mod tests {
("hoodie.datasource.write.hive_style_partitioning", "true"),
]);
- let partition_schema = Schema::new(vec![
- Field::new("year", DataType::Utf8, false),
- Field::new("month", DataType::Utf8, false),
- Field::new("day", DataType::Utf8, false),
- ]);
-
let user_filter = Filter {
field_name: "ts_str".to_string(),
operator: ExprOperator::Gte,
- field_value: "2023-04-15T12:00:00.000Z".to_string(),
+ values: vec!["2023-04-15T12:00:00.000Z".to_string()],
};
let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -491,11 +474,11 @@ mod tests {
.unwrap();
assert_eq!(transformed.len(), 1);
- assert_eq!(transformed[0].field_name, "year");
+ assert_eq!(transformed[0].field_name, MetaField::PartitionPath.as_ref());
assert_eq!(transformed[0].operator, ExprOperator::Gte);
- assert_eq!(transformed[0].field_value, "2023");
+ assert_eq!(transformed[0].values[0], "year=2023/month=04/day=15");
- // Equality filter: UNIX_TIMESTAMP Eq → yyyy/MM/dd Eq
+ // Equality filter: UNIX_TIMESTAMP Eq → single path
let configs = HudiConfigs::new([
("hoodie.table.partition.fields", "event_time"),
(
@@ -507,17 +490,11 @@ mod tests {
("hoodie.datasource.write.hive_style_partitioning", "false"),
]);
- let partition_schema = Schema::new(vec![
- Field::new("yyyy", DataType::Utf8, false),
- Field::new("MM", DataType::Utf8, false),
- Field::new("dd", DataType::Utf8, false),
- ]);
-
// 2024-01-25 00:00:00 UTC = 1706140800 seconds
let user_filter = Filter {
field_name: "event_time".to_string(),
operator: ExprOperator::Eq,
- field_value: "1706140800".to_string(),
+ values: vec!["1706140800".to_string()],
};
let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -527,13 +504,9 @@ mod tests {
)
.unwrap();
- assert_eq!(transformed.len(), 3);
- assert_eq!(transformed[0].field_name, "yyyy");
- assert_eq!(transformed[0].field_value, "2024");
- assert_eq!(transformed[1].field_name, "MM");
- assert_eq!(transformed[1].field_value, "01");
- assert_eq!(transformed[2].field_name, "dd");
- assert_eq!(transformed[2].field_value, "25");
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].field_name, MetaField::PartitionPath.as_ref());
+ assert_eq!(transformed[0].values[0], "2024/01/25");
// v8 detection via keygenerator.type=TIMESTAMP (no keygenerator.class)
let configs = HudiConfigs::new([
@@ -548,16 +521,10 @@ mod tests {
("hoodie.datasource.write.hive_style_partitioning", "true"),
]);
- let partition_schema = Schema::new(vec![
- Field::new("year", DataType::Utf8, false),
- Field::new("month", DataType::Utf8, false),
- Field::new("day", DataType::Utf8, false),
- ]);
-
let user_filter = Filter {
field_name: "ts_str".to_string(),
operator: ExprOperator::Eq,
- field_value: "2023-04-15T12:00:00.000Z".to_string(),
+ values: vec!["2023-04-15T12:00:00.000Z".to_string()],
};
let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -567,9 +534,8 @@ mod tests {
)
.unwrap();
- assert_eq!(transformed.len(), 3);
- assert_eq!(transformed[0].field_name, "year");
- assert_eq!(transformed[0].field_value, "2023");
+ assert_eq!(transformed.len(), 1);
+ assert_eq!(transformed[0].values[0], "year=2023/month=04/day=15");
}
#[test]
@@ -588,7 +554,7 @@ mod tests {
let user_filter = Filter {
field_name: "region".to_string(),
operator: ExprOperator::Eq,
- field_value: "us-west".to_string(),
+ values: vec!["us-west".to_string()],
};
let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -600,7 +566,7 @@ mod tests {
assert_eq!(transformed.len(), 1);
assert_eq!(transformed[0].field_name, user_filter.field_name);
- assert_eq!(transformed[0].field_value, user_filter.field_value);
+ assert_eq!(transformed[0].values[0], user_filter.values[0]);
}
#[test]
@@ -621,29 +587,56 @@ mod tests {
("hoodie.datasource.write.partitionpath.urlencode", "false"),
]);
- let partition_schema = Schema::new(vec![
- Field::new("year", DataType::Utf8, false),
- Field::new("month", DataType::Utf8, false),
- Field::new("day", DataType::Utf8, false),
- ]);
+ let partition_schema = Schema::new(vec![Field::new(
+ MetaField::PartitionPath.as_ref(),
+ DataType::Utf8,
+ false,
+ )]);
let user_filter = Filter {
field_name: "ts".to_string(),
operator: ExprOperator::Gte,
- field_value: "2024-01-15T00:00:00Z".to_string(),
+ values: vec!["2024-01-15T00:00:00Z".to_string()],
};
let pruner = PartitionPruner::new(&[user_filter], &partition_schema, &configs).unwrap();
assert!(!pruner.is_empty());
- // Should include partitions >= 2024
+ // Full path string comparison on _hoodie_partition_path
assert!(pruner.should_include("year=2024/month=01/day=15"));
assert!(pruner.should_include("year=2024/month=06/day=30"));
assert!(pruner.should_include("year=2025/month=01/day=01"));
- // Should exclude partitions < 2024
+ // Should exclude partitions < 2024/01/15
assert!(!pruner.should_include("year=2023/month=12/day=31"));
assert!(!pruner.should_include("year=2022/month=01/day=01"));
+
+ // Non-hive-style: verify the same logic works
+ let configs = HudiConfigs::new([
+ ("hoodie.table.partition.fields", "ts"),
+ (
+ "hoodie.table.keygenerator.class",
+ "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
+ ),
+ ("hoodie.keygen.timebased.timestamp.type", "DATE_STRING"),
+ (
+ "hoodie.keygen.timebased.input.dateformat",
+ "yyyy-MM-dd'T'HH:mm:ssZ",
+ ),
+ ("hoodie.keygen.timebased.output.dateformat", "yyyy/MM/dd"),
+ ("hoodie.datasource.write.hive_style_partitioning", "false"),
+ ]);
+
+ let user_filter = Filter {
+ field_name: "ts".to_string(),
+ operator: ExprOperator::Eq,
+ values: vec!["2024-01-15T00:00:00Z".to_string()],
+ };
+
+ let pruner = PartitionPruner::new(&[user_filter], &partition_schema, &configs).unwrap();
+ assert!(pruner.should_include("2024/01/15"));
+ assert!(!pruner.should_include("2024/01/16"));
+ assert!(!pruner.should_include("2023/12/31"));
}
}
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 0b68649..8a67876 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -18,6 +18,7 @@
*/
use datafusion::logical_expr::Operator;
+use datafusion_expr::expr::InList;
use datafusion_expr::{Between, BinaryExpr, Expr};
use hudi_core::expr::filter::{Filter as HudiFilter, col};
use log::{debug, warn};
@@ -33,6 +34,7 @@ use log::{debug, warn};
/// - `NOT` expressions: negates inner binary expression
/// - `AND` compound expressions: recursively flattens both sides
/// - `BETWEEN` expressions: converts to `>= low AND <= high`
+/// - `IN` / `NOT IN` expressions: converts to `IN` / `NOT IN` filters
///
/// # OR Expression Handling
///
@@ -91,6 +93,7 @@ fn expr_to_filters(expr: &Expr) -> Vec<HudiFilter> {
},
Expr::Not(not_expr) => not_expr_to_filter(not_expr).into_iter().collect(),
Expr::Between(between) => between_to_filters(between),
+ Expr::InList(in_list) => inlist_expr_to_filter(in_list).into_iter().collect(),
_ => vec![],
}
}
@@ -176,6 +179,46 @@ fn between_to_filters(between: &Between) -> Vec<HudiFilter> {
]
}
+/// Converts an IN list expression into a HudiFilter with IN or NOT IN operator.
+///
+/// Returns None if the expression cannot be pushed down (non-column expr,
+/// non-literal values, or empty list).
+fn inlist_expr_to_filter(in_list: &InList) -> Option<HudiFilter> {
+ let column = match in_list.expr.as_ref() {
+ Expr::Column(col) => col,
+ _ => {
+ debug!("IN list with non-column expression cannot be pushed down");
+ return None;
+ }
+ };
+
+ if in_list.list.is_empty() {
+ debug!("Empty IN list cannot be pushed down");
+ return None;
+ }
+
+ let values: Vec<String> = in_list
+ .list
+ .iter()
+ .filter_map(|expr| match expr {
+ Expr::Literal(lit, _) => Some(lit.to_string()),
+ _ => None,
+ })
+ .collect();
+
+ if values.len() != in_list.list.len() {
+ debug!("IN list contains non-literal values, cannot be pushed down");
+ return None;
+ }
+
+ let field = col(column.name());
+ if in_list.negated {
+ Some(field.not_in_list(values))
+ } else {
+ Some(field.in_list(values))
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -205,14 +248,14 @@ mod tests {
let expected_filter = HudiFilter {
field_name: schema.field(0).name().to_string(),
operator: ExprOperator::Eq,
- field_value: "42".to_string(),
+ values: vec!["42".to_string()],
};
assert_eq!(
result[0],
(
expected_filter.field_name,
expected_filter.operator.to_string(),
- expected_filter.field_value
+ expected_filter.values.join(",")
)
);
}
@@ -237,14 +280,14 @@ mod tests {
let expected_filter = HudiFilter {
field_name: schema.field(0).name().to_string(),
operator: ExprOperator::Ne,
- field_value: "42".to_string(),
+ values: vec!["42".to_string()],
};
assert_eq!(
result[0],
(
expected_filter.field_name,
expected_filter.operator.to_string(),
- expected_filter.field_value
+ expected_filter.values.join(",")
)
);
}
@@ -258,7 +301,7 @@ mod tests {
Some(HudiFilter {
field_name: String::from("int32_col"),
operator: ExprOperator::Eq,
- field_value: String::from("42"),
+ values: vec![String::from("42")],
}),
),
(
@@ -266,7 +309,7 @@ mod tests {
Some(HudiFilter {
field_name: String::from("int64_col"),
operator: ExprOperator::Gte,
- field_value: String::from("100"),
+ values: vec![String::from("100")],
}),
),
(
@@ -274,7 +317,7 @@ mod tests {
Some(HudiFilter {
field_name: String::from("float64_col"),
operator: ExprOperator::Lt,
- field_value: "32.666".to_string(),
+ values: vec!["32.666".to_string()],
}),
),
(
@@ -282,7 +325,7 @@ mod tests {
Some(HudiFilter {
field_name: String::from("string_col"),
operator: ExprOperator::Ne,
- field_value: String::from("test"),
+ values: vec![String::from("test")],
}),
),
];
@@ -302,7 +345,7 @@ mod tests {
&(
expected_filter.field_name.clone(),
expected_filter.operator.to_string(),
- expected_filter.field_value.clone()
+ expected_filter.values.join(",").clone()
)
);
}
@@ -336,14 +379,14 @@ mod tests {
let expected_filter = HudiFilter {
field_name: schema.field(0).name().to_string(),
operator: expected_op,
- field_value: String::from("42"),
+ values: vec![String::from("42")],
};
assert_eq!(
result[0],
(
expected_filter.field_name,
expected_filter.operator.to_string(),
- expected_filter.field_value
+ expected_filter.values.join(",")
)
);
}
@@ -655,4 +698,68 @@ mod tests {
);
assert_eq!(result[0].0, "col_a");
}
+
+ #[test]
+ fn test_convert_in_list() {
+ // col IN ('a', 'b', 'c')
+ let in_list = Expr::InList(InList::new(
+ Box::new(col("part")),
+ vec![lit("a"), lit("b"), lit("c")],
+ false,
+ ));
+ let result = exprs_to_filters(&[in_list]);
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].0, "part");
+ assert_eq!(result[0].1, "IN");
+ assert_eq!(result[0].2, "a,b,c");
+
+ // col NOT IN ('x', 'y')
+ let not_in = Expr::InList(InList::new(
+ Box::new(col("part")),
+ vec![lit("x"), lit("y")],
+ true,
+ ));
+ let result = exprs_to_filters(&[not_in]);
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].0, "part");
+ assert_eq!(result[0].1, "NOT IN");
+ assert_eq!(result[0].2, "x,y");
+
+ // Integer IN list
+ let in_int = Expr::InList(InList::new(
+ Box::new(col("id")),
+ vec![lit(40i32), lit(60i32)],
+ false,
+ ));
+ let result = exprs_to_filters(&[in_int]);
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].1, "IN");
+ }
+
+ #[test]
+ fn test_convert_in_list_unsupported_cases() {
+ // Empty list
+ let empty = Expr::InList(InList::new(Box::new(col("col1")), vec![], false));
+ assert!(exprs_to_filters(&[empty]).is_empty());
+
+ // Non-literal values
+ let non_lit = Expr::InList(InList::new(
+ Box::new(col("col1")),
+ vec![col("col2"), col("col3")],
+ false,
+ ));
+ assert!(exprs_to_filters(&[non_lit]).is_empty());
+
+ // Non-column expression
+ let non_col = Expr::InList(InList::new(
+ Box::new(Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col1")),
+ Operator::Plus,
+ Box::new(col("col2")),
+ ))),
+ vec![lit(1i32)],
+ false,
+ ));
+ assert!(exprs_to_filters(&[non_col]).is_empty());
+ }
}