This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new ae09718  feat: support IN and NOT IN operators in partition filter (#566)
ae09718 is described below

commit ae097181d4052db1ead599cc8a6ac252ae903ed2
Author: Yunchi Pang <[email protected]>
AuthorDate: Sun Mar 29 23:01:02 2026 -0400

    feat: support IN and NOT IN operators in partition filter (#566)
    
    
    
    ---------
    
    Co-authored-by: Shiyan Xu <[email protected]>
---
 crates/core/src/config/table.rs           |   5 +
 crates/core/src/expr/filter.rs            | 267 ++++++++++++++++++---
 crates/core/src/expr/mod.rs               |  22 +-
 crates/core/src/keygen/mod.rs             |  28 +++
 crates/core/src/keygen/timestamp_based.rs | 378 ++++++++++++++++++++----------
 crates/core/src/table/file_pruner.rs      |  44 +++-
 crates/core/src/table/mod.rs              |  14 +-
 crates/core/src/table/partition.rs        | 171 +++++++-------
 crates/datafusion/src/util/expr.rs        | 129 +++++++++-
 9 files changed, 799 insertions(+), 259 deletions(-)

diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 6ff9169..83c1c70 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -78,6 +78,9 @@ pub enum HudiTableConfig {
     /// Key Generator class property for the hoodie table
     KeyGeneratorClass,
 
+    /// Key Generator type property for the hoodie table (v8+)
+    KeyGeneratorType,
+
     /// Fields used to partition the table. Concatenated values of these fields are used as
     /// the partition path, by invoking toString().
     /// These fields also include the partition type which is used by custom key generators
@@ -156,6 +159,7 @@ impl AsRef<str> for HudiTableConfig {
             Self::IsHiveStylePartitioning => 
"hoodie.datasource.write.hive_style_partitioning",
             Self::IsPartitionPathUrlencoded => 
"hoodie.datasource.write.partitionpath.urlencode",
             Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
+            Self::KeyGeneratorType => "hoodie.table.keygenerator.type",
             Self::PartitionFields => "hoodie.table.partition.fields",
             Self::PrecombineField => "hoodie.table.precombine.field",
             Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
@@ -245,6 +249,7 @@ impl ConfigParser for HudiTableConfig {
                 })
                 .map(HudiConfigValue::Boolean),
             Self::KeyGeneratorClass => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::KeyGeneratorType => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::PartitionFields => get_result
                 .map(|v| 
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
             Self::PrecombineField => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 060841f..791a556 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -20,6 +20,7 @@
 use crate::Result;
 use crate::error::CoreError;
 use crate::expr::ExprOperator;
+use arrow_arith::boolean;
 use arrow_array::{ArrayRef, BooleanArray, Datum, Scalar, StringArray};
 use arrow_cast::{CastOptions, cast_with_options};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -30,7 +31,29 @@ use std::str::FromStr;
 pub struct Filter {
     pub field_name: String,
     pub operator: ExprOperator,
-    pub field_value: String,
+    pub values: Vec<String>,
+}
+
+impl Filter {
+    pub fn new(field_name: String, operator: ExprOperator, values: 
Vec<String>) -> Result<Self> {
+        if operator.is_multi_value() {
+            if values.is_empty() {
+                return Err(CoreError::Schema(format!(
+                    "{operator} operator requires at least one value for field '{field_name}'"
+                )));
+            }
+        } else if values.len() != 1 {
+            return Err(CoreError::Schema(format!(
+                "Operator {operator} requires exactly one value for field '{field_name}', got {}",
+                values.len()
+            )));
+        }
+        Ok(Self {
+            field_name,
+            operator,
+            values,
+        })
+    }
 }
 
 impl Filter {
@@ -44,11 +67,8 @@ impl Filter {
 
 impl From<Filter> for (String, String, String) {
     fn from(filter: Filter) -> Self {
-        (
-            filter.field_name,
-            filter.operator.to_string(),
-            filter.field_value,
-        )
+        let value_str = filter.values.join(",");
+        (filter.field_name, filter.operator.to_string(), value_str)
     }
 }
 
@@ -57,18 +77,18 @@ impl TryFrom<(&str, &str, &str)> for Filter {
 
     fn try_from(binary_expr_tuple: (&str, &str, &str)) -> Result<Self, 
Self::Error> {
         let (field_name, operator_str, field_value) = binary_expr_tuple;
-
         let field_name = field_name.to_string();
-
         let operator = ExprOperator::from_str(operator_str)?;
-
-        let field_value = field_value.to_string();
-
-        Ok(Filter {
-            field_name,
-            operator,
-            field_value,
-        })
+        let values = if operator.is_multi_value() {
+            field_value
+                .split(',')
+                .map(|s| s.trim().to_string())
+                .filter(|s| !s.is_empty())
+                .collect()
+        } else {
+            vec![field_value.to_string()]
+        };
+        Filter::new(field_name, operator, values)
     }
 }
 
@@ -100,7 +120,7 @@ impl FilterField {
         Filter {
             field_name: self.name.clone(),
             operator: ExprOperator::Eq,
-            field_value: value.into(),
+            values: vec![value.into()],
         }
     }
 
@@ -108,7 +128,7 @@ impl FilterField {
         Filter {
             field_name: self.name.clone(),
             operator: ExprOperator::Ne,
-            field_value: value.into(),
+            values: vec![value.into()],
         }
     }
 
@@ -116,7 +136,7 @@ impl FilterField {
         Filter {
             field_name: self.name.clone(),
             operator: ExprOperator::Lt,
-            field_value: value.into(),
+            values: vec![value.into()],
         }
     }
 
@@ -124,7 +144,7 @@ impl FilterField {
         Filter {
             field_name: self.name.clone(),
             operator: ExprOperator::Lte,
-            field_value: value.into(),
+            values: vec![value.into()],
         }
     }
 
@@ -132,7 +152,7 @@ impl FilterField {
         Filter {
             field_name: self.name.clone(),
             operator: ExprOperator::Gt,
-            field_value: value.into(),
+            values: vec![value.into()],
         }
     }
 
@@ -140,7 +160,31 @@ impl FilterField {
         Filter {
             field_name: self.name.clone(),
             operator: ExprOperator::Gte,
-            field_value: value.into(),
+            values: vec![value.into()],
+        }
+    }
+
+    pub fn in_list<I, S>(&self, values: I) -> Filter
+    where
+        I: IntoIterator<Item = S>,
+        S: Into<String>,
+    {
+        Filter {
+            field_name: self.name.clone(),
+            operator: ExprOperator::In,
+            values: values.into_iter().map(|v| v.into()).collect(),
+        }
+    }
+
+    pub fn not_in_list<I, S>(&self, values: I) -> Filter
+    where
+        I: IntoIterator<Item = S>,
+        S: Into<String>,
+    {
+        Filter {
+            field_name: self.name.clone(),
+            operator: ExprOperator::NotIn,
+            values: values.into_iter().map(|v| v.into()).collect(),
         }
     }
 }
@@ -153,7 +197,7 @@ pub fn col(name: impl Into<String>) -> FilterField {
 pub struct SchemableFilter {
     pub field: Field,
     pub operator: ExprOperator,
-    pub value: Scalar<ArrayRef>,
+    pub values: Vec<Scalar<ArrayRef>>,
 }
 
 impl TryFrom<(Filter, &Schema)> for SchemableFilter {
@@ -166,14 +210,19 @@ impl TryFrom<(Filter, &Schema)> for SchemableFilter {
         })?;
 
         let operator = filter.operator;
-        let value = &[filter.field_value.as_str()];
-        let value = Self::cast_value(value, field.data_type())?;
+
+        let values: Result<Vec<_>> = filter
+            .values
+            .iter()
+            .map(|v| Self::cast_value(&[v.as_str()], field.data_type()))
+            .collect();
+        let values = values?;
 
         let field = field.clone();
         Ok(SchemableFilter {
             field,
             operator,
-            value,
+            values,
         })
     }
 }
@@ -195,12 +244,30 @@ impl SchemableFilter {
 
     pub fn apply_comparison(&self, value: &dyn Datum) -> Result<BooleanArray> {
         match self.operator {
-            ExprOperator::Eq => eq(value, &self.value),
-            ExprOperator::Ne => neq(value, &self.value),
-            ExprOperator::Lt => lt(value, &self.value),
-            ExprOperator::Lte => lt_eq(value, &self.value),
-            ExprOperator::Gt => gt(value, &self.value),
-            ExprOperator::Gte => gt_eq(value, &self.value),
+            ExprOperator::Eq => eq(value, &self.values[0]),
+            ExprOperator::Ne => neq(value, &self.values[0]),
+            ExprOperator::Lt => lt(value, &self.values[0]),
+            ExprOperator::Lte => lt_eq(value, &self.values[0]),
+            ExprOperator::Gt => gt(value, &self.values[0]),
+            ExprOperator::Gte => gt_eq(value, &self.values[0]),
+            ExprOperator::In => {
+                // IN: value == values[0] OR value == values[1] OR ...
+                let mut result = eq(value, &self.values[0])?;
+                for filter_value in &self.values[1..] {
+                    let comparison = eq(value, filter_value)?;
+                    result = boolean::or(&result, &comparison)?;
+                }
+                Ok(result)
+            }
+            ExprOperator::NotIn => {
+                // NOT IN: value != values[0] AND value != values[1] AND ...
+                let mut result = neq(value, &self.values[0])?;
+                for filter_value in &self.values[1..] {
+                    let comparison = neq(value, filter_value)?;
+                    result = boolean::and(&result, &comparison)?;
+                }
+                Ok(result)
+            }
         }
         .map_err(|e| e.into())
     }
@@ -227,7 +294,7 @@ mod tests {
         let string_filter = Filter {
             field_name: "string_col".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "test_value".to_string(),
+            values: vec!["test_value".to_string()],
         };
 
         let schemable = SchemableFilter::try_from((string_filter, &schema))?;
@@ -239,7 +306,7 @@ mod tests {
         let int_filter = Filter {
             field_name: "int_col".to_string(),
             operator: ExprOperator::Gt,
-            field_value: "42".to_string(),
+            values: vec!["42".to_string()],
         };
 
         let schemable = SchemableFilter::try_from((int_filter, &schema))?;
@@ -251,7 +318,7 @@ mod tests {
         let invalid_filter = Filter {
             field_name: "non_existent".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "value".to_string(),
+            values: vec!["value".to_string()],
         };
 
         assert!(SchemableFilter::try_from((invalid_filter, &schema)).is_err());
@@ -259,6 +326,31 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_filter_in_empty_values_error() -> Result<()> {
+        // IN operator with empty values should error at construction
+        let result = Filter::new("int_col".to_string(), ExprOperator::In, 
vec![]);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("at least one value")
+        );
+
+        // NOT IN operator with empty values should also error at construction
+        let result = Filter::new("int_col".to_string(), ExprOperator::NotIn, 
vec![]);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("at least one value")
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn test_schemable_filter_cast_value() -> Result<()> {
         // Test casting to string
@@ -284,7 +376,7 @@ mod tests {
         let eq_filter = Filter {
             field_name: "string_col".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "test".to_string(),
+            values: vec!["test".to_string()],
         };
         let schemable = SchemableFilter::try_from((eq_filter, &schema))?;
 
@@ -296,7 +388,7 @@ mod tests {
         let gt_filter = Filter {
             field_name: "int_col".to_string(),
             operator: ExprOperator::Gt,
-            field_value: "50".to_string(),
+            values: vec!["50".to_string()],
         };
         let schemable = SchemableFilter::try_from((gt_filter, &schema))?;
 
@@ -325,7 +417,7 @@ mod tests {
             let filter = Filter {
                 field_name: "int_col".to_string(),
                 operator,
-                field_value: value.to_string(),
+                values: vec![value.to_string()],
             };
 
             let schemable = SchemableFilter::try_from((filter, &schema))?;
@@ -339,4 +431,103 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_schemable_filter_in() -> Result<()> {
+        let schema = create_test_schema();
+
+        let in_filter = Filter::new(
+            "string_col".to_string(),
+            ExprOperator::In,
+            vec!["foo".to_string(), "bar".to_string()],
+        )
+        .unwrap();
+
+        let schemable = SchemableFilter::try_from((in_filter, &schema))?;
+        let test_array = StringArray::from(vec!["foo", "baz", "bar", "qux"]);
+        let result = schemable.apply_comparison(&test_array)?;
+        assert_eq!(result, BooleanArray::from(vec![true, false, true, false]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schemable_filter_not_in() -> Result<()> {
+        let schema = create_test_schema();
+
+        let not_in_filter = Filter::new(
+            "string_col".to_string(),
+            ExprOperator::NotIn,
+            vec!["foo".to_string(), "bar".to_string()],
+        )
+        .unwrap();
+
+        let schemable = SchemableFilter::try_from((not_in_filter, &schema))?;
+        let test_array = StringArray::from(vec!["foo", "baz", "bar", "qux"]);
+        let result = schemable.apply_comparison(&test_array)?;
+        assert_eq!(result, BooleanArray::from(vec![false, true, false, true]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schemable_filter_in_with_integers() -> Result<()> {
+        let schema = create_test_schema();
+
+        let in_filter = Filter::new(
+            "int_col".to_string(),
+            ExprOperator::In,
+            vec!["40".to_string(), "60".to_string()],
+        )
+        .unwrap();
+
+        let schemable = SchemableFilter::try_from((in_filter, &schema))?;
+        let test_array = Int64Array::from(vec![40, 50, 60]);
+        let result = schemable.apply_comparison(&test_array)?;
+        assert_eq!(result, BooleanArray::from(vec![true, false, true]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_try_from_tuple_in_operator() -> Result<()> {
+        // TryFrom should parse comma-separated values for IN
+        let filter = Filter::try_from(("col", "IN", "a,b,c"))?;
+        assert_eq!(filter.operator, ExprOperator::In);
+        assert_eq!(filter.values, vec!["a", "b", "c"]);
+
+        // TryFrom should parse comma-separated values for NOT IN
+        let filter = Filter::try_from(("col", "NOT IN", "x, y"))?;
+        assert_eq!(filter.operator, ExprOperator::NotIn);
+        assert_eq!(filter.values, vec!["x", "y"]);
+
+        // Empty value string for IN should error
+        let result = Filter::try_from(("col", "IN", ""));
+        assert!(result.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_roundtrip_in_operator() -> Result<()> {
+        // Round-trip: Filter -> (String, String, String) -> Filter
+        let original = Filter::new(
+            "col".to_string(),
+            ExprOperator::In,
+            vec!["a".to_string(), "b".to_string()],
+        )
+        .unwrap();
+
+        let tuple: (String, String, String) = original.into();
+        assert_eq!(
+            tuple,
+            ("col".to_string(), "IN".to_string(), "a,b".to_string())
+        );
+
+        let restored = Filter::try_from((tuple.0.as_str(), tuple.1.as_str(), 
tuple.2.as_str()))?;
+        assert_eq!(restored.operator, ExprOperator::In);
+        assert_eq!(restored.values, vec!["a", "b"]);
+
+        Ok(())
+    }
 }
diff --git a/crates/core/src/expr/mod.rs b/crates/core/src/expr/mod.rs
index d98b6a7..cc7c84c 100644
--- a/crates/core/src/expr/mod.rs
+++ b/crates/core/src/expr/mod.rs
@@ -35,6 +35,8 @@ pub enum ExprOperator {
     Lte,
     Gt,
     Gte,
+    In,
+    NotIn,
 }
 
 impl Display for ExprOperator {
@@ -47,20 +49,29 @@ impl Display for ExprOperator {
             ExprOperator::Lte => write!(f, "<="),
             ExprOperator::Gt => write!(f, ">"),
             ExprOperator::Gte => write!(f, ">="),
+            ExprOperator::In => write!(f, "IN"),
+            ExprOperator::NotIn => write!(f, "NOT IN"),
         }
     }
 }
 
 impl ExprOperator {
-    pub const TOKEN_OP_PAIRS: [(&'static str, ExprOperator); 6] = [
+    pub const TOKEN_OP_PAIRS: [(&'static str, ExprOperator); 8] = [
         ("=", ExprOperator::Eq),
         ("!=", ExprOperator::Ne),
         ("<", ExprOperator::Lt),
         ("<=", ExprOperator::Lte),
         (">", ExprOperator::Gt),
         (">=", ExprOperator::Gte),
+        ("IN", ExprOperator::In),
+        ("NOT IN", ExprOperator::NotIn),
     ];
 
+    /// Returns true if the operator expects multiple values (IN, NOT IN).
+    pub fn is_multi_value(&self) -> bool {
+        matches!(self, ExprOperator::In | ExprOperator::NotIn)
+    }
+
     /// Negates the operator.
     pub fn negate(&self) -> Option<ExprOperator> {
         match self {
@@ -70,6 +81,8 @@ impl ExprOperator {
             ExprOperator::Lte => Some(ExprOperator::Gt),
             ExprOperator::Gt => Some(ExprOperator::Lte),
             ExprOperator::Gte => Some(ExprOperator::Lt),
+            ExprOperator::In => Some(ExprOperator::NotIn),
+            ExprOperator::NotIn => Some(ExprOperator::In),
         }
     }
 }
@@ -103,6 +116,11 @@ mod tests {
         assert_eq!(ExprOperator::from_str("<=").unwrap(), ExprOperator::Lte);
         assert_eq!(ExprOperator::from_str(">").unwrap(), ExprOperator::Gt);
         assert_eq!(ExprOperator::from_str(">=").unwrap(), ExprOperator::Gte);
+        assert_eq!(ExprOperator::from_str("IN").unwrap(), ExprOperator::In);
+        assert_eq!(
+            ExprOperator::from_str("NOT IN").unwrap(),
+            ExprOperator::NotIn
+        );
         assert!(ExprOperator::from_str("??").is_err());
     }
 
@@ -114,5 +132,7 @@ mod tests {
         assert_eq!(ExprOperator::Lte.to_string(), "<=");
         assert_eq!(ExprOperator::Gt.to_string(), ">");
         assert_eq!(ExprOperator::Gte.to_string(), ">=");
+        assert_eq!(ExprOperator::In.to_string(), "IN");
+        assert_eq!(ExprOperator::NotIn.to_string(), "NOT IN");
     }
 }
diff --git a/crates/core/src/keygen/mod.rs b/crates/core/src/keygen/mod.rs
index 029ed95..350b053 100644
--- a/crates/core/src/keygen/mod.rs
+++ b/crates/core/src/keygen/mod.rs
@@ -22,8 +22,36 @@
 pub mod timestamp_based;
 
 use crate::Result;
+use crate::config::HudiConfigs;
+use crate::config::table::HudiTableConfig::{KeyGeneratorClass, 
KeyGeneratorType};
 use crate::expr::filter::Filter;
 
+/// Returns true if the table uses a timestamp-based key generator,
+/// checking both `hoodie.table.keygenerator.class` and
+/// `hoodie.table.keygenerator.type`.
+pub fn is_timestamp_based_keygen(hudi_configs: &HudiConfigs) -> bool {
+    let by_class: bool = hudi_configs
+        .try_get(KeyGeneratorClass)
+        .map(|v| {
+            let s: String = v.into();
+            s.contains("TimestampBasedKeyGenerator")
+        })
+        .unwrap_or(false);
+
+    if by_class {
+        return true;
+    }
+
+    hudi_configs
+        .try_get(KeyGeneratorType)
+        .map(|v| {
+            let s: String = v.into();
+            let upper = s.to_uppercase();
+            upper == "TIMESTAMP" || upper == "TIMESTAMP_AVRO"
+        })
+        .unwrap_or(false)
+}
+
 /// Trait for key generators that can transform user filters on data columns
 /// to filters on partition path columns.
 pub trait KeyGeneratorFilterTransformer {
diff --git a/crates/core/src/keygen/timestamp_based.rs 
b/crates/core/src/keygen/timestamp_based.rs
index 3869204..3524739 100644
--- a/crates/core/src/keygen/timestamp_based.rs
+++ b/crates/core/src/keygen/timestamp_based.rs
@@ -25,6 +25,7 @@ use crate::error::CoreError;
 use crate::expr::ExprOperator;
 use crate::expr::filter::Filter;
 use crate::keygen::KeyGeneratorFilterTransformer;
+use crate::metadata::meta_field::MetaField;
 use chrono::{DateTime, Datelike, NaiveDateTime, TimeZone, Timelike, Utc};
 use chrono_tz::Tz;
 use std::collections::HashMap;
@@ -57,7 +58,6 @@ pub struct TimestampBasedKeyGenerator {
     output_timezone: Tz,
 
     /// Whether partitions use Hive-style naming (e.g., year=2024 vs 2024)
-    #[allow(dead_code)]
     is_hive_style: bool,
 
     /// Partition field names derived from output format (e.g., ["year", "month", "day", "hour"])
@@ -418,29 +418,81 @@ impl TimestampBasedKeyGenerator {
             .replace("'T'", "T")
     }
 
-    /// Extracts partition values from a datetime based on output format,
-    /// applying the configured output timezone.
-    fn extract_partition_values(&self, dt: &DateTime<Utc>) -> HashMap<String, 
String> {
+    /// Formats a datetime into the full partition path string.
+    ///
+    /// For non-hive-style with format `yyyy/MM/dd`: `"2023/04/15"`
+    /// For hive-style with format `yyyy/MM/dd`: `"year=2023/month=04/day=15"`
+    fn format_partition_path(&self, dt: &DateTime<Utc>) -> String {
         let local_dt = dt.with_timezone(&self.output_timezone);
-        let mut values = HashMap::new();
-
-        let segments: Vec<&str> = self.output_dateformat.split('/').collect();
-
-        for (i, segment) in segments.iter().enumerate() {
-            let field_name = &self.partition_fields[i];
-            let value = match *segment {
-                "yyyy" => format!("{:04}", local_dt.year()),
-                "MM" => format!("{:02}", local_dt.month()),
-                "dd" => format!("{:02}", local_dt.day()),
-                "HH" => format!("{:02}", local_dt.hour()),
-                "mm" => format!("{:02}", local_dt.minute()),
-                "ss" => format!("{:02}", local_dt.second()),
-                _ => segment.to_string(),
-            };
-            values.insert(field_name.clone(), value);
+
+        let segments: Vec<String> = self
+            .output_dateformat
+            .split('/')
+            .enumerate()
+            .map(|(i, segment)| {
+                let value = Self::format_segment_value(segment, &local_dt);
+                if self.is_hive_style {
+                    let field_name = &self.partition_fields[i];
+                    format!("{field_name}={value}")
+                } else {
+                    value
+                }
+            })
+            .collect();
+
+        segments.join("/")
+    }
+
+    /// Formats a single date segment into its value string.
+    ///
+    /// Supports both simple tokens like `"yyyy"` and compound patterns like
+    /// `"yyyyMMdd"` or `"yyyy-MM-dd"` by progressively replacing known tokens.
+    fn format_segment_value<T: Datelike + Timelike>(segment: &str, dt: &T) -> 
String {
+        // Order matters: longer tokens must not be partially consumed by shorter ones.
+        // "mm" (minute) vs "MM" (month) are case-sensitive so they don't conflict.
+        segment
+            .replace("yyyy", &format!("{:04}", dt.year()))
+            .replace("MM", &format!("{:02}", dt.month()))
+            .replace("dd", &format!("{:02}", dt.day()))
+            .replace("HH", &format!("{:02}", dt.hour()))
+            .replace("mm", &format!("{:02}", dt.minute()))
+            .replace("ss", &format!("{:02}", dt.second()))
+    }
+
+    /// Returns true if the output date format produces lexicographically sortable
+    /// partition paths (i.e., string comparison preserves chronological order).
+    ///
+    /// This requires date tokens across the full format to appear in strict
+    /// descending significance: year > month > day > hour > minute > second.
+    /// Supports both `/`-separated formats like `yyyy/MM/dd` and compound
+    /// formats like `yyyyMMdd` or `yyyy-MM-dd`.
+    fn is_lex_sortable_format(&self) -> bool {
+        // Known tokens in search order (longest first to avoid partial matches)
+        const TOKENS: &[(&str, u8)] = &[
+            ("yyyy", 6),
+            ("MM", 5),
+            ("dd", 4),
+            ("HH", 3),
+            ("mm", 2),
+            ("ss", 1),
+        ];
+
+        // Extract all token ranks in order of appearance
+        let mut ranks: Vec<u8> = Vec::new();
+        let mut remaining = self.output_dateformat.as_str();
+
+        while !remaining.is_empty() {
+            if let Some((token, rank)) = TOKENS.iter().find(|(t, _)| 
remaining.starts_with(t)) {
+                ranks.push(*rank);
+                remaining = &remaining[token.len()..];
+            } else {
+                // Skip non-token characters (separators like '/', '-', etc.)
+                remaining = &remaining[1..];
+            }
         }
 
-        values
+        // Must have at least one token and be in strictly descending order
+        !ranks.is_empty() && ranks.windows(2).all(|w| w[0] > w[1])
     }
 }
 
@@ -454,57 +506,53 @@ impl KeyGeneratorFilterTransformer for 
TimestampBasedKeyGenerator {
             return Ok(vec![filter.clone()]);
         }
 
-        let dt = self.parse_timestamp(&filter.field_value)?;
-        let partition_values = self.extract_partition_values(&dt);
-
-        let mut filters = Vec::new();
+        let field_name = MetaField::PartitionPath.as_ref().to_string();
 
         match filter.operator {
-            ExprOperator::Eq => {
-                for field_name in &self.partition_fields {
-                    if let Some(value) = partition_values.get(field_name) {
-                        filters.push(Filter {
-                            field_name: field_name.clone(),
-                            operator: ExprOperator::Eq,
-                            field_value: value.clone(),
-                        });
-                    }
-                }
+            ExprOperator::Eq | ExprOperator::Ne => {
+                let dt = self.parse_timestamp(&filter.values[0])?;
+                let path = self.format_partition_path(&dt);
+                Ok(vec![Filter {
+                    field_name,
+                    operator: filter.operator,
+                    values: vec![path],
+                }])
             }
-            ExprOperator::Gte | ExprOperator::Gt => {
-                // Only compare the first partition field for simplicity.
-                // May scan more partitions than necessary but avoids complex 
multi-field range logic.
-                if let Some(first_field) = self.partition_fields.first() {
-                    if let Some(value) = partition_values.get(first_field) {
-                        filters.push(Filter {
-                            field_name: first_field.clone(),
-                            operator: ExprOperator::Gte,
-                            field_value: value.clone(),
-                        });
-                    }
-                }
+            ExprOperator::In | ExprOperator::NotIn => {
+                let paths: Result<Vec<String>> = filter
+                    .values
+                    .iter()
+                    .map(|v| {
+                        let dt = self.parse_timestamp(v)?;
+                        Ok(self.format_partition_path(&dt))
+                    })
+                    .collect();
+                Ok(vec![Filter {
+                    field_name,
+                    operator: filter.operator,
+                    values: paths?,
+                }])
             }
-            ExprOperator::Lte | ExprOperator::Lt => {
-                if let Some(first_field) = self.partition_fields.first() {
-                    if let Some(value) = partition_values.get(first_field) {
-                        filters.push(Filter {
-                            field_name: first_field.clone(),
-                            operator: ExprOperator::Lte,
-                            field_value: value.clone(),
-                        });
-                    }
+            ExprOperator::Gt | ExprOperator::Gte | ExprOperator::Lt | 
ExprOperator::Lte => {
+                if !self.is_lex_sortable_format() {
+                    // Not safe for string comparison; skip pruning
+                    return Ok(vec![]);
                 }
-            }
-            ExprOperator::Ne => {
-                return Err(CoreError::Config(ConfigError::InvalidValue(format!(
-                    "Not-equal (!=) operator is not supported for 
timestamp-based partition \
-                     pruning on field '{}'. Rewrite the query without != on 
partition columns.",
-                    filter.field_name
-                ))));
+                let dt = self.parse_timestamp(&filter.values[0])?;
+                let path = self.format_partition_path(&dt);
+                // Widen Gt→Gte and Lt→Lte for safe partition boundary approximation
+                let op = match filter.operator {
+                    ExprOperator::Gt => ExprOperator::Gte,
+                    ExprOperator::Lt => ExprOperator::Lte,
+                    other => other,
+                };
+                Ok(vec![Filter {
+                    field_name,
+                    operator: op,
+                    values: vec![path],
+                }])
             }
         }
-
-        Ok(filters)
     }
 }
 
@@ -772,7 +820,7 @@ mod tests {
     }
 
     #[test]
-    fn test_timezone_config_and_partition_values() {
+    fn test_timezone_config_and_partition_path() {
         // output.timezone shifts date components
         let configs = HudiConfigs::new([
             ("hoodie.table.partition.fields", "ts"),
@@ -786,10 +834,12 @@ mod tests {
         ]);
         let keygen = 
TimestampBasedKeyGenerator::from_configs(&configs).unwrap();
 
-        // 2024-01-25 03:00:00 UTC = 2024-01-24 22:00:00 EST → day=24
+        // 2024-01-25 03:00:00 UTC = 2024-01-24 22:00:00 EST
         let dt = keygen.parse_timestamp("1706151600").unwrap();
-        let values = keygen.extract_partition_values(&dt);
-        assert_eq!(values.get("day"), Some(&"24".to_string()));
+        assert_eq!(
+            keygen.format_partition_path(&dt),
+            "year=2024/month=01/day=24"
+        );
 
         // Fallback: hoodie.keygen.timebased.timezone used when 
output.timezone absent
         let configs = HudiConfigs::new([
@@ -802,10 +852,12 @@ mod tests {
         let keygen = TimestampBasedKeyGenerator::from_configs(&configs).unwrap();
         assert_eq!(keygen.output_timezone, chrono_tz::Asia::Tokyo);
 
-        // 2024-01-25 20:00:00 UTC = 2024-01-26 05:00:00 JST → day=26
+        // 2024-01-25 20:00:00 UTC = 2024-01-26 05:00:00 JST
         let dt = keygen.parse_timestamp("1706212800").unwrap();
-        let values = keygen.extract_partition_values(&dt);
-        assert_eq!(values.get("day"), Some(&"26".to_string()));
+        assert_eq!(
+            keygen.format_partition_path(&dt),
+            "year=2024/month=01/day=26"
+        );
 
         // Precedence: deprecated shared `timezone` wins over specific `output.timezone`
         let configs = HudiConfigs::new([
@@ -825,47 +877,55 @@ mod tests {
 
     #[test]
     fn test_transform_filter() {
-        // Equality: DATE_STRING → expands to all partition fields
+        // Equality: DATE_STRING → single _hoodie_partition_path filter with full path
         let keygen =
             TimestampBasedKeyGenerator::from_configs(&create_test_configs_date_string()).unwrap();
 
         let filter = Filter {
             field_name: "ts_str".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "2023-04-01T12:01:00.123Z".to_string(),
+            values: vec!["2023-04-01T12:01:00.123Z".to_string()],
         };
         let transformed = keygen.transform_filter(&filter).unwrap();
-        assert_eq!(transformed.len(), 4);
-        assert_eq!(
-            (
-                transformed[0].field_name.as_str(),
-                transformed[0].field_value.as_str()
-            ),
-            ("year", "2023")
-        );
-        assert_eq!(
-            (
-                transformed[1].field_name.as_str(),
-                transformed[1].field_value.as_str()
-            ),
-            ("month", "04")
-        );
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].field_name, "_hoodie_partition_path");
+        assert_eq!(transformed[0].operator, ExprOperator::Eq);
         assert_eq!(
-            (
-                transformed[2].field_name.as_str(),
-                transformed[2].field_value.as_str()
-            ),
-            ("day", "01")
+            transformed[0].values[0],
+            "year=2023/month=04/day=01/hour=12"
         );
+
+        // Non-hive-style equality
+        let keygen =
+            TimestampBasedKeyGenerator::from_configs(&create_test_configs_unix_timestamp())
+                .unwrap();
+        // 2024-01-25 00:00:00 UTC = 1706140800 seconds
+        let filter = Filter {
+            field_name: "event_timestamp".to_string(),
+            operator: ExprOperator::Eq,
+            values: vec!["1706140800".to_string()],
+        };
+        let transformed = keygen.transform_filter(&filter).unwrap();
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].values[0], "2024/01/25");
+
+        // Not-equal: now supported
+        let keygen =
+            TimestampBasedKeyGenerator::from_configs(&create_test_configs_date_string()).unwrap();
+        let filter = Filter {
+            field_name: "ts_str".to_string(),
+            operator: ExprOperator::Ne,
+            values: vec!["2023-04-01T12:01:00.123Z".to_string()],
+        };
+        let transformed = keygen.transform_filter(&filter).unwrap();
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].operator, ExprOperator::Ne);
         assert_eq!(
-            (
-                transformed[3].field_name.as_str(),
-                transformed[3].field_value.as_str()
-            ),
-            ("hour", "12")
+            transformed[0].values[0],
+            "year=2023/month=04/day=01/hour=12"
         );
 
-        // Range operators: Gt/Gte → Gte, Lt/Lte → Lte (safe widening for partition boundaries)
+        // Range operators: Gt/Gte → Gte, Lt/Lte → Lte (safe widening)
         let keygen =
             TimestampBasedKeyGenerator::from_configs(&create_test_configs_unix_timestamp())
                 .unwrap();
@@ -878,11 +938,12 @@ mod tests {
             let filter = Filter {
                 field_name: "event_timestamp".to_string(),
                 operator: input_op,
-                field_value: "1706140800".to_string(),
+                values: vec!["1706140800".to_string()],
             };
             let transformed = keygen.transform_filter(&filter).unwrap();
             assert_eq!(transformed.len(), 1, "Expected 1 filter for {input_op:?}");
-            assert_eq!(transformed[0].field_name, "yyyy");
+            assert_eq!(transformed[0].field_name, "_hoodie_partition_path");
+            assert_eq!(transformed[0].values[0], "2024/01/25");
             assert_eq!(
                 transformed[0].operator, expected_op,
                 "{input_op:?} should coerce to {expected_op:?}"
@@ -895,26 +956,12 @@ mod tests {
         let filter = Filter {
             field_name: "other_field".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "value".to_string(),
+            values: vec!["value".to_string()],
         };
         let transformed = keygen.transform_filter(&filter).unwrap();
         assert_eq!(transformed.len(), 1);
         assert_eq!(transformed[0].field_name, "other_field");
 
-        // Not-equal operator is rejected
-        let filter = Filter {
-            field_name: "ts_str".to_string(),
-            operator: ExprOperator::Ne,
-            field_value: "2023-04-01T12:01:00.123Z".to_string(),
-        };
-        assert!(
-            keygen
-                .transform_filter(&filter)
-                .unwrap_err()
-                .to_string()
-                .contains("Not-equal (!=) operator is not supported")
-        );
-
         // Invalid timestamp value produces error, not panic
         let unix_keygen =
             TimestampBasedKeyGenerator::from_configs(&create_test_configs_unix_timestamp())
@@ -922,8 +969,103 @@ mod tests {
         let filter = Filter {
             field_name: "event_timestamp".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "not_a_number".to_string(),
+            values: vec!["not_a_number".to_string()],
         };
         assert!(unix_keygen.transform_filter(&filter).is_err());
     }
+
+    #[test]
+    fn test_transform_filter_in_not_in() {
+        let keygen =
+            TimestampBasedKeyGenerator::from_configs(&create_test_configs_date_string()).unwrap();
+
+        // IN: multiple timestamps → multiple formatted paths in one filter
+        let filter = Filter::new(
+            "ts_str".to_string(),
+            ExprOperator::In,
+            vec![
+                "2023-04-01T12:00:00.000Z".to_string(),
+                "2023-06-15T08:00:00.000Z".to_string(),
+            ],
+        )
+        .unwrap();
+        let transformed = keygen.transform_filter(&filter).unwrap();
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].operator, ExprOperator::In);
+        assert_eq!(
+            transformed[0].values,
+            vec![
+                "year=2023/month=04/day=01/hour=12",
+                "year=2023/month=06/day=15/hour=08"
+            ]
+        );
+
+        // NOT IN
+        let filter = Filter::new(
+            "ts_str".to_string(),
+            ExprOperator::NotIn,
+            vec!["2023-04-01T12:00:00.000Z".to_string()],
+        )
+        .unwrap();
+        let transformed = keygen.transform_filter(&filter).unwrap();
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].operator, ExprOperator::NotIn);
+    }
+
+    #[test]
+    fn test_transform_filter_non_lex_sortable_skips_range() {
+        // Format MM/dd/yyyy is not lex-sortable → range ops return empty
+        let configs = HudiConfigs::new([
+            ("hoodie.table.partition.fields", "ts"),
+            ("hoodie.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP"),
+            ("hoodie.keygen.timebased.output.dateformat", "MM/dd/yyyy"),
+            ("hoodie.datasource.write.hive_style_partitioning", "false"),
+        ]);
+        let keygen = TimestampBasedKeyGenerator::from_configs(&configs).unwrap();
+
+        // Range ops should return empty (skip pruning)
+        let filter = Filter {
+            field_name: "ts".to_string(),
+            operator: ExprOperator::Gte,
+            values: vec!["1706140800".to_string()],
+        };
+        assert!(keygen.transform_filter(&filter).unwrap().is_empty());
+
+        // But Eq still works
+        let filter = Filter {
+            field_name: "ts".to_string(),
+            operator: ExprOperator::Eq,
+            values: vec!["1706140800".to_string()],
+        };
+        let transformed = keygen.transform_filter(&filter).unwrap();
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].operator, ExprOperator::Eq);
+    }
+
+    #[test]
+    fn test_is_lex_sortable_format() {
+        let make_keygen = |fmt: &str| -> TimestampBasedKeyGenerator {
+            let configs = HudiConfigs::new([
+                ("hoodie.table.partition.fields", "ts"),
+                ("hoodie.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP"),
+                ("hoodie.keygen.timebased.output.dateformat", fmt),
+                ("hoodie.datasource.write.hive_style_partitioning", "false"),
+            ]);
+            TimestampBasedKeyGenerator::from_configs(&configs).unwrap()
+        };
+
+        // Lex-sortable formats
+        assert!(make_keygen("yyyy/MM/dd").is_lex_sortable_format());
+        assert!(make_keygen("yyyy/MM/dd/HH").is_lex_sortable_format());
+        assert!(make_keygen("yyyy/MM").is_lex_sortable_format());
+        assert!(make_keygen("yyyy").is_lex_sortable_format());
+        assert!(make_keygen("yyyyMMdd").is_lex_sortable_format());
+        assert!(make_keygen("yyyyMMddHH").is_lex_sortable_format());
+        assert!(make_keygen("yyyy-MM-dd").is_lex_sortable_format());
+
+        // Not lex-sortable
+        assert!(!make_keygen("MM/dd/yyyy").is_lex_sortable_format());
+        assert!(!make_keygen("dd/MM/yyyy").is_lex_sortable_format());
+        assert!(!make_keygen("ddMMyyyy").is_lex_sortable_format());
+    }
 }
diff --git a/crates/core/src/table/file_pruner.rs b/crates/core/src/table/file_pruner.rs
index 500a2a3..50ef0a8 100644
--- a/crates/core/src/table/file_pruner.rs
+++ b/crates/core/src/table/file_pruner.rs
@@ -121,6 +121,12 @@ impl FilePruner {
     ///
     /// Returns `true` if the file can definitely be pruned (no rows can match).
     fn can_prune_by_filter(&self, filter: &SchemableFilter, col_stats: &ColumnStatistics) -> bool {
+        // Multi-value operators not yet supported for file-level pruning
+        // TODO: support IN/NOT IN by checking if all values are outside the min/max range
+        if filter.operator.is_multi_value() {
+            return false;
+        }
+
         // Get the filter value as an ArrayRef
         let filter_array = self.extract_filter_array(filter);
         let Some(filter_value) = filter_array else {
@@ -155,6 +161,9 @@ impl FilePruner {
                 // Prune if: max < value
                 self.can_prune_gte(&filter_value, max)
             }
+            ExprOperator::In | ExprOperator::NotIn => {
+                unreachable!("Multi-value operators are short-circuited above")
+            }
         }
     }
 
@@ -250,7 +259,7 @@ impl FilePruner {
 
     /// Extracts the filter value as an ArrayRef.
     fn extract_filter_array(&self, filter: &SchemableFilter) -> Option<ArrayRef> {
-        let (array, is_scalar) = filter.value.get();
+        let (array, is_scalar) = filter.values[0].get();
         if array.is_empty() {
             return None;
         }
@@ -531,6 +540,39 @@ mod tests {
         assert!(pruner.should_include(&stats2));
     }
 
+    #[test]
+    fn test_in_not_in_operators_no_prune() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        // IN operator should not prune (conservative approach)
+        let filters = vec![
+            Filter::new(
+                "id".to_string(),
+                ExprOperator::In,
+                vec!["5".to_string(), "10".to_string()],
+            )
+            .unwrap(),
+        ];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Even though 5 and 10 are below min=50, conservative approach includes file
+        let stats = create_stats_with_int_range("id", 50, 100);
+        assert!(pruner.should_include(&stats));
+
+        // NOT IN operator should also not prune
+        let filters = vec![
+            Filter::new(
+                "id".to_string(),
+                ExprOperator::NotIn,
+                vec!["5".to_string(), "10".to_string()],
+            )
+            .unwrap(),
+        ];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+        assert!(pruner.should_include(&stats));
+    }
+
     #[test]
     fn test_missing_column_stats_includes_file() {
         let table_schema = create_test_schema();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e861ed1..72d6b9d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -105,7 +105,9 @@ use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::expr::filter::{Filter, from_str_tuples};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
+use crate::keygen::is_timestamp_based_keygen;
 use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
+use crate::metadata::meta_field::MetaField;
 use crate::schema::resolver::{
     resolve_avro_schema, resolve_avro_schema_with_meta_fields, resolve_data_schema, resolve_schema,
 };
@@ -271,6 +273,16 @@ impl Table {
             )]));
         }
 
+        // Timestamp-based keygen: the source field is transformed into partition path
+        // strings, so use a single _hoodie_partition_path field.
+        if is_timestamp_based_keygen(&self.hudi_configs) {
+            return Ok(Schema::new(vec![Field::new(
+                MetaField::PartitionPath.as_ref(),
+                arrow_schema::DataType::Utf8,
+                false,
+            )]));
+        }
+
         let partition_fields: HashSet<String> = {
             let fields: Vec<String> = self.hudi_configs.get_or_default(PartitionFields).into();
             fields.into_iter().collect()
@@ -1001,7 +1013,7 @@ mod tests {
         let schema = hudi_table.get_partition_schema().await;
         assert!(schema.is_ok());
         let schema = schema.unwrap();
-        assert_arrow_field_names_eq!(schema, ["ts_str"]);
+        assert_arrow_field_names_eq!(schema, [MetaField::PartitionPath.as_ref()]);
     }
 
     #[tokio::test]
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 37699d7..3bce503 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -27,7 +27,9 @@ use crate::keygen::timestamp_based::TimestampBasedKeyGenerator;
 use arrow_array::{ArrayRef, Scalar};
 use arrow_schema::Schema;
 
-use crate::config::table::HudiTableConfig::{KeyGeneratorClass, PartitionFields};
+use crate::config::table::HudiTableConfig::{KeyGeneratorClass, KeyGeneratorType, PartitionFields};
+use crate::keygen::is_timestamp_based_keygen;
+use crate::metadata::meta_field::MetaField;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -48,12 +50,11 @@ pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool {
         })
         .unwrap_or(false);
 
-    // v8+: also check hoodie.table.keygenerator.type for NON_PARTITION variants
     let uses_non_partitioned_type = hudi_configs
-        .as_options()
-        .get("hoodie.table.keygenerator.type")
+        .try_get(KeyGeneratorType)
         .map(|v| {
-            let upper = v.to_uppercase();
+            let s: String = v.into();
+            let upper = s.to_uppercase();
             upper == "NON_PARTITION" || upper == "NON_PARTITION_AVRO"
         })
         .unwrap_or(false);
@@ -144,34 +145,6 @@ impl PartitionPruner {
         })
     }
 
-    /// Returns true if the table uses a timestamp-based key generator,
-    /// checking both `hoodie.table.keygenerator.class` (v6) and
-    /// `hoodie.table.keygenerator.type` (v8+).
-    fn is_timestamp_based_keygen(hudi_configs: &HudiConfigs) -> bool {
-        // v6: hoodie.table.keygenerator.class contains "TimestampBasedKeyGenerator"
-        let by_class: bool = hudi_configs
-            .try_get(KeyGeneratorClass)
-            .map(|v| {
-                let s: String = v.into();
-                s.contains("TimestampBasedKeyGenerator")
-            })
-            .unwrap_or(false);
-
-        if by_class {
-            return true;
-        }
-
-        // v8+: hoodie.table.keygenerator.type = "TIMESTAMP" or "TIMESTAMP_AVRO"
-        let options = hudi_configs.as_options();
-        options
-            .get("hoodie.table.keygenerator.type")
-            .map(|v| {
-                let upper = v.to_uppercase();
-                upper == "TIMESTAMP" || upper == "TIMESTAMP_AVRO"
-            })
-            .unwrap_or(false)
-    }
-
     /// Transforms user filters on data columns to filters on partition path columns
     /// based on the configured key generator.
     fn transform_filters_for_keygen(
@@ -179,7 +152,7 @@ impl PartitionPruner {
         _partition_schema: &Schema,
         hudi_configs: &HudiConfigs,
     ) -> Result<Vec<Filter>> {
-        if Self::is_timestamp_based_keygen(hudi_configs) {
+        if is_timestamp_based_keygen(hudi_configs) {
             match TimestampBasedKeyGenerator::from_configs(hudi_configs) {
                 Ok(transformer) => {
                     return Self::apply_transformer_to_filters(filters, &transformer);
@@ -217,6 +190,20 @@ impl PartitionPruner {
             partition_path.to_string()
         };
 
+        // Special case: single _hoodie_partition_path field uses the raw path as-is
+        if self.schema.fields().len() == 1
+            && self.schema.field(0).name() == MetaField::PartitionPath.as_ref()
+        {
+            let scalar = SchemableFilter::cast_value(
+                &[partition_path.as_str()],
+                &arrow_schema::DataType::Utf8,
+            )?;
+            return Ok(HashMap::from([(
+                MetaField::PartitionPath.as_ref().to_string(),
+                scalar,
+            )]));
+        }
+
         let parts: Vec<&str> = partition_path.split('/').collect();
 
         if parts.len() != self.schema.fields().len() {
@@ -392,14 +379,14 @@ mod tests {
         let filter = Filter {
             field_name: "date".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "2023-01-01".to_string(),
+            values: vec!["2023-01-01".to_string()],
         };
 
         let partition_filter = SchemableFilter::try_from((filter, &schema)).unwrap();
         assert_eq!(partition_filter.field.name(), "date");
         assert_eq!(partition_filter.operator, ExprOperator::Eq);
 
-        let value_inner = partition_filter.value.into_inner();
+        let value_inner = partition_filter.values[0].clone().into_inner();
 
         let date_array = value_inner.as_any().downcast_ref::<Date32Array>().unwrap();
 
@@ -413,7 +400,7 @@ mod tests {
         let filter = Filter {
             field_name: "invalid_field".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "2023-01-01".to_string(),
+            values: vec!["2023-01-01".to_string()],
         };
         let result = SchemableFilter::try_from((filter, &schema));
         assert!(result.is_err());
@@ -431,7 +418,7 @@ mod tests {
         let filter = Filter {
             field_name: "count".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "not_a_number".to_string(),
+            values: vec!["not_a_number".to_string()],
         };
         let result = SchemableFilter::try_from((filter, &schema));
         assert!(result.is_err());
@@ -440,12 +427,8 @@ mod tests {
     #[test]
     fn test_partition_filter_try_from_all_operators() {
         let schema = create_test_schema();
-        for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
-            let filter = Filter {
-                field_name: "count".to_string(),
-                operator: ExprOperator::from_str(op).unwrap(),
-                field_value: "5".to_string(),
-            };
+        for (op, op_enum) in ExprOperator::TOKEN_OP_PAIRS {
+            let filter = Filter::new("count".to_string(), op_enum, vec!["5".to_string()]).unwrap();
             let partition_filter = SchemableFilter::try_from((filter, &schema));
             let filter = partition_filter.unwrap();
             assert_eq!(filter.field.name(), "count");
@@ -455,7 +438,13 @@ mod tests {
 
     #[test]
     fn test_transform_filters_for_keygen_timestamp_based() {
-        // Range filter: DATE_STRING Gte → year Gte
+        let partition_schema = Schema::new(vec![Field::new(
+            MetaField::PartitionPath.as_ref(),
+            DataType::Utf8,
+            false,
+        )]);
+
+        // Range filter: DATE_STRING Gte → single _hoodie_partition_path Gte
         let configs = HudiConfigs::new([
             ("hoodie.table.partition.fields", "ts_str"),
             (
@@ -471,16 +460,10 @@ mod tests {
             ("hoodie.datasource.write.hive_style_partitioning", "true"),
         ]);
 
-        let partition_schema = Schema::new(vec![
-            Field::new("year", DataType::Utf8, false),
-            Field::new("month", DataType::Utf8, false),
-            Field::new("day", DataType::Utf8, false),
-        ]);
-
         let user_filter = Filter {
             field_name: "ts_str".to_string(),
             operator: ExprOperator::Gte,
-            field_value: "2023-04-15T12:00:00.000Z".to_string(),
+            values: vec!["2023-04-15T12:00:00.000Z".to_string()],
         };
 
         let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -491,11 +474,11 @@ mod tests {
         .unwrap();
 
         assert_eq!(transformed.len(), 1);
-        assert_eq!(transformed[0].field_name, "year");
+        assert_eq!(transformed[0].field_name, MetaField::PartitionPath.as_ref());
         assert_eq!(transformed[0].operator, ExprOperator::Gte);
-        assert_eq!(transformed[0].field_value, "2023");
+        assert_eq!(transformed[0].values[0], "year=2023/month=04/day=15");
 
-        // Equality filter: UNIX_TIMESTAMP Eq → yyyy/MM/dd Eq
+        // Equality filter: UNIX_TIMESTAMP Eq → single path
         let configs = HudiConfigs::new([
             ("hoodie.table.partition.fields", "event_time"),
             (
@@ -507,17 +490,11 @@ mod tests {
             ("hoodie.datasource.write.hive_style_partitioning", "false"),
         ]);
 
-        let partition_schema = Schema::new(vec![
-            Field::new("yyyy", DataType::Utf8, false),
-            Field::new("MM", DataType::Utf8, false),
-            Field::new("dd", DataType::Utf8, false),
-        ]);
-
         // 2024-01-25 00:00:00 UTC = 1706140800 seconds
         let user_filter = Filter {
             field_name: "event_time".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "1706140800".to_string(),
+            values: vec!["1706140800".to_string()],
         };
 
         let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -527,13 +504,9 @@ mod tests {
         )
         .unwrap();
 
-        assert_eq!(transformed.len(), 3);
-        assert_eq!(transformed[0].field_name, "yyyy");
-        assert_eq!(transformed[0].field_value, "2024");
-        assert_eq!(transformed[1].field_name, "MM");
-        assert_eq!(transformed[1].field_value, "01");
-        assert_eq!(transformed[2].field_name, "dd");
-        assert_eq!(transformed[2].field_value, "25");
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].field_name, MetaField::PartitionPath.as_ref());
+        assert_eq!(transformed[0].values[0], "2024/01/25");
 
         // v8 detection via keygenerator.type=TIMESTAMP (no keygenerator.class)
         let configs = HudiConfigs::new([
@@ -548,16 +521,10 @@ mod tests {
             ("hoodie.datasource.write.hive_style_partitioning", "true"),
         ]);
 
-        let partition_schema = Schema::new(vec![
-            Field::new("year", DataType::Utf8, false),
-            Field::new("month", DataType::Utf8, false),
-            Field::new("day", DataType::Utf8, false),
-        ]);
-
         let user_filter = Filter {
             field_name: "ts_str".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "2023-04-15T12:00:00.000Z".to_string(),
+            values: vec!["2023-04-15T12:00:00.000Z".to_string()],
         };
 
         let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -567,9 +534,8 @@ mod tests {
         )
         .unwrap();
 
-        assert_eq!(transformed.len(), 3);
-        assert_eq!(transformed[0].field_name, "year");
-        assert_eq!(transformed[0].field_value, "2023");
+        assert_eq!(transformed.len(), 1);
+        assert_eq!(transformed[0].values[0], "year=2023/month=04/day=15");
     }
 
     #[test]
@@ -588,7 +554,7 @@ mod tests {
         let user_filter = Filter {
             field_name: "region".to_string(),
             operator: ExprOperator::Eq,
-            field_value: "us-west".to_string(),
+            values: vec!["us-west".to_string()],
         };
 
         let transformed = PartitionPruner::transform_filters_for_keygen(
@@ -600,7 +566,7 @@ mod tests {
 
         assert_eq!(transformed.len(), 1);
         assert_eq!(transformed[0].field_name, user_filter.field_name);
-        assert_eq!(transformed[0].field_value, user_filter.field_value);
+        assert_eq!(transformed[0].values[0], user_filter.values[0]);
     }
 
     #[test]
@@ -621,29 +587,56 @@ mod tests {
             ("hoodie.datasource.write.partitionpath.urlencode", "false"),
         ]);
 
-        let partition_schema = Schema::new(vec![
-            Field::new("year", DataType::Utf8, false),
-            Field::new("month", DataType::Utf8, false),
-            Field::new("day", DataType::Utf8, false),
-        ]);
+        let partition_schema = Schema::new(vec![Field::new(
+            MetaField::PartitionPath.as_ref(),
+            DataType::Utf8,
+            false,
+        )]);
 
         let user_filter = Filter {
             field_name: "ts".to_string(),
             operator: ExprOperator::Gte,
-            field_value: "2024-01-15T00:00:00Z".to_string(),
+            values: vec!["2024-01-15T00:00:00Z".to_string()],
         };
 
         let pruner = PartitionPruner::new(&[user_filter], &partition_schema, &configs).unwrap();
 
         assert!(!pruner.is_empty());
 
-        // Should include partitions >= 2024
+        // Full path string comparison on _hoodie_partition_path
         assert!(pruner.should_include("year=2024/month=01/day=15"));
         assert!(pruner.should_include("year=2024/month=06/day=30"));
         assert!(pruner.should_include("year=2025/month=01/day=01"));
 
-        // Should exclude partitions < 2024
+        // Should exclude partitions < 2024/01/15
         assert!(!pruner.should_include("year=2023/month=12/day=31"));
         assert!(!pruner.should_include("year=2022/month=01/day=01"));
+
+        // Non-hive-style: verify the same logic works
+        let configs = HudiConfigs::new([
+            ("hoodie.table.partition.fields", "ts"),
+            (
+                "hoodie.table.keygenerator.class",
+                "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
+            ),
+            ("hoodie.keygen.timebased.timestamp.type", "DATE_STRING"),
+            (
+                "hoodie.keygen.timebased.input.dateformat",
+                "yyyy-MM-dd'T'HH:mm:ssZ",
+            ),
+            ("hoodie.keygen.timebased.output.dateformat", "yyyy/MM/dd"),
+            ("hoodie.datasource.write.hive_style_partitioning", "false"),
+        ]);
+
+        let user_filter = Filter {
+            field_name: "ts".to_string(),
+            operator: ExprOperator::Eq,
+            values: vec!["2024-01-15T00:00:00Z".to_string()],
+        };
+
+        let pruner = PartitionPruner::new(&[user_filter], &partition_schema, &configs).unwrap();
+        assert!(pruner.should_include("2024/01/15"));
+        assert!(!pruner.should_include("2024/01/16"));
+        assert!(!pruner.should_include("2023/12/31"));
     }
 }
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 0b68649..8a67876 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -18,6 +18,7 @@
  */
 
 use datafusion::logical_expr::Operator;
+use datafusion_expr::expr::InList;
 use datafusion_expr::{Between, BinaryExpr, Expr};
 use hudi_core::expr::filter::{Filter as HudiFilter, col};
 use log::{debug, warn};
@@ -33,6 +34,7 @@ use log::{debug, warn};
 /// - `NOT` expressions: negates inner binary expression
 /// - `AND` compound expressions: recursively flattens both sides
 /// - `BETWEEN` expressions: converts to `>= low AND <= high`
+/// - `IN` / `NOT IN` expressions: converts to `IN` / `NOT IN` filters
 ///
 /// # OR Expression Handling
 ///
@@ -91,6 +93,7 @@ fn expr_to_filters(expr: &Expr) -> Vec<HudiFilter> {
         },
         Expr::Not(not_expr) => not_expr_to_filter(not_expr).into_iter().collect(),
         Expr::Between(between) => between_to_filters(between),
+        Expr::InList(in_list) => inlist_expr_to_filter(in_list).into_iter().collect(),
         _ => vec![],
     }
 }
@@ -176,6 +179,46 @@ fn between_to_filters(between: &Between) -> Vec<HudiFilter> {
     ]
 }
 
+/// Converts an IN list expression into a HudiFilter with IN or NOT IN operator.
+///
+/// Returns None if the expression cannot be pushed down (non-column expr,
+/// non-literal values, or empty list).
+fn inlist_expr_to_filter(in_list: &InList) -> Option<HudiFilter> {
+    let column = match in_list.expr.as_ref() {
+        Expr::Column(col) => col,
+        _ => {
+            debug!("IN list with non-column expression cannot be pushed down");
+            return None;
+        }
+    };
+
+    if in_list.list.is_empty() {
+        debug!("Empty IN list cannot be pushed down");
+        return None;
+    }
+
+    let values: Vec<String> = in_list
+        .list
+        .iter()
+        .filter_map(|expr| match expr {
+            Expr::Literal(lit, _) => Some(lit.to_string()),
+            _ => None,
+        })
+        .collect();
+
+    if values.len() != in_list.list.len() {
+        debug!("IN list contains non-literal values, cannot be pushed down");
+        return None;
+    }
+
+    let field = col(column.name());
+    if in_list.negated {
+        Some(field.not_in_list(values))
+    } else {
+        Some(field.in_list(values))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -205,14 +248,14 @@ mod tests {
         let expected_filter = HudiFilter {
             field_name: schema.field(0).name().to_string(),
             operator: ExprOperator::Eq,
-            field_value: "42".to_string(),
+            values: vec!["42".to_string()],
         };
         assert_eq!(
             result[0],
             (
                 expected_filter.field_name,
                 expected_filter.operator.to_string(),
-                expected_filter.field_value
+                expected_filter.values.join(",")
             )
         );
     }
@@ -237,14 +280,14 @@ mod tests {
         let expected_filter = HudiFilter {
             field_name: schema.field(0).name().to_string(),
             operator: ExprOperator::Ne,
-            field_value: "42".to_string(),
+            values: vec!["42".to_string()],
         };
         assert_eq!(
             result[0],
             (
                 expected_filter.field_name,
                 expected_filter.operator.to_string(),
-                expected_filter.field_value
+                expected_filter.values.join(",")
             )
         );
     }
@@ -258,7 +301,7 @@ mod tests {
                 Some(HudiFilter {
                     field_name: String::from("int32_col"),
                     operator: ExprOperator::Eq,
-                    field_value: String::from("42"),
+                    values: vec![String::from("42")],
                 }),
             ),
             (
@@ -266,7 +309,7 @@ mod tests {
                 Some(HudiFilter {
                     field_name: String::from("int64_col"),
                     operator: ExprOperator::Gte,
-                    field_value: String::from("100"),
+                    values: vec![String::from("100")],
                 }),
             ),
             (
@@ -274,7 +317,7 @@ mod tests {
                 Some(HudiFilter {
                     field_name: String::from("float64_col"),
                     operator: ExprOperator::Lt,
-                    field_value: "32.666".to_string(),
+                    values: vec!["32.666".to_string()],
                 }),
             ),
             (
@@ -282,7 +325,7 @@ mod tests {
                 Some(HudiFilter {
                     field_name: String::from("string_col"),
                     operator: ExprOperator::Ne,
-                    field_value: String::from("test"),
+                    values: vec![String::from("test")],
                 }),
             ),
         ];
@@ -302,7 +345,7 @@ mod tests {
                 &(
                     expected_filter.field_name.clone(),
                     expected_filter.operator.to_string(),
-                    expected_filter.field_value.clone()
+                    expected_filter.values.join(",").clone()
                 )
             );
         }
@@ -336,14 +379,14 @@ mod tests {
             let expected_filter = HudiFilter {
                 field_name: schema.field(0).name().to_string(),
                 operator: expected_op,
-                field_value: String::from("42"),
+                values: vec![String::from("42")],
             };
             assert_eq!(
                 result[0],
                 (
                     expected_filter.field_name,
                     expected_filter.operator.to_string(),
-                    expected_filter.field_value
+                    expected_filter.values.join(",")
                 )
             );
         }
@@ -655,4 +698,68 @@ mod tests {
         );
         assert_eq!(result[0].0, "col_a");
     }
+
+    #[test]
+    fn test_convert_in_list() {
+        // col IN ('a', 'b', 'c')
+        let in_list = Expr::InList(InList::new(
+            Box::new(col("part")),
+            vec![lit("a"), lit("b"), lit("c")],
+            false,
+        ));
+        let result = exprs_to_filters(&[in_list]);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].0, "part");
+        assert_eq!(result[0].1, "IN");
+        assert_eq!(result[0].2, "a,b,c");
+
+        // col NOT IN ('x', 'y')
+        let not_in = Expr::InList(InList::new(
+            Box::new(col("part")),
+            vec![lit("x"), lit("y")],
+            true,
+        ));
+        let result = exprs_to_filters(&[not_in]);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].0, "part");
+        assert_eq!(result[0].1, "NOT IN");
+        assert_eq!(result[0].2, "x,y");
+
+        // Integer IN list
+        let in_int = Expr::InList(InList::new(
+            Box::new(col("id")),
+            vec![lit(40i32), lit(60i32)],
+            false,
+        ));
+        let result = exprs_to_filters(&[in_int]);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].1, "IN");
+    }
+
+    #[test]
+    fn test_convert_in_list_unsupported_cases() {
+        // Empty list
+        let empty = Expr::InList(InList::new(Box::new(col("col1")), vec![], false));
+        assert!(exprs_to_filters(&[empty]).is_empty());
+
+        // Non-literal values
+        let non_lit = Expr::InList(InList::new(
+            Box::new(col("col1")),
+            vec![col("col2"), col("col3")],
+            false,
+        ));
+        assert!(exprs_to_filters(&[non_lit]).is_empty());
+
+        // Non-column expression
+        let non_col = Expr::InList(InList::new(
+            Box::new(Expr::BinaryExpr(BinaryExpr::new(
+                Box::new(col("col1")),
+                Operator::Plus,
+                Box::new(col("col2")),
+            ))),
+            vec![lit(1i32)],
+            false,
+        ));
+        assert!(exprs_to_filters(&[non_col]).is_empty());
+    }
 }

Reply via email to