alamb commented on code in PR #22604:
URL: https://github.com/apache/datafusion/pull/22604#discussion_r3397604009
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -25,16 +25,19 @@ use std::hash::Hash;
use std::sync::Arc;
use arrow::array::RecordBatch;
-use arrow::datatypes::{DataType, FieldRef, SchemaRef};
+use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef};
use datafusion_common::{
DataFusionError, Result, ScalarValue, exec_err,
metadata::FieldMetadata,
nested_struct::validate_data_type_compatibility,
- tree_node::{Transformed, TransformedResult, TreeNode},
+ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion},
+};
+use datafusion_expr::ScalarUDFImpl;
+use datafusion_functions::core::{
Review Comment:
It is unfortunate to me that the physical expr has to special case
`FileRowIndexFunc`, but short of some very specific API on `PhysicalExpr` (like
`is_row_index_func()`) I can't really think of anything better.
I realize we already have the special case for `GetFieldFunc`, so I think this
is ok.
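
To make the trade-off concrete, here is a minimal, self-contained sketch of type-based matching on a toy expression tree. It is not DataFusion code: `ToyExpr`, `ToyColumn`, `ToyRowIndex`, `ToyAdd`, and `references` are hypothetical names invented for illustration, and the real `ScalarFunctionExpr::try_downcast_func` may work differently internally.

```rust
use std::any::Any;
use std::sync::Arc;

/// Toy stand-in for a physical expression node (hypothetical, not `PhysicalExpr`).
trait ToyExpr: Any {
    fn as_any(&self) -> &dyn Any;
    fn children(&self) -> Vec<Arc<dyn ToyExpr>>;
}

struct ToyColumn;
struct ToyRowIndex;
struct ToyAdd {
    left: Arc<dyn ToyExpr>,
    right: Arc<dyn ToyExpr>,
}

impl ToyExpr for ToyColumn {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn children(&self) -> Vec<Arc<dyn ToyExpr>> {
        vec![]
    }
}

impl ToyExpr for ToyRowIndex {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn children(&self) -> Vec<Arc<dyn ToyExpr>> {
        vec![]
    }
}

impl ToyExpr for ToyAdd {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn children(&self) -> Vec<Arc<dyn ToyExpr>> {
        vec![Arc::clone(&self.left), Arc::clone(&self.right)]
    }
}

/// Returns true if any node in the tree has the concrete type `T`.
/// Matching is by type, so a different node type with the same display
/// name would not be treated as a match.
fn references<T: 'static>(expr: &Arc<dyn ToyExpr>) -> bool {
    if expr.as_any().is::<T>() {
        return true;
    }
    expr.children().iter().any(|child| references::<T>(child))
}

fn main() {
    let tree: Arc<dyn ToyExpr> = Arc::new(ToyAdd {
        left: Arc::new(ToyColumn),
        right: Arc::new(ToyRowIndex),
    });
    println!("references row index: {}", references::<ToyRowIndex>(&tree));
}
```

The alternative, a dedicated trait method such as `is_row_index_func()`, would avoid the downcast but would add a function-specific hook to a general-purpose trait.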
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -81,6 +84,118 @@ where
.data()
}
+/// Return true if `expr` references scalar UDF `T`.
+///
+/// This matches the concrete [`ScalarUDFImpl`] type rather than the function
+/// name, so unrelated UDFs with the same name are not treated as matches.
+pub fn expr_references_scalar_udf<T: ScalarUDFImpl>(
+ expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+ let mut found = false;
+
+ expr.apply(|node| {
+ if ScalarFunctionExpr::try_downcast_func::<T>(node.as_ref()).is_some() {
+ found = true;
+ return Ok(TreeNodeRecursion::Stop);
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .expect("Infallible traversal of PhysicalExpr tree failed");
+
+ found
+}
+
+/// Rewrite occurrences of scalar UDF `T` in `expr` using `replacement`.
+///
+/// The rewrite matches the concrete [`ScalarUDFImpl`] type rather than the
+/// function name. `replacement` is called with each matching
+/// [`ScalarFunctionExpr`] after its children have been rewritten.
+pub fn rewrite_scalar_udf<T, F>(
Review Comment:
Why make this a public API? It seems fine to me, but somewhat random.
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -484,6 +489,13 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
return Ok(recursion);
}
+ if ScalarFunctionExpr::try_downcast_func::<FileRowIndexFunc>(node.as_ref())
Review Comment:
I am surprised that marking the function `volatile` isn't sufficient to
prevent pushdown 🤔
##########
docs/source/user-guide/sql/scalar_functions.md:
##########
@@ -5852,6 +5853,27 @@ cast_to_type(expression, reference)
+-----+
```
+### `file_row_index`
Review Comment:
We discussed the actual API in
https://github.com/apache/datafusion/issues/20135
The alternative we discussed was some sort of `_metadata` virtual column.
However, it seems like the final decision (per @adriangb) was
https://github.com/apache/datafusion/issues/20135#issuecomment-4489764202
> IIUC the plan we end up with is:
> ```
> FilterExec: projection=[other_col], filter=[file_row_index() > 3]
> DataSourceExec: projection=[other_col]
> ```
Which seems reasonable to me. I didn't find any common API across
implementations. See comment here
- https://github.com/apache/datafusion/issues/20135#issuecomment-4683022019
##########
datafusion/sqllogictest/test_files/file_row_index.slt:
##########
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+statement ok
Review Comment:
Can you also add a test for how this function works for some source that
doesn't support `file_row_index()`? According to my testing it seems like it
just returns NULL:
```sql
> select *, file_row_index() from './datafusion-examples/data/csv/cars.csv';
+-------+-------+---------------------+------------------+
| car | speed | time | file_row_index() |
+-------+-------+---------------------+------------------+
| red | 20.0 | 1996-04-12T12:05:03 | NULL |
| red | 20.3 | 1996-04-12T12:05:04 | NULL |
...
| green | 8.0 | 1996-04-12T12:05:13 | NULL |
| green | 2.0 | 1996-04-12T12:05:14 | NULL |
+-------+-------+---------------------+------------------+
25 row(s) fetched.
Elapsed 0.002 seconds.
```
What do you think about returning a runtime error, instead of NULL, if it is
evaluated against a source that doesn't support it?
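
The two behaviors can be contrasted with a small, self-contained sketch. Everything here is hypothetical and for illustration only: `ToySource`, `row_index_or_null`, `row_index_or_error`, and the error message are invented names, not DataFusion APIs.

```rust
/// Toy description of a data source (hypothetical, not a DataFusion type).
enum ToySource {
    /// The source can report row positions, starting at `first_row`.
    WithRowIndex { first_row: i64 },
    /// The source has no notion of a row index (as with the CSV file above).
    WithoutRowIndex,
}

/// Observed behavior: an unsupported source silently yields NULL (`None`).
fn row_index_or_null(source: &ToySource, offset: i64) -> Result<Option<i64>, String> {
    match source {
        ToySource::WithRowIndex { first_row } => Ok(Some(first_row + offset)),
        ToySource::WithoutRowIndex => Ok(None),
    }
}

/// Proposed behavior: an unsupported source fails loudly at evaluation time.
fn row_index_or_error(source: &ToySource, offset: i64) -> Result<Option<i64>, String> {
    match source {
        ToySource::WithRowIndex { first_row } => Ok(Some(first_row + offset)),
        ToySource::WithoutRowIndex => {
            Err("file_row_index() is not supported by this source".to_string())
        }
    }
}

fn main() {
    let csv_like = ToySource::WithoutRowIndex;
    println!("null variant:  {:?}", row_index_or_null(&csv_like, 2));
    println!("error variant: {:?}", row_index_or_error(&csv_like, 2));
}
```

With the error variant, a query that cannot produce meaningful row indexes fails visibly rather than returning a column of NULLs that could be mistaken for real data.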
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -81,6 +84,118 @@ where
.data()
}
+/// Return true if `expr` references scalar UDF `T`.
+///
+/// This matches the concrete [`ScalarUDFImpl`] type rather than the function
+/// name, so unrelated UDFs with the same name are not treated as matches.
+pub fn expr_references_scalar_udf<T: ScalarUDFImpl>(
+ expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+ let mut found = false;
+
+ expr.apply(|node| {
+ if ScalarFunctionExpr::try_downcast_func::<T>(node.as_ref()).is_some() {
+ found = true;
+ return Ok(TreeNodeRecursion::Stop);
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .expect("Infallible traversal of PhysicalExpr tree failed");
+
+ found
+}
+
+/// Rewrite occurrences of scalar UDF `T` in `expr` using `replacement`.
+///
+/// The rewrite matches the concrete [`ScalarUDFImpl`] type rather than the
+/// function name. `replacement` is called with each matching
+/// [`ScalarFunctionExpr`] after its children have been rewritten.
+pub fn rewrite_scalar_udf<T, F>(
+ expr: Arc<dyn PhysicalExpr>,
+ mut replacement: F,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+ T: ScalarUDFImpl,
+ F: FnMut(&ScalarFunctionExpr) -> Result<Arc<dyn PhysicalExpr>>,
+{
+ expr.transform_up(|node| {
+ if let Some(scalar_fn) = ScalarFunctionExpr::try_downcast_func::<T>(node.as_ref())
+ {
+ Ok(Transformed::yes(replacement(scalar_fn)?))
+ } else {
+ Ok(Transformed::no(node))
+ }
+ })
+ .map(|transformed| transformed.data)
+}
+
+/// Rewrite `file_row_index()` in `expr` to read from a source-provided
+/// row-index column.
+///
+/// `row_index_idx` is the index of `row_index_name` in the schema that the
+/// rewritten expression will be evaluated against. The rewrite uses ordinary
+/// physical expressions: a [`Column`] that reads the source row-index values
+/// wrapped in a [`CastExpr`] that exposes the public `file_row_index: Int64`
+/// return field without source-specific extension metadata.
+pub fn rewrite_file_row_index_expr(
+ expr: Arc<dyn PhysicalExpr>,
+ row_index_name: &str,
+ row_index_idx: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ rewrite_scalar_udf::<FileRowIndexFunc, _>(expr, |_| {
+ let source = Arc::new(Column::new(row_index_name, row_index_idx));
+ let target_field = Arc::new(Field::new("file_row_index", DataType::Int64, true));
+ Ok(Arc::new(CastExpr::new_with_target_field(
+ source,
+ target_field,
+ None,
+ )))
+ })
+}
+
+/// Rewrite `file_row_index()` in a pushed projection to read from a
+/// source-provided row-index column.
+///
Review Comment:
A concrete example would help me
```suggestion
///
/// For example, if `row_index_col` is `__datafusion_row_idx`, this function
/// rewrites all instances of `file_row_index()` to `__datafusion_row_idx`
/// column references.
```
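
As a rough illustration of what such an example could convey, here is a self-contained sketch of the rewrite on a toy expression enum. It is an assumption-laden model, not the PR's implementation: `ToyExpr` and `rewrite_row_index` are hypothetical, and the column name `__datafusion_row_idx` is taken from the suggestion above only as a sample value.

```rust
/// Toy expression tree (hypothetical; the PR operates on `Arc<dyn PhysicalExpr>`).
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    /// Stands in for a call to `file_row_index()`.
    FileRowIndex,
    /// A reference to a column by name and position in the input schema.
    Column { name: String, index: usize },
    /// A cast to Int64 wrapping another expression.
    CastToInt64(Box<ToyExpr>),
    Literal(i64),
    Gt(Box<ToyExpr>, Box<ToyExpr>),
}

/// Replace every `FileRowIndex` node with a cast over a column reference,
/// leaving all other nodes unchanged.
fn rewrite_row_index(expr: ToyExpr, name: &str, index: usize) -> ToyExpr {
    match expr {
        ToyExpr::FileRowIndex => ToyExpr::CastToInt64(Box::new(ToyExpr::Column {
            name: name.to_owned(),
            index,
        })),
        ToyExpr::Gt(left, right) => ToyExpr::Gt(
            Box::new(rewrite_row_index(*left, name, index)),
            Box::new(rewrite_row_index(*right, name, index)),
        ),
        ToyExpr::CastToInt64(inner) => {
            ToyExpr::CastToInt64(Box::new(rewrite_row_index(*inner, name, index)))
        }
        other => other,
    }
}

fn main() {
    // Models the filter `file_row_index() > 3`.
    let filter = ToyExpr::Gt(
        Box::new(ToyExpr::FileRowIndex),
        Box::new(ToyExpr::Literal(3)),
    );
    let rewritten = rewrite_row_index(filter, "__datafusion_row_idx", 1);
    println!("{rewritten:?}");
}
```

In this model, `file_row_index() > 3` becomes a comparison between a cast of column `__datafusion_row_idx` (at index 1) and the literal 3.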
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -669,7 +678,26 @@ impl FileSource for ParquetSource {
projection: &ProjectionExprs,
) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
let mut source = self.clone();
- source.projection = self.projection.try_merge(projection)?;
+
+ if !projection.iter().any(|projection_expr| {
Review Comment:
Can you add a comment about why the merge is skipped if there is a
file_row_index() call?
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -81,6 +84,118 @@ where
.data()
}
+/// Return true if `expr` references scalar UDF `T`.
+///
+/// This matches the concrete [`ScalarUDFImpl`] type rather than the function
+/// name, so unrelated UDFs with the same name are not treated as matches.
+pub fn expr_references_scalar_udf<T: ScalarUDFImpl>(
+ expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+ let mut found = false;
+
+ expr.apply(|node| {
+ if ScalarFunctionExpr::try_downcast_func::<T>(node.as_ref()).is_some() {
+ found = true;
+ return Ok(TreeNodeRecursion::Stop);
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .expect("Infallible traversal of PhysicalExpr tree failed");
+
+ found
+}
+
+/// Rewrite occurrences of scalar UDF `T` in `expr` using `replacement`.
+///
+/// The rewrite matches the concrete [`ScalarUDFImpl`] type rather than the
+/// function name. `replacement` is called with each matching
+/// [`ScalarFunctionExpr`] after its children have been rewritten.
+pub fn rewrite_scalar_udf<T, F>(
+ expr: Arc<dyn PhysicalExpr>,
+ mut replacement: F,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+ T: ScalarUDFImpl,
+ F: FnMut(&ScalarFunctionExpr) -> Result<Arc<dyn PhysicalExpr>>,
+{
+ expr.transform_up(|node| {
+ if let Some(scalar_fn) = ScalarFunctionExpr::try_downcast_func::<T>(node.as_ref())
+ {
+ Ok(Transformed::yes(replacement(scalar_fn)?))
+ } else {
+ Ok(Transformed::no(node))
+ }
+ })
+ .map(|transformed| transformed.data)
+}
+
+/// Rewrite `file_row_index()` in `expr` to read from a source-provided
+/// row-index column.
+///
+/// `row_index_idx` is the index of `row_index_name` in the schema that the
+/// rewritten expression will be evaluated against. The rewrite uses ordinary
+/// physical expressions: a [`Column`] that reads the source row-index values
+/// wrapped in a [`CastExpr`] that exposes the public `file_row_index: Int64`
+/// return field without source-specific extension metadata.
+pub fn rewrite_file_row_index_expr(
+ expr: Arc<dyn PhysicalExpr>,
+ row_index_name: &str,
+ row_index_idx: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ rewrite_scalar_udf::<FileRowIndexFunc, _>(expr, |_| {
+ let source = Arc::new(Column::new(row_index_name, row_index_idx));
+ let target_field = Arc::new(Field::new("file_row_index", DataType::Int64, true));
+ Ok(Arc::new(CastExpr::new_with_target_field(
+ source,
+ target_field,
+ None,
+ )))
+ })
+}
+
+/// Rewrite `file_row_index()` in a pushed projection to read from a
+/// source-provided row-index column.
+///
+/// `base_projection` is the current projection already pushed into a source.
+/// The row-index source column is appended to that base projection if it is not
+/// already present. `projection` is rewritten to read from the projected
+/// row-index column and then merged on top of the extended base projection.
+pub fn rewrite_file_row_index_projection(
+ base_projection: &ProjectionExprs,
+ projection: &ProjectionExprs,
+ row_index_col: &Column,
+) -> Result<ProjectionExprs> {
+ let mut base_exprs = base_projection.as_ref().to_vec();
+ let row_index_projection_idx = row_index_projection_idx(&base_exprs, row_index_col);
+ if row_index_projection_idx == base_exprs.len() {
+ base_exprs.push(ProjectionExpr {
+ expr: Arc::new(row_index_col.clone()),
+ alias: row_index_col.name().to_owned(),
+ });
+ }
+
+ let rewritten_projection = projection.clone().try_map_exprs(|expr| {
+ rewrite_file_row_index_expr(expr, row_index_col.name(), row_index_projection_idx)
+ })?;
+
+ ProjectionExprs::new(base_exprs).try_merge(&rewritten_projection)
+}
+
+fn row_index_projection_idx(
Review Comment:
This seems like a good method to add to `ProjectionExprs` (and add some
documentation) as it could potentially be used by others
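
A minimal sketch of what such a helper might look like, on a toy projection type. The body of `row_index_projection_idx` is not shown in this hunk, so the lookup semantics here are an assumption inferred from the caller (an index equal to the list length means "not present"). `ToyProjectionExpr`, `column_idx_or_len`, and `ensure_column` are hypothetical names.

```rust
/// Toy projection entry (hypothetical; the real type is `ProjectionExpr`).
#[derive(Debug, Clone, PartialEq)]
struct ToyProjectionExpr {
    column: String,
    alias: String,
}

/// Returns the position of the first entry that projects `column`, or
/// `exprs.len()` if no entry does (assumed semantics, see lead-in).
fn column_idx_or_len(exprs: &[ToyProjectionExpr], column: &str) -> usize {
    exprs
        .iter()
        .position(|e| e.column == column)
        .unwrap_or(exprs.len())
}

/// Appends `column` if it is missing and returns its index either way.
fn ensure_column(exprs: &mut Vec<ToyProjectionExpr>, column: &str) -> usize {
    let idx = column_idx_or_len(exprs, column);
    if idx == exprs.len() {
        exprs.push(ToyProjectionExpr {
            column: column.to_owned(),
            alias: column.to_owned(),
        });
    }
    idx
}

fn main() {
    let mut exprs = vec![ToyProjectionExpr {
        column: "other_col".to_owned(),
        alias: "other_col".to_owned(),
    }];
    let idx = ensure_column(&mut exprs, "__datafusion_row_idx");
    println!("row index column is at position {idx} of {}", exprs.len());
}
```

Returning `Option<usize>` instead of the length sentinel might be a clearer contract for a public method, at the cost of a small change at the call site.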
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -989,6 +1014,61 @@ impl FileSource for ParquetSource {
}
}
+fn table_schema_with_row_index_col(table_schema: &TableSchema) -> (TableSchema, Column) {
Review Comment:
Some comments here would help future readers, I think.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.