This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d257a21  refactor: remove use of `Filter` from public APIs (#266)
d257a21 is described below

commit d257a2172c0855edfd47dd79d2c47d2b37931d5e
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 14:20:20 2025 -0600

    refactor: remove use of `Filter` from public APIs (#266)
    
    Stop using `Filter` as a public API argument type; take tuples of
    `&str` instead for simpler invocation.
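
    For illustration, a minimal sketch of the new call style (the table
    path and filter values below are hypothetical):

        // inside an async context; `?` propagates the crate's error type
        let table = Table::new("/tmp/trips_table").await?;
        let batches = table
            .read_snapshot(&[("city", "=", "san_francisco")])
            .await?;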
---
 crates/core/src/expr/filter.rs       |  14 ++++
 crates/core/src/file_group/reader.rs |   4 +-
 crates/core/src/table/mod.rs         | 138 ++++++++++++++++++++---------------
 crates/core/src/util/mod.rs          |  30 ++++----
 crates/datafusion/src/lib.rs         |   3 +-
 crates/datafusion/src/util/expr.rs   |  50 +++++++++----
 python/src/internal.rs               |  31 ++------
 7 files changed, 154 insertions(+), 116 deletions(-)

diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 685aec4..847a2f5 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -42,6 +42,16 @@ impl Filter {
     }
 }
 
+impl From<Filter> for (String, String, String) {
+    fn from(filter: Filter) -> Self {
+        (
+            filter.field_name,
+            filter.operator.to_string(),
+            filter.field_value,
+        )
+    }
+}
+
 impl TryFrom<(&str, &str, &str)> for Filter {
     type Error = CoreError;
 
@@ -62,6 +72,10 @@ impl TryFrom<(&str, &str, &str)> for Filter {
     }
 }
 
+pub fn from_str_tuples(tuples: &[(&str, &str, &str)]) -> Result<Vec<Filter>> {
+    tuples.iter().map(|t| Filter::try_from(*t)).collect()
+}
+
 pub struct FilterField {
     pub name: String,
 }
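
A short sketch of how the two new pieces compose; the module paths follow
the imports used elsewhere in this diff, and the filter values are
hypothetical:

    use hudi_core::error::CoreError;
    use hudi_core::expr::filter::{from_str_tuples, Filter};

    fn parse() -> Result<(), CoreError> {
        // str tuples parse into typed `Filter`s; unknown operators error out
        let mut filters: Vec<Filter> = from_str_tuples(&[("byteField", ">=", "10")])?;
        // the new `From` impl turns a `Filter` back into owned strings,
        // with the operator rendered via its `Display` form
        let (field, op, value): (String, String, String) = filters.remove(0).into();
        println!("{field} {op} {value}");
        Ok(())
    }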
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 6b875d2..2a0610e 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -44,7 +44,7 @@ pub struct FileGroupReader {
 }
 
 impl FileGroupReader {
-    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+    pub(crate) fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
         Self {
             storage,
             hudi_configs,
@@ -52,7 +52,7 @@ impl FileGroupReader {
         }
     }
 
-    pub fn new_with_filters(
+    pub(crate) fn new_with_filters(
         storage: Arc<Storage>,
         hudi_configs: Arc<HudiConfigs>,
         and_filters: &[Filter],
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 5154a04..0a163b8 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,7 +94,7 @@ use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
-use crate::expr::filter::{Filter, FilterField};
+use crate::expr::filter::{from_str_tuples, Filter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
 use crate::table::builder::TableBuilder;
@@ -140,7 +140,6 @@ impl Table {
             .await
     }
 
-    #[inline]
     pub fn base_url(&self) -> Url {
         let err_msg = format!("{:?} is missing or invalid.", 
HudiTableConfig::BasePath);
         self.hudi_configs
@@ -150,7 +149,6 @@ impl Table {
             .expect(&err_msg)
     }
 
-    #[inline]
     pub fn table_type(&self) -> TableTypeValue {
         let err_msg = format!("{:?} is missing or invalid.", 
HudiTableConfig::TableType);
         let table_type = self
@@ -161,7 +159,6 @@ impl Table {
         TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
     }
 
-    #[inline]
     pub fn timezone(&self) -> String {
         self.hudi_configs
             .get_or_default(HudiTableConfig::TimelineTimezone)
@@ -223,7 +220,7 @@ impl Table {
     pub async fn get_file_slices_splits(
         &self,
         n: usize,
-        filters: &[Filter],
+        filters: &[(&str, &str, &str)],
     ) -> Result<Vec<Vec<FileSlice>>> {
         let file_slices = self.get_file_slices(filters).await?;
         if file_slices.is_empty() {
@@ -242,19 +239,29 @@ impl Table {
     /// Get all the [FileSlice]s in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the file slices at the specified timestamp will be returned.
-    pub async fn get_file_slices(&self, filters: &[Filter]) -> Result<Vec<FileSlice>> {
+    pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<FileSlice>> {
+        let filters = from_str_tuples(filters)?;
         if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.get_file_slices_as_of(timestamp.to::<String>().as_str(), filters)
+            self.get_file_slices_internal(timestamp.to::<String>().as_str(), &filters)
                 .await
         } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
-            self.get_file_slices_as_of(timestamp, filters).await
+            self.get_file_slices_internal(timestamp, &filters).await
         } else {
             Ok(Vec::new())
         }
     }
 
     /// Get all the [FileSlice]s at a given timestamp, as a time travel query.
-    async fn get_file_slices_as_of(
+    pub async fn get_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<FileSlice>> {
+        let filters = from_str_tuples(filters)?;
+        self.get_file_slices_internal(timestamp, &filters).await
+    }
+
+    async fn get_file_slices_internal(
         &self,
         timestamp: &str,
         filters: &[Filter],
@@ -277,13 +284,14 @@ impl Table {
 
     pub fn create_file_group_reader_with_filters(
         &self,
-        filters: &[Filter],
+        filters: &[(&str, &str, &str)],
         schema: &Schema,
     ) -> Result<FileGroupReader> {
+        let filters = from_str_tuples(filters)?;
         FileGroupReader::new_with_filters(
             self.file_system_view.storage.clone(),
             self.hudi_configs.clone(),
-            filters,
+            &filters,
             schema,
         )
     }
@@ -291,21 +299,22 @@ impl Table {
     /// Get all the latest records in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
-    pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
+    pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<RecordBatch>> {
+        let filters = from_str_tuples(filters)?;
         let read_optimized_mode = self
             .hudi_configs
             .get_or_default(UseReadOptimizedMode)
             .to::<bool>();
 
         if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.read_snapshot_as_of(
+            self.read_snapshot_internal(
                 timestamp.to::<String>().as_str(),
-                filters,
+                &filters,
                 read_optimized_mode,
             )
             .await
         } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
-            self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
+            self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
                 .await
         } else {
             Ok(Vec::new())
@@ -313,13 +322,27 @@ impl Table {
     }
 
     /// Get all the records in the table at a given timestamp, as a time travel query.
-    async fn read_snapshot_as_of(
+    pub async fn read_snapshot_as_of(
+        &self,
+        timestamp: &str,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<RecordBatch>> {
+        let filters = from_str_tuples(filters)?;
+        let read_optimized_mode = self
+            .hudi_configs
+            .get_or_default(UseReadOptimizedMode)
+            .to::<bool>();
+        self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
+            .await
+    }
+
+    async fn read_snapshot_internal(
         &self,
         timestamp: &str,
         filters: &[Filter],
         read_optimized_mode: bool,
     ) -> Result<Vec<RecordBatch>> {
-        let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
+        let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
         let fg_reader = self.create_file_group_reader();
         let base_file_only =
             read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
@@ -334,7 +357,9 @@ impl Table {
         Ok(batches)
     }
 
-    /// Get records that were inserted or updated between the given timestamps. Records that were updated multiple times should have their latest states within the time span being returned.
+    /// Get records that were inserted or updated between the given timestamps.
+    /// Records that were updated multiple times should have their latest states within
+    /// the time span being returned.
     pub async fn read_incremental_records(
         &self,
         start_timestamp: &str,
@@ -361,11 +386,11 @@ impl Table {
 
         // Read incremental records from the file slices.
         let filters = &[
-            FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
-            FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
+            (MetaField::CommitTime.as_ref(), ">", start_timestamp),
+            (MetaField::CommitTime.as_ref(), "<=", end_timestamp),
         ];
-        let fg_reader =
-            self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
+        let schema = MetaField::schema();
+        let fg_reader = self.create_file_group_reader_with_filters(filters, &schema)?;
 
         // Read-optimized mode does not apply to incremental query semantics.
         let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
@@ -381,7 +406,9 @@ impl Table {
         Ok(batches)
     }
 
-    /// Get the change-data-capture (CDC) records between the given timestamps. The CDC records should reflect the records that were inserted, updated, and deleted between the timestamps.
+    /// Get the change-data-capture (CDC) records between the given timestamps.
+    /// The CDC records should reflect the records that were inserted, updated, and deleted
+    /// between the timestamps.
     #[allow(dead_code)]
     async fn read_incremental_changes(
         &self,
@@ -405,7 +432,6 @@ mod tests {
     use crate::config::HUDI_CONF_DIR;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
-    use crate::table::Filter;
     use hudi_test::SampleTable;
     use std::collections::HashSet;
     use std::fs::canonicalize;
@@ -431,7 +457,10 @@ mod tests {
     }
 
     /// Test helper to get relative file paths from the table with filters.
-    async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
+    async fn get_file_paths_with_filters(
+        table: &Table,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
         let base_url = table.base_url();
         for f in table.get_file_slices(filters).await? {
@@ -783,11 +812,11 @@ mod tests {
         );
 
         // as of the latest timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
-        let hudi_table = Table::new_with_options(base_url.path(), opts)
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices = hudi_table
+            .get_file_slices_as_of("20240418173551906", &[])
             .await
             .unwrap();
-        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -847,11 +876,8 @@ mod tests {
         .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
-        let filter_ge_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
-
-        let filter_lt_30 = Filter::try_from(("byteField", "<", "30")).unwrap();
-
-        let actual = get_file_paths_with_filters(&hudi_table, &[filter_ge_10, filter_lt_30])
+        let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
+        let actual = get_file_paths_with_filters(&hudi_table, &filters)
             .await
             .unwrap()
             .into_iter()
@@ -865,8 +891,7 @@ mod tests {
         .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
-        let filter_gt_30 = Filter::try_from(("byteField", ">", "30")).unwrap();
-        let actual = get_file_paths_with_filters(&hudi_table, &[filter_gt_30])
+        let actual = get_file_paths_with_filters(&hudi_table, &[("byteField", ">", "30")])
             .await
             .unwrap()
             .into_iter()
@@ -897,16 +922,16 @@ mod tests {
             .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
-        let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
-        let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
-        let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap();
-
-        let actual =
-            get_file_paths_with_filters(&hudi_table, &[filter_gte_10, filter_lt_20, filter_ne_100])
-                .await
-                .unwrap()
-                .into_iter()
-                .collect::<HashSet<_>>();
+        let filters = [
+            ("byteField", ">=", "10"),
+            ("byteField", "<", "20"),
+            ("shortField", "!=", "100"),
+        ];
+        let actual = get_file_paths_with_filters(&hudi_table, &filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
         let expected = [
             "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
         ]
@@ -914,10 +939,9 @@ mod tests {
             .into_iter()
             .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
-        let filter_lt_20 = Filter::try_from(("byteField", ">", "20")).unwrap();
-        let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap();
 
-        let actual = get_file_paths_with_filters(&hudi_table, &[filter_lt_20, filter_eq_300])
+        let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
+        let actual = get_file_paths_with_filters(&hudi_table, &filters)
             .await
             .unwrap()
             .into_iter()
@@ -966,7 +990,9 @@ mod tests {
         #[tokio::test]
         async fn test_non_partitioned_read_optimized() -> Result<()> {
             let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
-            let hudi_table = Table::new(base_url.path()).await?;
+            let hudi_table =
+                Table::new_with_options(base_url.path(), [(UseReadOptimizedMode.as_ref(), "true")])
+                    .await?;
             let commit_timestamps = hudi_table
                 .timeline
                 .completed_commits
@@ -974,9 +1000,7 @@ mod tests {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let latest_commit = commit_timestamps.last().unwrap();
-            let records = hudi_table
-                .read_snapshot_as_of(latest_commit, &[], true)
-                .await?;
+            let records = hudi_table.read_snapshot_as_of(latest_commit, &[]).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -1019,9 +1043,9 @@ mod tests {
                 let hudi_table = Table::new(base_url.path()).await?;
 
                 let filters = &[
-                    Filter::try_from(("byteField", ">=", "10"))?,
-                    Filter::try_from(("byteField", "<", "20"))?,
-                    Filter::try_from(("shortField", "!=", "100"))?,
+                    ("byteField", ">=", "10"),
+                    ("byteField", "<", "20"),
+                    ("shortField", "!=", "100"),
                 ];
                 let records = hudi_table.read_snapshot(filters).await?;
                 let schema = &records[0].schema();
@@ -1044,9 +1068,7 @@ mod tests {
                     .map(|i| i.timestamp.as_str())
                     .collect::<Vec<_>>();
                 let first_commit = commit_timestamps[0];
-                let records = hudi_table
-                    .read_snapshot_as_of(first_commit, &[], false)
-                    .await?;
+                let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
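
The time-travel variants above are now public and take str tuples too; a
minimal sketch, assuming the crate's error type implements
`std::error::Error` (path, timestamp, and filter values are hypothetical):

    use hudi_core::table::Table;

    async fn snapshot_at(path: &str, ts: &str) -> Result<(), Box<dyn std::error::Error>> {
        let table = Table::new(path).await?;
        // file slices visible as of `ts`, pruned by a partition filter
        let slices = table
            .get_file_slices_as_of(ts, &[("byteField", ">=", "10")])
            .await?;
        // records as of `ts`; read-optimized mode now comes from the
        // UseReadOptimizedMode config rather than a bool argument
        let batches = table.read_snapshot_as_of(ts, &[]).await?;
        println!("{} file slices, {} record batches", slices.len(), batches.len());
        Ok(())
    }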
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 84fa313..21306c9 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -18,19 +18,17 @@
  */
 pub mod arrow;
 
-pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str, &str, &str)> {
-    vec.iter()
-        .map(|(a, b, c)| (a.as_str(), b.as_str(), c.as_str()))
-        .collect()
+pub trait StrTupleRef {
+    fn as_strs(&self) -> Vec<(&str, &str, &str)>;
 }
 
-#[macro_export]
-macro_rules! vec_to_slice {
-    ($vec:expr) => {
-        &convert_vec_to_slice(&$vec)[..]
-    };
+impl StrTupleRef for Vec<(String, String, String)> {
+    fn as_strs(&self) -> Vec<(&str, &str, &str)> {
+        self.iter()
+            .map(|(s1, s2, s3)| (s1.as_str(), s2.as_str(), s3.as_str()))
+            .collect()
+    }
 }
-pub use vec_to_slice;
 
 #[cfg(test)]
 mod tests {
@@ -50,11 +48,11 @@ mod tests {
                 String::from("baz"),
             ),
         ];
-
-        let expected_slice = vec![("date", "=", "2022-01-02"), ("foo", "bar", "baz")];
-
-        let result = vec_to_slice!(&vec_of_strings);
-
-        assert_eq!(result, expected_slice);
+        let binding = vec_of_strings.as_strs();
+        let str_slice = &binding[..];
+        assert_eq!(
+            str_slice,
+            [("date", "=", "2022-01-02"), ("foo", "bar", "baz")]
+        );
     }
 }
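
The removed `vec_to_slice!` macro gives way to the `StrTupleRef` trait; a
small usage sketch (the owned tuples are hypothetical):

    use hudi_core::util::StrTupleRef;

    fn main() {
        let owned: Vec<(String, String, String)> =
            vec![("city".to_string(), "=".to_string(), "sf".to_string())];
        // borrow (&str, &str, &str) views without cloning the Strings
        let views: Vec<(&str, &str, &str)> = owned.as_strs();
        assert_eq!(views[0], ("city", "=", "sf"));
    }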
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 07ab3f3..50a7a05 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -47,6 +47,7 @@ use hudi_core::config::read::HudiReadConfig::InputPartitions;
 use hudi_core::config::util::empty_options;
 use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
 use hudi_core::table::Table as HudiTable;
+use hudi_core::util::StrTupleRef;
 
 /// Create a `HudiDataSource`.
 /// Used for Datafusion to query Hudi tables
@@ -178,7 +179,7 @@ impl TableProvider for HudiDataSource {
         let pushdown_filters = exprs_to_filters(filters);
         let file_slices = self
             .table
-            .get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
+            .get_file_slices_splits(self.get_input_partitions(), &pushdown_filters.as_strs())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
         let base_url = self.table.base_url();
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 3a8c94b..7eb0eed 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -34,7 +34,7 @@ use hudi_core::expr::filter::{col, Filter as HudiFilter};
 /// otherwise returns `None`.
 ///
 /// TODO: Handle other DataFusion [`Expr`]
-pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
+pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
     exprs
         .iter()
         .filter_map(|expr| match expr {
@@ -42,6 +42,7 @@ pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
             Expr::Not(not_expr) => not_expr_to_filter(not_expr),
             _ => None,
         })
+        .map(|filter| filter.into())
         .collect()
 }
 
@@ -111,10 +112,14 @@ mod tests {
             operator: ExprOperator::Eq,
             field_value: "42".to_string(),
         };
-
-        assert_eq!(result[0].field_name, expected_filter.field_name);
-        assert_eq!(result[0].operator, expected_filter.operator);
-        assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
+        assert_eq!(
+            result[0],
+            (
+                expected_filter.field_name,
+                expected_filter.operator.to_string(),
+                expected_filter.field_value
+            )
+        );
     }
 
     #[test]
@@ -139,10 +144,14 @@ mod tests {
             operator: ExprOperator::Ne,
             field_value: "42".to_string(),
         };
-
-        assert_eq!(result[0].field_name, expected_filter.field_name);
-        assert_eq!(result[0].operator, expected_filter.operator);
-        assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
+        assert_eq!(
+            result[0],
+            (
+                expected_filter.field_name,
+                expected_filter.operator.to_string(),
+                expected_filter.field_value
+            )
+        );
     }
 
     #[test]
@@ -193,9 +202,14 @@ mod tests {
         assert_eq!(result.len(), expected_filters.len());
 
         for (result, expected_filter) in result.iter().zip(expected_filters.iter()) {
-            assert_eq!(result.field_name, expected_filter.field_name);
-            assert_eq!(result.operator, expected_filter.operator);
-            assert_eq!(*result.field_value.clone(), expected_filter.field_value);
+            assert_eq!(
+                result,
+                &(
+                    expected_filter.field_name.clone(),
+                    expected_filter.operator.to_string(),
+                    expected_filter.field_value.clone()
+                )
+            );
         }
     }
 
@@ -229,10 +243,14 @@ mod tests {
                 operator: expected_op,
                 field_value: String::from("42"),
             };
-
-            assert_eq!(result[0].field_name, expected_filter.field_name);
-            assert_eq!(result[0].operator, expected_filter.operator);
-            assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
+            assert_eq!(
+                result[0],
+                (
+                    expected_filter.field_name,
+                    expected_filter.operator.to_string(),
+                    expected_filter.field_value
+                )
+            );
         }
     }
 
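
Callers of `exprs_to_filters` now receive owned string tuples; a sketch,
assuming the `util::expr` module is publicly reachable and the crate name
`hudi_datafusion` (both are assumptions, not confirmed by this diff):

    use datafusion::prelude::{col, lit};
    use hudi_datafusion::util::expr::exprs_to_filters;

    fn main() {
        // only supported comparison expressions convert; others are dropped
        let exprs = vec![col("byteField").gt_eq(lit(10))];
        for (field, op, value) in exprs_to_filters(&exprs) {
            println!("{field} {op} {value}");
        }
    }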
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 9d300fd..1bfb953 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -26,13 +26,13 @@ use arrow::pyarrow::ToPyArrow;
 use tokio::runtime::Runtime;
 
 use hudi::error::CoreError;
-use hudi::expr::filter::Filter;
 use hudi::file_group::file_slice::FileSlice;
 use hudi::file_group::reader::FileGroupReader;
 use hudi::storage::error::StorageError;
 use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
-use pyo3::exceptions::{PyException, PyValueError};
+use hudi::util::StrTupleRef;
+use pyo3::exceptions::PyException;
 use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
 
 create_exception!(_internal, HudiCoreError, PyException);
@@ -195,11 +195,11 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
-        let filters = convert_filters(filters)?;
+        let filters = filters.unwrap_or_default();
 
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(self.inner.get_file_slices_splits(n, &filters))
+                .block_on(self.inner.get_file_slices_splits(n, &filters.as_strs()))
                 .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
@@ -214,11 +214,11 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<HudiFileSlice>> {
-        let filters = convert_filters(filters)?;
+        let filters = filters.unwrap_or_default();
 
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(self.inner.get_file_slices(&filters))
+                .block_on(self.inner.get_file_slices(&filters.as_strs()))
                 .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
@@ -235,9 +235,9 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<PyObject> {
-        let filters = convert_filters(filters)?;
+        let filters = filters.unwrap_or_default();
 
-        rt().block_on(self.inner.read_snapshot(&filters))
+        rt().block_on(self.inner.read_snapshot(&filters.as_strs()))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
@@ -258,21 +258,6 @@ impl HudiTable {
     }
 }
 
-fn convert_filters(filters: Option<Vec<(String, String, String)>>) -> PyResult<Vec<Filter>> {
-    filters
-        .unwrap_or_default()
-        .into_iter()
-        .map(|(field, op, value)| {
-            Filter::try_from((field.as_str(), op.as_str(), value.as_str())).map_err(|e| {
-                PyValueError::new_err(format!(
-                    "Invalid filter ({}, {}, {}): {}",
-                    field, op, value, e
-                ))
-            })
-        })
-        .collect()
-}
-
 #[cfg(not(tarpaulin))]
 #[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))]
