xushiyan commented on code in PR #203:
URL: https://github.com/apache/hudi-rs/pull/203#discussion_r1865174599
##########
crates/core/src/table/mod.rs:
##########
@@ -194,7 +195,7 @@ impl Table {
pub async fn get_file_slices_splits(
&self,
n: usize,
- filters: &[(&str, &str, &str)],
+ filters: &[PartitionFilter],
Review Comment:
We should provide a generic struct for constructing filters, rather than one
limited to `PartitionFilter`. Also, let's mark the existing API as deprecated in
the upcoming release and only remove it in a future release.
##########
crates/core/src/exprs/mod.rs:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod filter;
+
+use anyhow::{anyhow, Error};
+use std::cmp::PartialEq;
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::str::FromStr;
+
+pub use filter::*;
+
+/// An operator that represents a comparison operation used in a partition
filter expression.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum HudiOperator {
Review Comment:
```suggestion
pub enum ExprOperator {
```
Let's make the name more precise. Also, within `hudi-core` we follow a
convention of omitting the "Hudi" prefix, since everything there is implicitly
Hudi-specific. When imported names conflict, we give the imports aliases.
##########
crates/core/src/table/fs_view.rs:
##########
@@ -296,8 +306,16 @@ mod tests {
.await
.unwrap();
let partition_schema =
hudi_table.get_partition_schema().await.unwrap();
+
+ let schema = create_test_schema();
+ let filter_lt_20 = PartitionFilter::try_from((("byteField", "<",
"20"), &schema))
+ .map_err(|e| anyhow!("Failed to create PartitionFilter: {}", e))
+ .unwrap();
+ let filter_eq_300 = PartitionFilter::try_from((("shortField", "=",
"300"), &schema))
+ .map_err(|e| anyhow!("Failed to create PartitionFilter: {}", e))
Review Comment:
These two `map_err` calls are unnecessary: the conversions won't fail, so just
unwrap them. We're removing `anyhow` in another PR, and dropping these calls
reduces its footprint.
##########
crates/datafusion/src/utils/exprs_to_filter.rs:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use arrow_array::{Array, Scalar};
+use arrow_schema::SchemaRef;
+use datafusion::logical_expr::Operator;
+use datafusion_expr::{BinaryExpr, Expr};
+use hudi_core::exprs::{HudiOperator, PartitionFilter};
+use std::sync::Arc;
+
+// TODO: Handle other Datafusion `Expr`
+
+/// Converts a slice of DataFusion expressions (`Expr`) into a vector of
`PartitionFilter`.
+/// Returns `Some(Vec<PartitionFilter>)` if at least one filter is
successfully converted,
+/// otherwise returns `None`.
+pub fn convert_exprs_to_filter(
+ filters: &[Expr],
+ partition_schema: &SchemaRef,
+) -> Vec<PartitionFilter> {
+ let mut partition_filters = Vec::new();
+
+ for expr in filters {
+ match expr {
+ Expr::BinaryExpr(binary_expr) => {
+ if let Some(partition_filter) =
convert_binary_expr(binary_expr, partition_schema) {
+ partition_filters.push(partition_filter);
+ }
+ }
+ Expr::Not(not_expr) => {
+ // Handle NOT expressions
+ if let Some(partition_filter) = convert_not_expr(not_expr,
partition_schema) {
+ partition_filters.push(partition_filter);
+ }
+ }
+ _ => {
+ continue;
+ }
+ }
+ }
+
+ partition_filters
+}
+
+/// Converts a binary expression (`Expr::BinaryExpr`) into a `PartitionFilter`.
+fn convert_binary_expr(
+ binary_expr: &BinaryExpr,
+ partition_schema: &SchemaRef,
+) -> Option<PartitionFilter> {
+ // extract the column and literal from the binary expression
+ let (column, literal) = match (&*binary_expr.left, &*binary_expr.right) {
+ (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
+ (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
+ _ => return None,
+ };
+
+ let field = partition_schema
+ .field_with_name(column.name())
+ .unwrap()
Review Comment:
This could panic. Can we handle the error with a proper DataFusion error
type instead?
##########
crates/core/src/exprs/filter.rs:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::exprs::HudiOperator;
+
+use anyhow::{Context, Result};
+use arrow_array::{ArrayRef, Scalar, StringArray};
+use arrow_cast::{cast_with_options, CastOptions};
+use arrow_schema::{DataType, Field, Schema};
+use std::str::FromStr;
+
+/// A partition filter that represents a filter expression for partition
pruning.
+#[derive(Debug, Clone)]
+pub struct PartitionFilter {
Review Comment:
`PartitionFilter` should be used exclusively by `PartitionPruner`, so it
should stay within `partition.rs`. Here in `filter.rs`, we can have a generic
filter model that gets pushed down from DataFusion or other engines. Only
when constructing a `PartitionPruner` should we decide whether a filter is
used by the pruner.
##########
crates/datafusion/src/utils/exprs_to_filter.rs:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use arrow_array::{Array, Scalar};
+use arrow_schema::SchemaRef;
+use datafusion::logical_expr::Operator;
+use datafusion_expr::{BinaryExpr, Expr};
+use hudi_core::exprs::{HudiOperator, PartitionFilter};
+use std::sync::Arc;
+
+// TODO: Handle other Datafusion `Expr`
+
+/// Converts a slice of DataFusion expressions (`Expr`) into a vector of
`PartitionFilter`.
+/// Returns `Some(Vec<PartitionFilter>)` if at least one filter is
successfully converted,
+/// otherwise returns `None`.
+pub fn convert_exprs_to_filter(
+ filters: &[Expr],
+ partition_schema: &SchemaRef,
+) -> Vec<PartitionFilter> {
+ let mut partition_filters = Vec::new();
+
+ for expr in filters {
+ match expr {
+ Expr::BinaryExpr(binary_expr) => {
+ if let Some(partition_filter) =
convert_binary_expr(binary_expr, partition_schema) {
+ partition_filters.push(partition_filter);
+ }
+ }
+ Expr::Not(not_expr) => {
+ // Handle NOT expressions
+ if let Some(partition_filter) = convert_not_expr(not_expr,
partition_schema) {
+ partition_filters.push(partition_filter);
+ }
+ }
+ _ => {
+ continue;
+ }
+ }
+ }
+
+ partition_filters
+}
+
+/// Converts a binary expression (`Expr::BinaryExpr`) into a `PartitionFilter`.
+fn convert_binary_expr(
Review Comment:
Instead of helper functions, can we implement `TryFrom` for Hudi's generic
filter struct to convert an `Expr` into a Hudi filter?
##########
crates/datafusion/src/utils/exprs_to_filter.rs:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use arrow_array::{Array, Scalar};
+use arrow_schema::SchemaRef;
+use datafusion::logical_expr::Operator;
+use datafusion_expr::{BinaryExpr, Expr};
+use hudi_core::exprs::{HudiOperator, PartitionFilter};
+use std::sync::Arc;
+
+// TODO: Handle other Datafusion `Expr`
+
+/// Converts a slice of DataFusion expressions (`Expr`) into a vector of
`PartitionFilter`.
+/// Returns `Some(Vec<PartitionFilter>)` if at least one filter is
successfully converted,
+/// otherwise returns `None`.
+pub fn convert_exprs_to_filter(
+ filters: &[Expr],
+ partition_schema: &SchemaRef,
Review Comment:
As in my previous comment: we convert whatever filters we support today (e.g.,
depending on the operators), push the generic filters down to the table API,
and then let `PartitionPruner` decide which ones to use for partition pruning.
We may not need to check against the schema here, since the `Expr` is provided
by DataFusion and should already be valid.
##########
crates/datafusion/src/lib.rs:
##########
@@ -129,10 +167,11 @@ impl TableProvider for HudiDataSource {
) -> Result<Arc<dyn ExecutionPlan>> {
self.table.register_storage(state.runtime_env().clone());
+ // Convert Datafusion `Expr` to `PartitionFilter`
+ let partition_filters = convert_exprs_to_filter(filters,
&self.schema());
Review Comment:
`self.schema()` is the table schema, not the partition schema.
##########
python/src/internal.rs:
##########
@@ -163,10 +164,13 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
Review Comment:
We should keep the Python API aligned with the Rust API. If we introduce a
generic filter struct, we would need to bind it here using PyO3 and then use
it in the API.
##########
python/src/internal.rs:
##########
@@ -201,14 +206,37 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
- rt().block_on(
- self.inner
- .read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
- )?
- .to_pyarrow(py)
+ let schema: Schema = rt().block_on(self.inner.get_schema())?;
+ let partition_filters = convert_filters(filters, &schema)?;
+
+ rt().block_on(self.inner.read_snapshot(partition_filters.as_slice()))?
+ .to_pyarrow(py)
}
}
+// Temporary fix
Review Comment:
What does this comment mean?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]