This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d257a21 refactor: remove use of `Filter` from public APIs (#266)
d257a21 is described below
commit d257a2172c0855edfd47dd79d2c47d2b37931d5e
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 14:20:20 2025 -0600
refactor: remove use of `Filter` from public APIs (#266)
Remove `Filter` from public API arguments. Use tuples of strings
instead for simpler invocation.
---
crates/core/src/expr/filter.rs | 14 ++++
crates/core/src/file_group/reader.rs | 4 +-
crates/core/src/table/mod.rs | 138 ++++++++++++++++++++---------------
crates/core/src/util/mod.rs | 30 ++++----
crates/datafusion/src/lib.rs | 3 +-
crates/datafusion/src/util/expr.rs | 50 +++++++++----
python/src/internal.rs | 31 ++------
7 files changed, 154 insertions(+), 116 deletions(-)
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 685aec4..847a2f5 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -42,6 +42,16 @@ impl Filter {
}
}
+impl From<Filter> for (String, String, String) {
+ fn from(filter: Filter) -> Self {
+ (
+ filter.field_name,
+ filter.operator.to_string(),
+ filter.field_value,
+ )
+ }
+}
+
impl TryFrom<(&str, &str, &str)> for Filter {
type Error = CoreError;
@@ -62,6 +72,10 @@ impl TryFrom<(&str, &str, &str)> for Filter {
}
}
+pub fn from_str_tuples(tuples: &[(&str, &str, &str)]) -> Result<Vec<Filter>> {
+ tuples.iter().map(|t| Filter::try_from(*t)).collect()
+}
+
pub struct FilterField {
pub name: String,
}
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 6b875d2..2a0610e 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -44,7 +44,7 @@ pub struct FileGroupReader {
}
impl FileGroupReader {
- pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+ pub(crate) fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>)
-> Self {
Self {
storage,
hudi_configs,
@@ -52,7 +52,7 @@ impl FileGroupReader {
}
}
- pub fn new_with_filters(
+ pub(crate) fn new_with_filters(
storage: Arc<Storage>,
hudi_configs: Arc<HudiConfigs>,
and_filters: &[Filter],
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 5154a04..0a163b8 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,7 +94,7 @@ use crate::config::read::HudiReadConfig::{AsOfTimestamp,
UseReadOptimizedMode};
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
-use crate::expr::filter::{Filter, FilterField};
+use crate::expr::filter::{from_str_tuples, Filter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
use crate::table::builder::TableBuilder;
@@ -140,7 +140,6 @@ impl Table {
.await
}
- #[inline]
pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::BasePath);
self.hudi_configs
@@ -150,7 +149,6 @@ impl Table {
.expect(&err_msg)
}
- #[inline]
pub fn table_type(&self) -> TableTypeValue {
let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::TableType);
let table_type = self
@@ -161,7 +159,6 @@ impl Table {
TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
}
- #[inline]
pub fn timezone(&self) -> String {
self.hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
@@ -223,7 +220,7 @@ impl Table {
pub async fn get_file_slices_splits(
&self,
n: usize,
- filters: &[Filter],
+ filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
let file_slices = self.get_file_slices(filters).await?;
if file_slices.is_empty() {
@@ -242,19 +239,29 @@ impl Table {
/// Get all the [FileSlice]s in the table.
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
- pub async fn get_file_slices(&self, filters: &[Filter]) ->
Result<Vec<FileSlice>> {
+ pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<FileSlice>> {
+ let filters = from_str_tuples(filters)?;
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
- self.get_file_slices_as_of(timestamp.to::<String>().as_str(),
filters)
+ self.get_file_slices_internal(timestamp.to::<String>().as_str(),
&filters)
.await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
- self.get_file_slices_as_of(timestamp, filters).await
+ self.get_file_slices_internal(timestamp, &filters).await
} else {
Ok(Vec::new())
}
}
/// Get all the [FileSlice]s at a given timestamp, as a time travel query.
- async fn get_file_slices_as_of(
+ pub async fn get_file_slices_as_of(
+ &self,
+ timestamp: &str,
+ filters: &[(&str, &str, &str)],
+ ) -> Result<Vec<FileSlice>> {
+ let filters = from_str_tuples(filters)?;
+ self.get_file_slices_internal(timestamp, &filters).await
+ }
+
+ async fn get_file_slices_internal(
&self,
timestamp: &str,
filters: &[Filter],
@@ -277,13 +284,14 @@ impl Table {
pub fn create_file_group_reader_with_filters(
&self,
- filters: &[Filter],
+ filters: &[(&str, &str, &str)],
schema: &Schema,
) -> Result<FileGroupReader> {
+ let filters = from_str_tuples(filters)?;
FileGroupReader::new_with_filters(
self.file_system_view.storage.clone(),
self.hudi_configs.clone(),
- filters,
+ &filters,
schema,
)
}
@@ -291,21 +299,22 @@ impl Table {
/// Get all the latest records in the table.
///
/// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
- pub async fn read_snapshot(&self, filters: &[Filter]) ->
Result<Vec<RecordBatch>> {
+ pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<RecordBatch>> {
+ let filters = from_str_tuples(filters)?;
let read_optimized_mode = self
.hudi_configs
.get_or_default(UseReadOptimizedMode)
.to::<bool>();
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
- self.read_snapshot_as_of(
+ self.read_snapshot_internal(
timestamp.to::<String>().as_str(),
- filters,
+ &filters,
read_optimized_mode,
)
.await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
- self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
+ self.read_snapshot_internal(timestamp, &filters,
read_optimized_mode)
.await
} else {
Ok(Vec::new())
@@ -313,13 +322,27 @@ impl Table {
}
/// Get all the records in the table at a given timestamp, as a time
travel query.
- async fn read_snapshot_as_of(
+ pub async fn read_snapshot_as_of(
+ &self,
+ timestamp: &str,
+ filters: &[(&str, &str, &str)],
+ ) -> Result<Vec<RecordBatch>> {
+ let filters = from_str_tuples(filters)?;
+ let read_optimized_mode = self
+ .hudi_configs
+ .get_or_default(UseReadOptimizedMode)
+ .to::<bool>();
+ self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
+ .await
+ }
+
+ async fn read_snapshot_internal(
&self,
timestamp: &str,
filters: &[Filter],
read_optimized_mode: bool,
) -> Result<Vec<RecordBatch>> {
- let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
+ let file_slices = self.get_file_slices_internal(timestamp,
filters).await?;
let fg_reader = self.create_file_group_reader();
let base_file_only =
read_optimized_mode || self.table_type() ==
TableTypeValue::CopyOnWrite;
@@ -334,7 +357,9 @@ impl Table {
Ok(batches)
}
- /// Get records that were inserted or updated between the given
timestamps. Records that were updated multiple times should have their latest
states within the time span being returned.
+ /// Get records that were inserted or updated between the given timestamps.
+ /// Records that were updated multiple times should have their latest
states within
+ /// the time span being returned.
pub async fn read_incremental_records(
&self,
start_timestamp: &str,
@@ -361,11 +386,11 @@ impl Table {
// Read incremental records from the file slices.
let filters = &[
-
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
-
FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
+ (MetaField::CommitTime.as_ref(), ">", start_timestamp),
+ (MetaField::CommitTime.as_ref(), "<=", end_timestamp),
];
- let fg_reader =
- self.create_file_group_reader_with_filters(filters,
MetaField::schema().as_ref())?;
+ let schema = MetaField::schema();
+ let fg_reader = self.create_file_group_reader_with_filters(filters,
&schema)?;
// Read-optimized mode does not apply to incremental query semantics.
let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
@@ -381,7 +406,9 @@ impl Table {
Ok(batches)
}
- /// Get the change-data-capture (CDC) records between the given
timestamps. The CDC records should reflect the records that were inserted,
updated, and deleted between the timestamps.
+ /// Get the change-data-capture (CDC) records between the given timestamps.
+ /// The CDC records should reflect the records that were inserted,
updated, and deleted
+ /// between the timestamps.
#[allow(dead_code)]
async fn read_incremental_changes(
&self,
@@ -405,7 +432,6 @@ mod tests {
use crate::config::HUDI_CONF_DIR;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
- use crate::table::Filter;
use hudi_test::SampleTable;
use std::collections::HashSet;
use std::fs::canonicalize;
@@ -431,7 +457,10 @@ mod tests {
}
/// Test helper to get relative file paths from the table with filters.
- async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) ->
Result<Vec<String>> {
+ async fn get_file_paths_with_filters(
+ table: &Table,
+ filters: &[(&str, &str, &str)],
+ ) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let base_url = table.base_url();
for f in table.get_file_slices(filters).await? {
@@ -783,11 +812,11 @@ mod tests {
);
// as of the latest timestamp
- let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
- let hudi_table = Table::new_with_options(base_url.path(), opts)
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices = hudi_table
+ .get_file_slices_as_of("20240418173551906", &[])
.await
.unwrap();
- let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -847,11 +876,8 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let filter_ge_10 = Filter::try_from(("byteField", ">=",
"10")).unwrap();
-
- let filter_lt_30 = Filter::try_from(("byteField", "<", "30")).unwrap();
-
- let actual = get_file_paths_with_filters(&hudi_table, &[filter_ge_10,
filter_lt_30])
+ let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
+ let actual = get_file_paths_with_filters(&hudi_table, &filters)
.await
.unwrap()
.into_iter()
@@ -865,8 +891,7 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let filter_gt_30 = Filter::try_from(("byteField", ">", "30")).unwrap();
- let actual = get_file_paths_with_filters(&hudi_table, &[filter_gt_30])
+ let actual = get_file_paths_with_filters(&hudi_table, &[("byteField",
">", "30")])
.await
.unwrap()
.into_iter()
@@ -897,16 +922,16 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let filter_gte_10 = Filter::try_from(("byteField", ">=",
"10")).unwrap();
- let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
- let filter_ne_100 = Filter::try_from(("shortField", "!=",
"100")).unwrap();
-
- let actual =
- get_file_paths_with_filters(&hudi_table, &[filter_gte_10,
filter_lt_20, filter_ne_100])
- .await
- .unwrap()
- .into_iter()
- .collect::<HashSet<_>>();
+ let filters = [
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ];
+ let actual = get_file_paths_with_filters(&hudi_table, &filters)
+ .await
+ .unwrap()
+ .into_iter()
+ .collect::<HashSet<_>>();
let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
@@ -914,10 +939,9 @@ mod tests {
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let filter_lt_20 = Filter::try_from(("byteField", ">", "20")).unwrap();
- let filter_eq_300 = Filter::try_from(("shortField", "=",
"300")).unwrap();
- let actual = get_file_paths_with_filters(&hudi_table, &[filter_lt_20,
filter_eq_300])
+ let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
+ let actual = get_file_paths_with_filters(&hudi_table, &filters)
.await
.unwrap()
.into_iter()
@@ -966,7 +990,9 @@ mod tests {
#[tokio::test]
async fn test_non_partitioned_read_optimized() -> Result<()> {
let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
- let hudi_table = Table::new(base_url.path()).await?;
+ let hudi_table =
+ Table::new_with_options(base_url.path(),
[(UseReadOptimizedMode.as_ref(), "true")])
+ .await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -974,9 +1000,7 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let latest_commit = commit_timestamps.last().unwrap();
- let records = hudi_table
- .read_snapshot_as_of(latest_commit, &[], true)
- .await?;
+ let records = hudi_table.read_snapshot_as_of(latest_commit,
&[]).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1019,9 +1043,9 @@ mod tests {
let hudi_table = Table::new(base_url.path()).await?;
let filters = &[
- Filter::try_from(("byteField", ">=", "10"))?,
- Filter::try_from(("byteField", "<", "20"))?,
- Filter::try_from(("shortField", "!=", "100"))?,
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
];
let records = hudi_table.read_snapshot(filters).await?;
let schema = &records[0].schema();
@@ -1044,9 +1068,7 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records = hudi_table
- .read_snapshot_as_of(first_commit, &[], false)
- .await?;
+ let records = hudi_table.read_snapshot_as_of(first_commit,
&[]).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 84fa313..21306c9 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -18,19 +18,17 @@
*/
pub mod arrow;
-pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str,
&str, &str)> {
- vec.iter()
- .map(|(a, b, c)| (a.as_str(), b.as_str(), c.as_str()))
- .collect()
+pub trait StrTupleRef {
+ fn as_strs(&self) -> Vec<(&str, &str, &str)>;
}
-#[macro_export]
-macro_rules! vec_to_slice {
- ($vec:expr) => {
- &convert_vec_to_slice(&$vec)[..]
- };
+impl StrTupleRef for Vec<(String, String, String)> {
+ fn as_strs(&self) -> Vec<(&str, &str, &str)> {
+ self.iter()
+ .map(|(s1, s2, s3)| (s1.as_str(), s2.as_str(), s3.as_str()))
+ .collect()
+ }
}
-pub use vec_to_slice;
#[cfg(test)]
mod tests {
@@ -50,11 +48,11 @@ mod tests {
String::from("baz"),
),
];
-
- let expected_slice = vec![("date", "=", "2022-01-02"), ("foo", "bar",
"baz")];
-
- let result = vec_to_slice!(&vec_of_strings);
-
- assert_eq!(result, expected_slice);
+ let binding = vec_of_strings.as_strs();
+ let str_slice = &binding[..];
+ assert_eq!(
+ str_slice,
+ [("date", "=", "2022-01-02"), ("foo", "bar", "baz")]
+ );
}
}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 07ab3f3..50a7a05 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -47,6 +47,7 @@ use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_core::config::util::empty_options;
use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
use hudi_core::table::Table as HudiTable;
+use hudi_core::util::StrTupleRef;
/// Create a `HudiDataSource`.
/// Used for Datafusion to query Hudi tables
@@ -178,7 +179,7 @@ impl TableProvider for HudiDataSource {
let pushdown_filters = exprs_to_filters(filters);
let file_slices = self
.table
- .get_file_slices_splits(self.get_input_partitions(),
pushdown_filters.as_slice())
+ .get_file_slices_splits(self.get_input_partitions(),
&pushdown_filters.as_strs())
.await
.map_err(|e| Execution(format!("Failed to get file slices from
Hudi table: {}", e)))?;
let base_url = self.table.base_url();
diff --git a/crates/datafusion/src/util/expr.rs
b/crates/datafusion/src/util/expr.rs
index 3a8c94b..7eb0eed 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -34,7 +34,7 @@ use hudi_core::expr::filter::{col, Filter as HudiFilter};
/// otherwise returns `None`.
///
/// TODO: Handle other DataFusion [`Expr`]
-pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
+pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
exprs
.iter()
.filter_map(|expr| match expr {
@@ -42,6 +42,7 @@ pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
Expr::Not(not_expr) => not_expr_to_filter(not_expr),
_ => None,
})
+ .map(|filter| filter.into())
.collect()
}
@@ -111,10 +112,14 @@ mod tests {
operator: ExprOperator::Eq,
field_value: "42".to_string(),
};
-
- assert_eq!(result[0].field_name, expected_filter.field_name);
- assert_eq!(result[0].operator, expected_filter.operator);
- assert_eq!(*result[0].field_value.clone(),
expected_filter.field_value);
+ assert_eq!(
+ result[0],
+ (
+ expected_filter.field_name,
+ expected_filter.operator.to_string(),
+ expected_filter.field_value
+ )
+ );
}
#[test]
@@ -139,10 +144,14 @@ mod tests {
operator: ExprOperator::Ne,
field_value: "42".to_string(),
};
-
- assert_eq!(result[0].field_name, expected_filter.field_name);
- assert_eq!(result[0].operator, expected_filter.operator);
- assert_eq!(*result[0].field_value.clone(),
expected_filter.field_value);
+ assert_eq!(
+ result[0],
+ (
+ expected_filter.field_name,
+ expected_filter.operator.to_string(),
+ expected_filter.field_value
+ )
+ );
}
#[test]
@@ -193,9 +202,14 @@ mod tests {
assert_eq!(result.len(), expected_filters.len());
for (result, expected_filter) in
result.iter().zip(expected_filters.iter()) {
- assert_eq!(result.field_name, expected_filter.field_name);
- assert_eq!(result.operator, expected_filter.operator);
- assert_eq!(*result.field_value.clone(),
expected_filter.field_value);
+ assert_eq!(
+ result,
+ &(
+ expected_filter.field_name.clone(),
+ expected_filter.operator.to_string(),
+ expected_filter.field_value.clone()
+ )
+ );
}
}
@@ -229,10 +243,14 @@ mod tests {
operator: expected_op,
field_value: String::from("42"),
};
-
- assert_eq!(result[0].field_name, expected_filter.field_name);
- assert_eq!(result[0].operator, expected_filter.operator);
- assert_eq!(*result[0].field_value.clone(),
expected_filter.field_value);
+ assert_eq!(
+ result[0],
+ (
+ expected_filter.field_name,
+ expected_filter.operator.to_string(),
+ expected_filter.field_value
+ )
+ );
}
}
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 9d300fd..1bfb953 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -26,13 +26,13 @@ use arrow::pyarrow::ToPyArrow;
use tokio::runtime::Runtime;
use hudi::error::CoreError;
-use hudi::expr::filter::Filter;
use hudi::file_group::file_slice::FileSlice;
use hudi::file_group::reader::FileGroupReader;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
-use pyo3::exceptions::{PyException, PyValueError};
+use hudi::util::StrTupleRef;
+use pyo3::exceptions::PyException;
use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject,
PyResult, Python};
create_exception!(_internal, HudiCoreError, PyException);
@@ -195,11 +195,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
- let filters = convert_filters(filters)?;
+ let filters = filters.unwrap_or_default();
py.allow_threads(|| {
let file_slices = rt()
- .block_on(self.inner.get_file_slices_splits(n, &filters))
+ .block_on(self.inner.get_file_slices_splits(n,
&filters.as_strs()))
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
@@ -214,11 +214,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
- let filters = convert_filters(filters)?;
+ let filters = filters.unwrap_or_default();
py.allow_threads(|| {
let file_slices = rt()
- .block_on(self.inner.get_file_slices(&filters))
+ .block_on(self.inner.get_file_slices(&filters.as_strs()))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
@@ -235,9 +235,9 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
- let filters = convert_filters(filters)?;
+ let filters = filters.unwrap_or_default();
- rt().block_on(self.inner.read_snapshot(&filters))
+ rt().block_on(self.inner.read_snapshot(&filters.as_strs()))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
@@ -258,21 +258,6 @@ impl HudiTable {
}
}
-fn convert_filters(filters: Option<Vec<(String, String, String)>>) ->
PyResult<Vec<Filter>> {
- filters
- .unwrap_or_default()
- .into_iter()
- .map(|(field, op, value)| {
- Filter::try_from((field.as_str(), op.as_str(),
value.as_str())).map_err(|e| {
- PyValueError::new_err(format!(
- "Invalid filter ({}, {}, {}): {}",
- field, op, value, e
- ))
- })
- })
- .collect()
-}
-
#[cfg(not(tarpaulin))]
#[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None,
options=None))]