This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new eb0f520 feat: support filter pushdown for datafusion (#203)
eb0f520 is described below
commit eb0f5209fc53b754b9cdc13fceb5636befd2c804
Author: Jonathan Chen <[email protected]>
AuthorDate: Sat Dec 7 22:19:52 2024 -0500
feat: support filter pushdown for datafusion (#203)
---------
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/core/src/error.rs | 3 +
crates/core/src/{lib.rs => expr/filter.rs} | 69 +++--
crates/core/src/expr/mod.rs | 118 +++++++++
crates/core/src/lib.rs | 1 +
crates/core/src/table/fs_view.rs | 8 +-
crates/core/src/table/mod.rs | 93 +++----
crates/core/src/table/partition.rs | 275 +++++++++-----------
crates/datafusion/Cargo.toml | 3 +
crates/datafusion/src/lib.rs | 126 ++++++++-
crates/datafusion/src/util/expr.rs | 283 +++++++++++++++++++++
.../src/lib.rs => datafusion/src/util/mod.rs} | 35 +--
python/src/internal.rs | 48 ++--
python/tests/test_table_read.py | 20 ++
13 files changed, 783 insertions(+), 299 deletions(-)
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 6d4f0b4..b94e39b 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -30,6 +30,9 @@ pub enum CoreError {
#[error("Config error: {0}")]
Config(#[from] ConfigError),
+ #[error("Data type error: {0}")]
+ DataType(String),
+
#[error("File group error: {0}")]
FileGroup(String),
diff --git a/crates/core/src/lib.rs b/crates/core/src/expr/filter.rs
similarity index 50%
copy from crates/core/src/lib.rs
copy to crates/core/src/expr/filter.rs
index 42079a1..cf7ee5d 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/expr/filter.rs
@@ -16,38 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-//! Crate `hudi-core`.
-//!
-//! # The [config] module is responsible for managing configurations.
-//!
-//! **Example**
-//!
-//! ```rust
-//! use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp,
InputPartitions};
-//! use hudi_core::table::Table as HudiTable;
-//!
-//! let options = [(InputPartitions, "2"), (AsOfTimestamp,
"20240101010100000")];
-//! HudiTable::new_with_options("/tmp/hudi_data", options);
-//! ```
-//!
-//! # The [table] module is responsible for managing Hudi tables.
-//!
-//! **Example**
-//!
-//! create hudi table
-//! ```rust
-//! use hudi_core::table::Table;
-//!
-//! pub async fn test() {
-//! let hudi_table = Table::new("/tmp/hudi_data").await.unwrap();
-//! }
-//! ```
-
-pub mod config;
-pub mod error;
-pub mod file_group;
-pub mod storage;
-pub mod table;
-pub mod util;
-
-use error::Result;
+
+use crate::error::CoreError;
+use crate::expr::ExprOperator;
+use crate::Result;
+use std::str::FromStr;
+
+#[derive(Debug, Clone)]
+pub struct Filter {
+ pub field_name: String,
+ pub operator: ExprOperator,
+ pub field_value: String,
+}
+
+impl Filter {}
+
+impl TryFrom<(&str, &str, &str)> for Filter {
+ type Error = CoreError;
+
+ fn try_from(binary_expr_tuple: (&str, &str, &str)) -> Result<Self,
Self::Error> {
+ let (field_name, operator_str, field_value) = binary_expr_tuple;
+
+ let field_name = field_name.to_string();
+
+ let operator = ExprOperator::from_str(operator_str)?;
+
+ let field_value = field_value.to_string();
+
+ Ok(Filter {
+ field_name,
+ operator,
+ field_value,
+ })
+ }
+}
diff --git a/crates/core/src/expr/mod.rs b/crates/core/src/expr/mod.rs
new file mode 100644
index 0000000..d592c3f
--- /dev/null
+++ b/crates/core/src/expr/mod.rs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod filter;
+
+use crate::error::CoreError;
+use crate::error::CoreError::Unsupported;
+
+use std::cmp::PartialEq;
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::str::FromStr;
+
+/// An operator that represents a comparison operation used in a partition
filter expression.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum ExprOperator {
+ Eq,
+ Ne,
+ Lt,
+ Lte,
+ Gt,
+ Gte,
+}
+
+impl Display for ExprOperator {
+ fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
+ match self {
+ // Binary Operators
+ ExprOperator::Eq => write!(f, "="),
+ ExprOperator::Ne => write!(f, "!="),
+ ExprOperator::Lt => write!(f, "<"),
+ ExprOperator::Lte => write!(f, "<="),
+ ExprOperator::Gt => write!(f, ">"),
+ ExprOperator::Gte => write!(f, ">="),
+ }
+ }
+}
+
+impl ExprOperator {
+ pub const TOKEN_OP_PAIRS: [(&'static str, ExprOperator); 6] = [
+ ("=", ExprOperator::Eq),
+ ("!=", ExprOperator::Ne),
+ ("<", ExprOperator::Lt),
+ ("<=", ExprOperator::Lte),
+ (">", ExprOperator::Gt),
+ (">=", ExprOperator::Gte),
+ ];
+
+ /// Negates the operator.
+ pub fn negate(&self) -> Option<ExprOperator> {
+ match self {
+ ExprOperator::Eq => Some(ExprOperator::Ne),
+ ExprOperator::Ne => Some(ExprOperator::Eq),
+ ExprOperator::Lt => Some(ExprOperator::Gte),
+ ExprOperator::Lte => Some(ExprOperator::Gt),
+ ExprOperator::Gt => Some(ExprOperator::Lte),
+ ExprOperator::Gte => Some(ExprOperator::Lt),
+ }
+ }
+}
+
+impl FromStr for ExprOperator {
+ type Err = CoreError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ ExprOperator::TOKEN_OP_PAIRS
+ .iter()
+ .find_map(|&(token, op)| {
+ if token.eq_ignore_ascii_case(s) {
+ Some(op)
+ } else {
+ None
+ }
+ })
+ .ok_or_else(|| Unsupported(format!("Unsupported operator: {}", s)))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_operator_from_str() {
+ assert_eq!(ExprOperator::from_str("=").unwrap(), ExprOperator::Eq);
+ assert_eq!(ExprOperator::from_str("!=").unwrap(), ExprOperator::Ne);
+ assert_eq!(ExprOperator::from_str("<").unwrap(), ExprOperator::Lt);
+ assert_eq!(ExprOperator::from_str("<=").unwrap(), ExprOperator::Lte);
+ assert_eq!(ExprOperator::from_str(">").unwrap(), ExprOperator::Gt);
+ assert_eq!(ExprOperator::from_str(">=").unwrap(), ExprOperator::Gte);
+ assert!(ExprOperator::from_str("??").is_err());
+ }
+
+ #[test]
+ fn test_operator_display() {
+ assert_eq!(ExprOperator::Eq.to_string(), "=");
+ assert_eq!(ExprOperator::Ne.to_string(), "!=");
+ assert_eq!(ExprOperator::Lt.to_string(), "<");
+ assert_eq!(ExprOperator::Lte.to_string(), "<=");
+ assert_eq!(ExprOperator::Gt.to_string(), ">");
+ assert_eq!(ExprOperator::Gte.to_string(), ">=");
+ }
+}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 42079a1..c9ce815 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -45,6 +45,7 @@
pub mod config;
pub mod error;
+pub mod expr;
pub mod file_group;
pub mod storage;
pub mod table;
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 092b352..6ca5d9b 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -180,10 +180,12 @@ impl FileSystemView {
mod tests {
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
+ use crate::expr::filter::Filter;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::Table;
+
use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -298,12 +300,16 @@ mod tests {
.await
.unwrap();
let partition_schema =
hudi_table.get_partition_schema().await.unwrap();
+
+ let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
+ let filter_eq_300 = Filter::try_from(("shortField", "=",
"300")).unwrap();
let partition_pruner = PartitionPruner::new(
- &[("byteField", "<", "20"), ("shortField", "=", "300")],
+ &[filter_lt_20, filter_eq_300],
&partition_schema,
hudi_table.hudi_configs.as_ref(),
)
.unwrap();
+
let file_slices = fs_view
.get_file_slices_as_of("20240418173235694", &partition_pruner,
excludes)
.await
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9e7a495..f83aa91 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -84,18 +84,17 @@
//! }
//! ```
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
-
-use arrow::record_batch::RecordBatch;
-use arrow_schema::{Field, Schema};
-use url::Url;
+pub mod builder;
+mod fs_view;
+pub mod partition;
+mod timeline;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig;
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::HudiConfigs;
use crate::error::CoreError;
+use crate::expr::filter::Filter;
use crate::file_group::reader::FileGroupReader;
use crate::file_group::FileSlice;
use crate::table::builder::TableBuilder;
@@ -104,10 +103,11 @@ use crate::table::partition::PartitionPruner;
use crate::table::timeline::Timeline;
use crate::Result;
-pub mod builder;
-mod fs_view;
-mod partition;
-mod timeline;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{Field, Schema};
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use url::Url;
/// Hudi Table in-memory
#[derive(Clone, Debug)]
@@ -195,10 +195,11 @@ impl Table {
/// The file slices are split into `n` chunks.
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
+ ///
pub async fn get_file_slices_splits(
&self,
n: usize,
- filters: &[(&str, &str, &str)],
+ filters: &[Filter],
) -> Result<Vec<Vec<FileSlice>>> {
let file_slices = self.get_file_slices(filters).await?;
if file_slices.is_empty() {
@@ -217,7 +218,7 @@ impl Table {
/// Get all the [FileSlice]s in the table.
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
- pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<FileSlice>> {
+ pub async fn get_file_slices(&self, filters: &[Filter]) ->
Result<Vec<FileSlice>> {
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.get_file_slices_as_of(timestamp.to::<String>().as_str(),
filters)
.await
@@ -232,7 +233,7 @@ impl Table {
async fn get_file_slices_as_of(
&self,
timestamp: &str,
- filters: &[(&str, &str, &str)],
+ filters: &[Filter],
) -> Result<Vec<FileSlice>> {
let excludes = self.timeline.get_replaced_file_groups().await?;
let partition_schema = self.get_partition_schema().await?;
@@ -250,7 +251,7 @@ impl Table {
/// Get all the latest records in the table.
///
/// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
- pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<RecordBatch>> {
+ pub async fn read_snapshot(&self, filters: &[Filter]) ->
Result<Vec<RecordBatch>> {
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(timestamp.to::<String>().as_str(),
filters)
.await
@@ -265,7 +266,7 @@ impl Table {
async fn read_snapshot_as_of(
&self,
timestamp: &str,
- filters: &[(&str, &str, &str)],
+ filters: &[Filter],
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
let fg_reader = self.create_file_group_reader();
@@ -298,12 +299,13 @@ impl Table {
mod tests {
use super::*;
use arrow_array::StringArray;
+ use hudi_tests::{assert_not, TestTable};
+ use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
use std::{env, panic};
- use hudi_tests::{assert_not, TestTable};
-
+ use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig::{
BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields,
IsHiveStylePartitioning,
IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields,
PopulatesMetaFields,
@@ -313,6 +315,7 @@ mod tests {
use crate::config::HUDI_CONF_DIR;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
+ use crate::table::Filter;
/// Test helper to create a new `Table` instance without validating the
configuration.
///
@@ -333,10 +336,7 @@ mod tests {
}
/// Test helper to get relative file paths from the table with filters.
- async fn get_file_paths_with_filters(
- table: &Table,
- filters: &[(&str, &str, &str)],
- ) -> Result<Vec<String>> {
+ async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) ->
Result<Vec<String>> {
let mut file_paths = Vec::new();
for f in table.get_file_slices(filters).await? {
file_paths.push(f.base_file_path().to_string());
@@ -733,8 +733,11 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &[("byteField", ">=", "10"), ("byteField",
"<", "30")];
- let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
+ let filter_ge_10 = Filter::try_from(("byteField", ">=",
"10")).unwrap();
+
+ let filter_lt_30 = Filter::try_from(("byteField", "<", "30")).unwrap();
+
+ let actual = get_file_paths_with_filters(&hudi_table, &[filter_ge_10,
filter_lt_30])
.await
.unwrap()
.into_iter()
@@ -748,8 +751,8 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &[("byteField", ">", "30")];
- let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
+ let filter_gt_30 = Filter::try_from(("byteField", ">", "30")).unwrap();
+ let actual = get_file_paths_with_filters(&hudi_table, &[filter_gt_30])
.await
.unwrap()
.into_iter()
@@ -780,16 +783,16 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &[
- ("byteField", ">=", "10"),
- ("byteField", "<", "20"),
- ("shortField", "!=", "100"),
- ];
- let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
- .await
- .unwrap()
- .into_iter()
- .collect::<HashSet<_>>();
+ let filter_gte_10 = Filter::try_from(("byteField", ">=",
"10")).unwrap();
+ let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
+ let filter_ne_100 = Filter::try_from(("shortField", "!=",
"100")).unwrap();
+
+ let actual =
+ get_file_paths_with_filters(&hudi_table, &[filter_gte_10,
filter_lt_20, filter_ne_100])
+ .await
+ .unwrap()
+ .into_iter()
+ .collect::<HashSet<_>>();
let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
@@ -797,9 +800,10 @@ mod tests {
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
+ let filter_lt_20 = Filter::try_from(("byteField", ">", "20")).unwrap();
+ let filter_eq_300 = Filter::try_from(("shortField", "=",
"300")).unwrap();
- let partition_filters = &[("byteField", ">", "20"), ("shortField",
"=", "300")];
- let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, &[filter_lt_20,
filter_eq_300])
.await
.unwrap()
.into_iter()
@@ -812,12 +816,15 @@ mod tests {
async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let partition_filters = &[
- ("byteField", ">=", "10"),
- ("byteField", "<", "20"),
- ("shortField", "!=", "100"),
- ];
- let records =
hudi_table.read_snapshot(partition_filters).await.unwrap();
+
+ let filter_gte_10 = Filter::try_from(("byteField", ">=",
"10")).unwrap();
+ let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
+ let filter_ne_100 = Filter::try_from(("shortField", "!=",
"100")).unwrap();
+
+ let records = hudi_table
+ .read_snapshot(&[filter_gte_10, filter_lt_20, filter_ne_100])
+ .await
+ .unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 2);
let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index ca41f73..14f6b30 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -18,16 +18,19 @@
*/
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
-use crate::error::CoreError;
-use crate::error::CoreError::{InvalidPartitionPath, Unsupported};
+use crate::error::CoreError::InvalidPartitionPath;
+use crate::expr::filter::Filter;
+use crate::expr::ExprOperator;
use crate::Result;
+
use arrow_array::{ArrayRef, Scalar, StringArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
-use arrow_schema::{DataType, Field, Schema};
-use std::cmp::PartialEq;
+use arrow_schema::Schema;
+use arrow_schema::{DataType, Field};
+
+use crate::table::CoreError;
use std::collections::HashMap;
-use std::str::FromStr;
use std::sync::Arc;
/// A partition pruner that filters partitions based on the partition path and
its filters.
@@ -41,13 +44,13 @@ pub struct PartitionPruner {
impl PartitionPruner {
pub fn new(
- and_filters: &[(&str, &str, &str)],
+ and_filters: &[Filter],
partition_schema: &Schema,
hudi_configs: &HudiConfigs,
) -> Result<Self> {
let and_filters = and_filters
.iter()
- .map(|filter| PartitionFilter::try_from((*filter,
partition_schema)))
+ .map(|filter| PartitionFilter::try_from((filter.clone(),
partition_schema)))
.collect::<Result<Vec<PartitionFilter>>>()?;
let schema = Arc::new(partition_schema.clone());
@@ -91,12 +94,12 @@ impl PartitionPruner {
match segments.get(filter.field.name()) {
Some(segment_value) => {
let comparison_result = match filter.operator {
- Operator::Eq => eq(segment_value, &filter.value),
- Operator::Ne => neq(segment_value, &filter.value),
- Operator::Lt => lt(segment_value, &filter.value),
- Operator::Lte => lt_eq(segment_value, &filter.value),
- Operator::Gt => gt(segment_value, &filter.value),
- Operator::Gte => gt_eq(segment_value, &filter.value),
+ ExprOperator::Eq => eq(segment_value, &filter.value),
+ ExprOperator::Ne => neq(segment_value, &filter.value),
+ ExprOperator::Lt => lt(segment_value, &filter.value),
+ ExprOperator::Lte => lt_eq(segment_value,
&filter.value),
+ ExprOperator::Gt => gt(segment_value, &filter.value),
+ ExprOperator::Gte => gt_eq(segment_value,
&filter.value),
};
match comparison_result {
@@ -155,60 +158,24 @@ impl PartitionPruner {
}
}
-/// An operator that represents a comparison operation used in a partition
filter expression.
-#[derive(Debug, Clone, Copy, PartialEq)]
-enum Operator {
- Eq,
- Ne,
- Lt,
- Lte,
- Gt,
- Gte,
-}
-
-impl Operator {
- const TOKEN_OP_PAIRS: [(&'static str, Operator); 6] = [
- ("=", Operator::Eq),
- ("!=", Operator::Ne),
- ("<", Operator::Lt),
- ("<=", Operator::Lte),
- (">", Operator::Gt),
- (">=", Operator::Gte),
- ];
-}
-
-impl FromStr for Operator {
- type Err = CoreError;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- Operator::TOKEN_OP_PAIRS
- .iter()
- .find_map(|&(token, op)| if token == s { Some(op) } else { None })
- .ok_or(Unsupported(format!("Unsupported operator: {}", s)))
- }
-}
-
/// A partition filter that represents a filter expression for partition
pruning.
#[derive(Debug, Clone)]
pub struct PartitionFilter {
- field: Field,
- operator: Operator,
- value: Scalar<ArrayRef>,
+ pub field: Field,
+ pub operator: ExprOperator,
+ pub value: Scalar<ArrayRef>,
}
-impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
+impl TryFrom<(Filter, &Schema)> for PartitionFilter {
type Error = CoreError;
- fn try_from(
- (filter, partition_schema): ((&str, &str, &str), &Schema),
- ) -> Result<Self, Self::Error> {
- let (field_name, operator_str, value_str) = filter;
+ fn try_from((filter, partition_schema): (Filter, &Schema)) -> Result<Self,
Self::Error> {
+ let field: &Field = partition_schema
+ .field_with_name(&filter.field_name)
+ .map_err(|_| InvalidPartitionPath("Partition path should be in
schema.".to_string()))?;
- let field: &Field = partition_schema.field_with_name(field_name)?;
-
- let operator = Operator::from_str(operator_str)?;
-
- let value = &[value_str];
+ let operator = filter.operator;
+ let value = &[filter.field_value.as_str()];
let value = Self::cast_value(value, field.data_type())?;
let field = field.clone();
@@ -221,7 +188,7 @@ impl TryFrom<((&str, &str, &str), &Schema)> for
PartitionFilter {
}
impl PartitionFilter {
- fn cast_value(value: &[&str; 1], data_type: &DataType) ->
Result<Scalar<ArrayRef>> {
+ pub fn cast_value(value: &[&str; 1], data_type: &DataType) ->
Result<Scalar<ArrayRef>> {
let cast_options = CastOptions {
safe: false,
format_options: Default::default(),
@@ -229,11 +196,11 @@ impl PartitionFilter {
let value = StringArray::from(Vec::from(value));
- Ok(Scalar::new(cast_with_options(
- &value,
- data_type,
- &cast_options,
- )?))
+ Ok(Scalar::new(
+ cast_with_options(&value, data_type, &cast_options).map_err(|e| {
+ CoreError::DataType(format!("Unable to cast {:?}: {:?}",
data_type, e))
+ })?,
+ ))
}
}
@@ -243,8 +210,9 @@ mod tests {
use crate::config::table::HudiTableConfig::{
IsHiveStylePartitioning, IsPartitionPathUrlencoded,
};
+
use arrow::datatypes::{DataType, Field, Schema};
- use arrow_array::{Array, Datum};
+ use arrow_array::Date32Array;
use hudi_tests::assert_not;
use std::str::FromStr;
@@ -256,90 +224,6 @@ mod tests {
])
}
- #[test]
- fn test_partition_filter_try_from_valid() {
- let schema = create_test_schema();
- let filter_tuple = ("date", "=", "2023-01-01");
- let filter = PartitionFilter::try_from((filter_tuple, &schema));
- assert!(filter.is_ok());
- let filter = filter.unwrap();
- assert_eq!(filter.field.name(), "date");
- assert_eq!(filter.operator, Operator::Eq);
- assert_eq!(filter.value.get().0.len(), 1);
-
- let filter_tuple = ("category", "!=", "foo");
- let filter = PartitionFilter::try_from((filter_tuple, &schema));
- assert!(filter.is_ok());
- let filter = filter.unwrap();
- assert_eq!(filter.field.name(), "category");
- assert_eq!(filter.operator, Operator::Ne);
- assert_eq!(filter.value.get().0.len(), 1);
- assert_eq!(
- StringArray::from(filter.value.into_inner().to_data()).value(0),
- "foo"
- )
- }
-
- #[test]
- fn test_partition_filter_try_from_invalid_field() {
- let schema = create_test_schema();
- let filter_tuple = ("invalid_field", "=", "2023-01-01");
- let filter = PartitionFilter::try_from((filter_tuple, &schema));
- assert!(filter.is_err());
- assert!(filter
- .unwrap_err()
- .to_string()
- .contains("Unable to get field named"));
- }
-
- #[test]
- fn test_partition_filter_try_from_invalid_operator() {
- let schema = create_test_schema();
- let filter_tuple = ("date", "??", "2023-01-01");
- let filter = PartitionFilter::try_from((filter_tuple, &schema));
- assert!(filter.is_err());
- assert!(filter
- .unwrap_err()
- .to_string()
- .contains("Unsupported operator: ??"));
- }
-
- #[test]
- fn test_partition_filter_try_from_invalid_value() {
- let schema = create_test_schema();
- let filter_tuple = ("count", "=", "not_a_number");
- let filter = PartitionFilter::try_from((filter_tuple, &schema));
- assert!(filter.is_err());
- assert!(filter
- .unwrap_err()
- .to_string()
- .contains("Cannot cast string"));
- }
-
- #[test]
- fn test_partition_filter_try_from_all_operators() {
- let schema = create_test_schema();
- for (op, _) in Operator::TOKEN_OP_PAIRS {
- let filter_tuple = ("count", op, "10");
- let filter = PartitionFilter::try_from((filter_tuple, &schema));
- assert!(filter.is_ok(), "Failed for operator: {}", op);
- let filter = filter.unwrap();
- assert_eq!(filter.field.name(), "count");
- assert_eq!(filter.operator, Operator::from_str(op).unwrap());
- }
- }
-
- #[test]
- fn test_operator_from_str() {
- assert_eq!(Operator::from_str("=").unwrap(), Operator::Eq);
- assert_eq!(Operator::from_str("!=").unwrap(), Operator::Ne);
- assert_eq!(Operator::from_str("<").unwrap(), Operator::Lt);
- assert_eq!(Operator::from_str("<=").unwrap(), Operator::Lte);
- assert_eq!(Operator::from_str(">").unwrap(), Operator::Gt);
- assert_eq!(Operator::from_str(">=").unwrap(), Operator::Gte);
- assert!(Operator::from_str("??").is_err());
- }
-
fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) ->
HudiConfigs {
HudiConfigs::new([
(IsHiveStylePartitioning, is_hive_style.to_string()),
@@ -350,9 +234,11 @@ mod tests {
fn test_partition_pruner_new() {
let schema = create_test_schema();
let configs = create_hudi_configs(true, false);
- let filters = vec![("date", ">", "2023-01-01"), ("category", "=",
"A")];
- let pruner = PartitionPruner::new(&filters, &schema, &configs);
+ let filter_gt_date = Filter::try_from(("date", ">",
"2023-01-01")).unwrap();
+ let filter_eq_a = Filter::try_from(("category", "=", "A")).unwrap();
+
+ let pruner = PartitionPruner::new(&[filter_gt_date, filter_eq_a],
&schema, &configs);
assert!(pruner.is_ok());
let pruner = pruner.unwrap();
@@ -377,8 +263,8 @@ mod tests {
let pruner_empty = PartitionPruner::new(&[], &schema,
&configs).unwrap();
assert!(pruner_empty.is_empty());
- let pruner_non_empty =
- PartitionPruner::new(&[("date", ">", "2023-01-01")], &schema,
&configs).unwrap();
+ let filter_gt_date = Filter::try_from(("date", ">",
"2023-01-01")).unwrap();
+ let pruner_non_empty = PartitionPruner::new(&[filter_gt_date],
&schema, &configs).unwrap();
assert_not!(pruner_non_empty.is_empty());
}
@@ -386,13 +272,17 @@ mod tests {
fn test_partition_pruner_should_include() {
let schema = create_test_schema();
let configs = create_hudi_configs(true, false);
- let filters = vec![
- ("date", ">", "2023-01-01"),
- ("category", "=", "A"),
- ("count", "<=", "100"),
- ];
- let pruner = PartitionPruner::new(&filters, &schema,
&configs).unwrap();
+ let filter_gt_date = Filter::try_from(("date", ">",
"2023-01-01")).unwrap();
+ let filter_eq_a = Filter::try_from(("category", "=", "A")).unwrap();
+ let filter_lte_100 = Filter::try_from(("count", "<=", "100")).unwrap();
+
+ let pruner = PartitionPruner::new(
+ &[filter_gt_date, filter_eq_a, filter_lte_100],
+ &schema,
+ &configs,
+ )
+ .unwrap();
assert!(pruner.should_include("date=2023-02-01/category=A/count=10"));
assert!(pruner.should_include("date=2023-02-01/category=A/count=100"));
@@ -445,4 +335,69 @@ mod tests {
let result =
pruner.parse_segments("date=2023-02-01/category=A/non_exist_field=10");
assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
}
+
+ #[test]
+ fn test_partition_filter_try_from_valid() {
+ let schema = create_test_schema();
+ let filter = Filter {
+ field_name: "date".to_string(),
+ operator: ExprOperator::Eq,
+ field_value: "2023-01-01".to_string(),
+ };
+
+ let partition_filter = PartitionFilter::try_from((filter,
&schema)).unwrap();
+ assert_eq!(partition_filter.field.name(), "date");
+ assert_eq!(partition_filter.operator, ExprOperator::Eq);
+
+ let value_inner = partition_filter.value.into_inner();
+
+ let date_array =
value_inner.as_any().downcast_ref::<Date32Array>().unwrap();
+
+ let date_value = date_array.value_as_date(0).unwrap();
+ assert_eq!(date_value.to_string(), "2023-01-01");
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_invalid_field() {
+ let schema = create_test_schema();
+ let filter = Filter {
+ field_name: "invalid_field".to_string(),
+ operator: ExprOperator::Eq,
+ field_value: "2023-01-01".to_string(),
+ };
+ let result = PartitionFilter::try_from((filter, &schema));
+ assert!(result.is_err());
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("Partition path should be in schema."));
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_invalid_value() {
+ let schema = create_test_schema();
+ let filter = Filter {
+ field_name: "count".to_string(),
+ operator: ExprOperator::Eq,
+ field_value: "not_a_number".to_string(),
+ };
+ let result = PartitionFilter::try_from((filter, &schema));
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_all_operators() {
+ let schema = create_test_schema();
+ for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
+ let filter = Filter {
+ field_name: "count".to_string(),
+ operator: ExprOperator::from_str(op).unwrap(),
+ field_value: "5".to_string(),
+ };
+ let partition_filter = PartitionFilter::try_from((filter,
&schema));
+ let filter = partition_filter.unwrap();
+ assert_eq!(filter.field.name(), "count");
+ assert_eq!(filter.operator, ExprOperator::from_str(op).unwrap());
+ }
+ }
}
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 120aa8a..53bc2e4 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -30,6 +30,9 @@ repository.workspace = true
[dependencies]
hudi-core = { version = "0.3.0", path = "../core", features = ["datafusion"] }
# arrow
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
arrow-schema = { workspace = true }
# datafusion
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 33f3987..a976a0f 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -17,6 +17,8 @@
* under the License.
*/
+pub(crate) mod util;
+
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
@@ -31,14 +33,16 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::TableProvider;
+use datafusion::logical_expr::Operator;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::DFSchema;
use datafusion_common::DataFusionError::Execution;
use datafusion_common::Result;
-use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_expr::{CreateExternalTable, Expr, TableProviderFilterPushDown,
TableType};
use datafusion_physical_expr::create_physical_expr;
+use crate::util::expr::exprs_to_filters;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_core::config::util::empty_options;
use hudi_core::storage::util::{get_scheme_authority, parse_uri};
@@ -54,7 +58,7 @@ use hudi_core::table::Table as HudiTable;
///
/// use datafusion::error::Result;
/// use datafusion::prelude::{DataFrame, SessionContext};
-/// use hudi::HudiDataSource;
+/// use hudi_datafusion::HudiDataSource;
///
/// // Initialize a new DataFusion session context
/// let ctx = SessionContext::new();
@@ -62,7 +66,7 @@ use hudi_core::table::Table as HudiTable;
/// // Create a new HudiDataSource with specific read options
/// let hudi = HudiDataSource::new_with_options(
/// "/tmp/trips_table",
-/// [("hoodie.read.as.of.timestamp", "20241122010827898")]).await?;
+/// [("hoodie.read.as.of.timestamp", "20241122010827898")]).await?;
///
/// // Register the Hudi table with the session context
/// ctx.register_table("trips_table", Arc::new(hudi))?;
@@ -98,6 +102,42 @@ impl HudiDataSource {
.get_or_default(InputPartitions)
.to::<usize>()
}
+
+ /// Check if the given expression can be pushed down to the Hudi table.
+ ///
+ /// The expression can be pushed down if it is a binary expression with a
supported operator and operands.
+ fn can_push_down(&self, expr: &Expr) -> bool {
+ match expr {
+ Expr::BinaryExpr(binary_expr) => {
+ let left = &binary_expr.left;
+ let op = &binary_expr.op;
+ let right = &binary_expr.right;
+ self.is_supported_operator(op)
+ && self.is_supported_operand(left)
+ && self.is_supported_operand(right)
+ }
+ Expr::Not(inner_expr) => {
+ // Recursively check if the inner expression can be pushed down
+ self.can_push_down(inner_expr)
+ }
+ _ => false,
+ }
+ }
+
+ fn is_supported_operator(&self, op: &Operator) -> bool {
+ matches!(
+ op,
+ Operator::Eq | Operator::Gt | Operator::Lt | Operator::GtEq |
Operator::LtEq
+ )
+ }
+
+ fn is_supported_operand(&self, expr: &Expr) -> bool {
+ match expr {
+ Expr::Column(col) =>
self.schema().field_with_name(&col.name).is_ok(),
+ Expr::Literal(_) => true,
+ _ => false,
+ }
+ }
}
#[async_trait]
@@ -129,10 +169,11 @@ impl TableProvider for HudiDataSource {
) -> Result<Arc<dyn ExecutionPlan>> {
self.table.register_storage(state.runtime_env().clone());
+ // Convert Datafusion `Expr` to `Filter`
+ let pushdown_filters = exprs_to_filters(filters);
let file_slices = self
.table
- // TODO: implement supports_filters_pushdown() to pass filters to
Hudi table API
- .get_file_slices_splits(self.get_input_partitions(), &[])
+ .get_file_slices_splits(self.get_input_partitions(),
pushdown_filters.as_slice())
.await
.map_err(|e| Execution(format!("Failed to get file slices from
Hudi table: {}", e)))?;
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
@@ -176,6 +217,22 @@ impl TableProvider for HudiDataSource {
Ok(exec_builder.build_arc())
}
+
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> Result<Vec<TableProviderFilterPushDown>> {
+ filters
+ .iter()
+ .map(|expr| {
+ if self.can_push_down(expr) {
+ Ok(TableProviderFilterPushDown::Inexact)
+ } else {
+ Ok(TableProviderFilterPushDown::Unsupported)
+ }
+ })
+ .collect()
+ }
}
/// `HudiTableFactory` is responsible for creating and configuring Hudi tables.
@@ -188,7 +245,8 @@ impl TableProvider for HudiDataSource {
/// Creating a new `HudiTableFactory` instance:
///
/// ```rust
-/// use hudi::HudiTableFactory;
+/// use datafusion::prelude::SessionContext;
+/// use hudi_datafusion::HudiTableFactory;
///
/// // Initialize a new HudiTableFactory
/// let factory = HudiTableFactory::new();
@@ -265,12 +323,13 @@ mod tests {
use super::*;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
- use datafusion_common::{DataFusionError, ScalarValue};
+ use datafusion_common::{Column, DataFusionError, ScalarValue};
use std::fs::canonicalize;
use std::path::Path;
use std::sync::Arc;
use url::Url;
+ use datafusion::logical_expr::BinaryExpr;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned,
V6SimplekeygenHivestyleNoMetafields,
@@ -479,4 +538,57 @@ mod tests {
verify_data_with_replacecommits(&ctx, &sql,
test_table.as_ref()).await
}
}
+
+ #[tokio::test]
+ async fn test_supports_filters_pushdown() {
+ let table_provider =
+ HudiDataSource::new_with_options(V6Nonpartitioned.path().as_str(),
empty_options())
+ .await
+ .unwrap();
+
+ let expr1 = Expr::BinaryExpr(BinaryExpr {
+ left:
Box::new(Expr::Column(Column::from_name("name".to_string()))),
+ op: Operator::Eq,
+ right:
Box::new(Expr::Literal(ScalarValue::Utf8(Some("Alice".to_string())))),
+ });
+
+ let expr2 = Expr::BinaryExpr(BinaryExpr {
+ left:
Box::new(Expr::Column(Column::from_name("intField".to_string()))),
+ op: Operator::Gt,
+ right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
+ });
+
+ let expr3 = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name(
+ "nonexistent_column".to_string(),
+ ))),
+ op: Operator::Eq,
+ right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
+ });
+
+ let expr4 = Expr::BinaryExpr(BinaryExpr {
+ left:
Box::new(Expr::Column(Column::from_name("name".to_string()))),
+ op: Operator::NotEq,
+ right:
Box::new(Expr::Literal(ScalarValue::Utf8(Some("Diana".to_string())))),
+ });
+
+ let expr5 = Expr::Literal(ScalarValue::Int32(Some(10)));
+
+ let expr6 = Expr::Not(Box::new(Expr::BinaryExpr(BinaryExpr {
+ left:
Box::new(Expr::Column(Column::from_name("intField".to_string()))),
+ op: Operator::Gt,
+ right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
+ })));
+
+ let filters = vec![&expr1, &expr2, &expr3, &expr4, &expr5, &expr6];
+ let result =
table_provider.supports_filters_pushdown(&filters).unwrap();
+
+ assert_eq!(result.len(), 6);
+ assert_eq!(result[0], TableProviderFilterPushDown::Inexact);
+ assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
+ assert_eq!(result[2], TableProviderFilterPushDown::Unsupported);
+ assert_eq!(result[3], TableProviderFilterPushDown::Unsupported);
+ assert_eq!(result[4], TableProviderFilterPushDown::Unsupported);
+ assert_eq!(result[5], TableProviderFilterPushDown::Inexact);
+ }
}
diff --git a/crates/datafusion/src/util/expr.rs
b/crates/datafusion/src/util/expr.rs
new file mode 100644
index 0000000..daa21b6
--- /dev/null
+++ b/crates/datafusion/src/util/expr.rs
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use datafusion::logical_expr::Operator;
+use datafusion_expr::{BinaryExpr, Expr};
+use hudi_core::expr::filter::Filter as HudiFilter;
+use hudi_core::expr::ExprOperator;
+
+/// Converts DataFusion expressions into Hudi filters.
+///
+/// Takes a slice of DataFusion [`Expr`] and attempts to convert each
expression
+/// into a [`HudiFilter`]. Only binary expressions and NOT expressions are
currently supported.
+///
+/// # Arguments
+/// * `exprs` - A slice of DataFusion expressions to convert
+///
+/// # Returns
+/// Returns a `Vec<HudiFilter>` containing the successfully converted filters;
+/// expressions that cannot be converted are skipped.
+///
+/// TODO: Handle other DataFusion [`Expr`]
+pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
+ let mut filters: Vec<HudiFilter> = Vec::new();
+
+ for expr in exprs {
+ match expr {
+ Expr::BinaryExpr(binary_expr) => {
+ if let Some(filter) = binary_expr_to_filter(binary_expr) {
+ filters.push(filter);
+ }
+ }
+ Expr::Not(not_expr) => {
+ if let Some(filter) = not_expr_to_filter(not_expr) {
+ filters.push(filter);
+ }
+ }
+ _ => {}
+ }
+ }
+
+ filters
+}
+
+/// Converts a binary expression [`Expr::BinaryExpr`] into a [`HudiFilter`].
+fn binary_expr_to_filter(binary_expr: &BinaryExpr) -> Option<HudiFilter> {
+ // extract the column and literal from the binary expression
+ let (column, literal) = match (&*binary_expr.left, &*binary_expr.right) {
+ (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
+ (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
+ _ => return None,
+ };
+
+ let field_name = column.name().to_string();
+
+ let operator = match binary_expr.op {
+ Operator::Eq => ExprOperator::Eq,
+ Operator::NotEq => ExprOperator::Ne,
+ Operator::Lt => ExprOperator::Lt,
+ Operator::LtEq => ExprOperator::Lte,
+ Operator::Gt => ExprOperator::Gt,
+ Operator::GtEq => ExprOperator::Gte,
+ _ => return None,
+ };
+
+ let value = literal.to_string();
+
+ Some(HudiFilter {
+ field_name,
+ operator,
+ field_value: value,
+ })
+}
+
+/// Converts a NOT expression (`Expr::Not`) into a `HudiFilter` with the negated operator.
+fn not_expr_to_filter(not_expr: &Expr) -> Option<HudiFilter> {
+ match not_expr {
+ Expr::BinaryExpr(ref binary_expr) => {
+ let mut filter = binary_expr_to_filter(binary_expr)?;
+ filter.operator = filter.operator.negate()?;
+ Some(filter)
+ }
+ _ => None,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion::logical_expr::{col, lit};
+ use datafusion_expr::{BinaryExpr, Expr};
+ use hudi_core::expr::ExprOperator;
+ use std::str::FromStr;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_convert_simple_binary_expr() {
+ let schema = Arc::new(Schema::new(vec![Field::new("col",
DataType::Int32, false)]));
+
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col")),
+ Operator::Eq,
+ Box::new(lit(42i32)),
+ ));
+
+ let filters = vec![expr];
+
+ let result = exprs_to_filters(&filters);
+
+ assert_eq!(result.len(), 1);
+
+ let expected_filter = HudiFilter {
+ field_name: schema.field(0).name().to_string(),
+ operator: ExprOperator::Eq,
+ field_value: "42".to_string(),
+ };
+
+ assert_eq!(result[0].field_name, expected_filter.field_name);
+ assert_eq!(result[0].operator, expected_filter.operator);
+ assert_eq!(*result[0].field_value.clone(),
expected_filter.field_value);
+ }
+
+ #[test]
+ fn test_convert_not_expr() {
+ let schema = Arc::new(Schema::new(vec![Field::new("col",
DataType::Int32, false)]));
+
+ let inner_expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col")),
+ Operator::Eq,
+ Box::new(lit(42i32)),
+ ));
+ let expr = Expr::Not(Box::new(inner_expr));
+
+ let filters = vec![expr];
+
+ let result = exprs_to_filters(&filters);
+
+ assert_eq!(result.len(), 1);
+
+ let expected_filter = HudiFilter {
+ field_name: schema.field(0).name().to_string(),
+ operator: ExprOperator::Ne,
+ field_value: "42".to_string(),
+ };
+
+ assert_eq!(result[0].field_name, expected_filter.field_name);
+ assert_eq!(result[0].operator, expected_filter.operator);
+ assert_eq!(*result[0].field_value.clone(),
expected_filter.field_value);
+ }
+
+ #[test]
+ fn test_convert_binary_expr_extensive() {
+ // list of test cases with different operators and data types
+ let test_cases = vec![
+ (
+ col("int32_col").eq(lit(42i32)),
+ Some(HudiFilter {
+ field_name: String::from("int32_col"),
+ operator: ExprOperator::Eq,
+ field_value: String::from("42"),
+ }),
+ ),
+ (
+ col("int64_col").gt_eq(lit(100i64)),
+ Some(HudiFilter {
+ field_name: String::from("int64_col"),
+ operator: ExprOperator::Gte,
+ field_value: String::from("100"),
+ }),
+ ),
+ (
+ col("float64_col").lt(lit(32.666)),
+ Some(HudiFilter {
+ field_name: String::from("float64_col"),
+ operator: ExprOperator::Lt,
+ field_value: "32.666".to_string(),
+ }),
+ ),
+ (
+ col("string_col").not_eq(lit("test")),
+ Some(HudiFilter {
+ field_name: String::from("string_col"),
+ operator: ExprOperator::Ne,
+ field_value: String::from("test"),
+ }),
+ ),
+ ];
+
+ let filters: Vec<Expr> = test_cases.iter().map(|(expr, _)|
expr.clone()).collect();
+ let result = exprs_to_filters(&filters);
+ let expected_filters: Vec<&HudiFilter> = test_cases
+ .iter()
+ .filter_map(|(_, opt_filter)| opt_filter.as_ref())
+ .collect();
+
+ assert_eq!(result.len(), expected_filters.len());
+
+ for (result, expected_filter) in
result.iter().zip(expected_filters.iter()) {
+ assert_eq!(result.field_name, expected_filter.field_name);
+ assert_eq!(result.operator, expected_filter.operator);
+ assert_eq!(*result.field_value.clone(),
expected_filter.field_value);
+ }
+ }
+
+ // Tests conversion with different operators (e.g., <, <=, >, >=)
+ #[test]
+ fn test_convert_various_operators() {
+ let schema = Arc::new(Schema::new(vec![Field::new("col",
DataType::Int32, false)]));
+
+ let operators = vec![
+ (Operator::Lt, ExprOperator::Lt),
+ (Operator::LtEq, ExprOperator::Lte),
+ (Operator::Gt, ExprOperator::Gt),
+ (Operator::GtEq, ExprOperator::Gte),
+ ];
+
+ for (op, expected_op) in operators {
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col")),
+ op,
+ Box::new(lit(42i32)),
+ ));
+
+ let filters = vec![expr];
+
+ let result = exprs_to_filters(&filters);
+
+ assert_eq!(result.len(), 1);
+
+ let expected_filter = HudiFilter {
+ field_name: schema.field(0).name().to_string(),
+ operator: expected_op,
+ field_value: String::from("42"),
+ };
+
+ assert_eq!(result[0].field_name, expected_filter.field_name);
+ assert_eq!(result[0].operator, expected_filter.operator);
+ assert_eq!(*result[0].field_value.clone(),
expected_filter.field_value);
+ }
+ }
+
+ #[test]
+ fn test_convert_expr_with_unsupported_operator() {
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col")),
+ Operator::And,
+ Box::new(lit("value")),
+ ));
+
+ let filters = vec![expr];
+ let result = exprs_to_filters(&filters);
+ assert!(result.is_empty());
+ }
+
+ #[test]
+ fn test_negate_operator_for_all_ops() {
+ for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
+ if let Some(negated_op) =
ExprOperator::from_str(op).unwrap().negate() {
+ let double_negated_op = negated_op
+ .negate()
+ .expect("Negation should be defined for all operators");
+
+ assert_eq!(double_negated_op,
ExprOperator::from_str(op).unwrap());
+ }
+ }
+ }
+}
diff --git a/crates/core/src/lib.rs b/crates/datafusion/src/util/mod.rs
similarity index 50%
copy from crates/core/src/lib.rs
copy to crates/datafusion/src/util/mod.rs
index 42079a1..ff2dcf3 100644
--- a/crates/core/src/lib.rs
+++ b/crates/datafusion/src/util/mod.rs
@@ -16,38 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-//! Crate `hudi-core`.
-//!
-//! # The [config] module is responsible for managing configurations.
-//!
-//! **Example**
-//!
-//! ```rust
-//! use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp,
InputPartitions};
-//! use hudi_core::table::Table as HudiTable;
-//!
-//! let options = [(InputPartitions, "2"), (AsOfTimestamp,
"20240101010100000")];
-//! HudiTable::new_with_options("/tmp/hudi_data", options);
-//! ```
-//!
-//! # The [table] module is responsible for managing Hudi tables.
-//!
-//! **Example**
-//!
-//! create hudi table
-//! ```rust
-//! use hudi_core::table::Table;
-//!
-//! pub async fn test() {
-//! let hudi_table = Table::new("/tmp/hudi_data").await.unwrap();
-//! }
-//! ```
-pub mod config;
-pub mod error;
-pub mod file_group;
-pub mod storage;
-pub mod table;
-pub mod util;
-
-use error::Result;
+pub mod expr;
diff --git a/python/src/internal.rs b/python/src/internal.rs
index a6a3552..4a76f7d 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -23,19 +23,17 @@ use std::path::PathBuf;
use std::sync::OnceLock;
use arrow::pyarrow::ToPyArrow;
-use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
use tokio::runtime::Runtime;
use hudi::error::CoreError;
+use hudi::expr::filter::Filter;
use hudi::file_group::reader::FileGroupReader;
use hudi::file_group::FileSlice;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
-use hudi::util::convert_vec_to_slice;
-use hudi::util::vec_to_slice;
-use pyo3::create_exception;
-use pyo3::exceptions::PyException;
+use pyo3::exceptions::{PyException, PyValueError};
+use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject,
PyResult, Python};
create_exception!(_internal, HudiCoreError, PyException);
@@ -197,12 +195,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
+ let filters = convert_filters(filters)?;
+
py.allow_threads(|| {
let file_slices = rt()
- .block_on(
- self.inner
- .get_file_slices_splits(n,
vec_to_slice!(filters.unwrap_or_default())),
- )
+ .block_on(self.inner.get_file_slices_splits(n, &filters))
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
@@ -217,12 +214,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
+ let filters = convert_filters(filters)?;
+
py.allow_threads(|| {
let file_slices = rt()
- .block_on(
- self.inner
-
.get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
- )
+ .block_on(self.inner.get_file_slices(&filters))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
@@ -239,15 +235,29 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
- rt().block_on(
- self.inner
- .read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
- )
- .map_err(PythonError::from)?
- .to_pyarrow(py)
+ let filters = convert_filters(filters)?;
+
+ rt().block_on(self.inner.read_snapshot(&filters))
+ .map_err(PythonError::from)?
+ .to_pyarrow(py)
}
}
+fn convert_filters(filters: Option<Vec<(String, String, String)>>) ->
PyResult<Vec<Filter>> {
+ filters
+ .unwrap_or_default()
+ .into_iter()
+ .map(|(field, op, value)| {
+ Filter::try_from((field.as_str(), op.as_str(),
value.as_str())).map_err(|e| {
+ PyValueError::new_err(format!(
+ "Invalid filter ({}, {}, {}): {}",
+ field, op, value, e
+ ))
+ })
+ })
+ .collect()
+}
+
#[cfg(not(tarpaulin))]
#[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None,
options=None))]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index baebdff..f8986a8 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -193,3 +193,23 @@ def test_read_table_as_of_timestamp(get_sample_table):
"fare": 34.15,
},
]
+
+
+def test_convert_filters_valid(get_sample_table):
+ table_path = get_sample_table
+ table = HudiTable(table_path)
+
+ filters = [
+ ("city", "=", "san_francisco"),
+ ("city", ">", "san_francisco"),
+ ("city", "<", "san_francisco"),
+ ("city", "<=", "san_francisco"),
+ ("city", ">=", "san_francisco"),
+ ]
+
+ result = [3, 1, 1, 4, 4]
+
+ for i in range(len(filters)):
+ filter_list = [filters[i]]
+ file_slices = table.get_file_slices(filters=filter_list)
+ assert len(file_slices) == result[i]