Copilot commented on code in PR #510:
URL: https://github.com/apache/hudi-rs/pull/510#discussion_r2660094963
##########
crates/datafusion/src/lib.rs:
##########
@@ -148,6 +208,43 @@ impl HudiDataSource {
             _ => false,
         }
     }
+
+    /// Returns partition column names from partition schema.
+    fn get_partition_columns(&self) -> Vec<String> {
+        self.partition_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().clone())
+            .collect()
+    }
+
+    /// Checks if expression filters on a partition column.
+    ///
+    /// Partition column filters can be marked as `Exact` because they are
+    /// fully handled by partition pruning and don't need post-filtering.
+    fn is_partition_column_filter(expr: &Expr, partition_cols: &[String]) -> bool {
+        match expr {
+            Expr::BinaryExpr(binary_expr) => {
+                match binary_expr.op {
+                    Operator::And => {
+                        // For AND, check if both sides are partition filters
+                        Self::is_partition_column_filter(&binary_expr.left, partition_cols)
+                            && Self::is_partition_column_filter(&binary_expr.right, partition_cols)
+                    }
+                    _ => {
+                        // Check if either side is a Column that matches a partition column
+                        matches!(&*binary_expr.left, Expr::Column(col) if partition_cols.contains(&col.name))
+                            || matches!(&*binary_expr.right, Expr::Column(col) if partition_cols.contains(&col.name))
Review Comment:
The logic for determining whether a binary expression is a partition column filter has a flaw: when the operator is not `And`, it checks whether either the left or the right side is a partition column. This is incorrect for binary comparison operators.
For a filter like `byteField = 1` to qualify as a partition filter, one side should be a partition column and the other side a literal; both sides must not be columns. The current logic would incorrectly treat `byteField = otherColumn` as a partition filter whenever `byteField` is a partition column.
The check should verify that one side is a partition column AND the other side is a literal value (not another column), ensuring it is a proper partition pruning filter.
```suggestion
                        // For non-AND binary expressions, treat as a partition
                        // filter only when exactly one side is a partition
                        // column and the other side is a literal value.
                        match (&*binary_expr.left, &*binary_expr.right) {
                            (Expr::Column(col), Expr::Literal(_))
                                if partition_cols.contains(&col.name) => true,
                            (Expr::Literal(_), Expr::Column(col))
                                if partition_cols.contains(&col.name) => true,
                            _ => false,
                        }
```
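To make the distinction concrete, here is a self-contained sketch of the suggested check, using a toy `Expr` enum in place of DataFusion's (so the names and shapes here are illustrative, not the real API). It accepts `byteField = 1` but rejects `byteField = otherColumn`:

```rust
// Toy model of the relevant expression shapes; the real
// `datafusion::logical_expr::Expr` has many more variants.
enum Expr {
    Column(String),
    Literal(i64),
    BinaryExpr { left: Box<Expr>, right: Box<Expr> },
}

/// True only when exactly one side is a partition column and the other
/// side is a literal, mirroring the suggested fix.
fn is_partition_filter(expr: &Expr, partition_cols: &[String]) -> bool {
    match expr {
        Expr::BinaryExpr { left, right } => match (&**left, &**right) {
            (Expr::Column(c), Expr::Literal(_)) => partition_cols.contains(c),
            (Expr::Literal(_), Expr::Column(c)) => partition_cols.contains(c),
            _ => false,
        },
        _ => false,
    }
}

fn main() {
    let parts = vec!["byteField".to_string()];
    let good = Expr::BinaryExpr {
        left: Box::new(Expr::Column("byteField".into())),
        right: Box::new(Expr::Literal(1)),
    };
    let bad = Expr::BinaryExpr {
        left: Box::new(Expr::Column("byteField".into())),
        right: Box::new(Expr::Column("otherColumn".into())),
    };
    // column-vs-literal qualifies; column-vs-column does not
    println!("{} {}", is_partition_filter(&good, &parts), is_partition_filter(&bad, &parts));
    // prints "true false"
}
```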
##########
crates/core/src/storage/mod.rs:
##########
@@ -297,7 +302,20 @@ impl Storage {
             builder = builder.with_batch_size(batch_size);
         }
-        if let Some(projection) = options.projection {
+        // Handle projection: convert column names to indices using builder's schema
+        if let Some(ref column_names) = options.projection {
+            let arrow_schema = builder.schema();
+            let projection: Vec<usize> = column_names
+                .iter()
+                .map(|name| {
+                    arrow_schema.index_of(name).map_err(|_| {
+                        StorageError::InvalidColumn(format!(
+                            "Column '{name}' not found in parquet file schema"
Review Comment:
The error message could be more helpful. Instead of "Column 'name' not found in parquet file schema", consider a more descriptive message that helps the user understand what went wrong.
For example: "Column 'name' does not exist in the parquet file schema. Available columns: [list of columns]". This would let users identify the correct column names without having to inspect the schema separately.
```suggestion
                        let available_columns = arrow_schema
                            .fields()
                            .iter()
                            .map(|f| f.name().as_str())
                            .collect::<Vec<_>>()
                            .join(", ");
                        StorageError::InvalidColumn(format!(
                            "Column '{name}' does not exist in Parquet file schema. Available columns: [{available_columns}]"
```
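The suggested message can be sketched in isolation as follows, with plain string slices standing in for the Arrow schema's field names (the helper name `column_not_found_message` is hypothetical, introduced only for this illustration):

```rust
/// Builds the suggested error message, listing the columns that do exist
/// so the user can spot the correct name without inspecting the schema.
fn column_not_found_message(name: &str, fields: &[&str]) -> String {
    let available_columns = fields.join(", ");
    format!(
        "Column '{name}' does not exist in Parquet file schema. \
         Available columns: [{available_columns}]"
    )
}

fn main() {
    println!("{}", column_not_found_message("ts", &["id", "name", "city"]));
    // prints: Column 'ts' does not exist in Parquet file schema. Available columns: [id, name, city]
}
```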
##########
crates/datafusion/src/lib.rs:
##########
@@ -79,9 +80,31 @@ use hudi_core::table::Table as HudiTable;
/// Ok(())
/// }
/// ```
-#[derive(Clone, Debug)]
+/// A DataFusion table provider for Apache Hudi tables.
+#[derive(Clone)]
 pub struct HudiDataSource {
     table: Arc<HudiTable>,
+    /// Cached partition schema for determining partition columns.
+    /// This is cached at construction since partition schema rarely changes
+    /// and is needed synchronously in `supports_filters_pushdown`.
+    partition_schema: Schema,
+}
+
+impl std::fmt::Debug for HudiDataSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HudiDataSource")
+            .field("table", &self.table)
+            .field(
+                "partition_columns",
+                &self
+                    .partition_schema
+                    .fields()
+                    .iter()
+                    .map(|f| f.name())
Review Comment:
In the `Debug` implementation, the name `f` is used both for the formatter parameter and for the closure parameter in the field iterator. While this compiles, the shadowing is confusing and reduces readability. Consider renaming the closure parameter to something more descriptive, such as `field`, to avoid shadowing the formatter parameter.
```suggestion
                    .map(|field| field.name())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]