alamb commented on code in PR #11163:
URL: https://github.com/apache/datafusion/pull/11163#discussion_r1661072390
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1247,36 +1164,50 @@ pub async fn from_substrait_rex(
None => substrait_err!("Cast expression without output type is not
allowed"),
},
Some(RexType::WindowFunction(window)) => {
- let fun = match extensions.get(&window.function_reference) {
- Some(function_name) => {
- // check udaf
- match ctx.udaf(function_name) {
- Ok(udaf) => {
-
Ok(Some(WindowFunctionDefinition::AggregateUDF(udaf)))
- }
- Err(_) => Ok(find_df_window_func(function_name)),
- }
- }
- None => not_impl_err!(
- "Window function not found: function anchor = {:?}",
- &window.function_reference
- ),
+ let Some(fn_name) = extensions.get(&window.function_reference)
else {
+ return plan_err!(
+ "Window function not found: function reference = {:?}",
+ window.function_reference
+ );
};
+ let fn_name = substrait_fun_name(fn_name);
+
+ // check udaf first, then built-in functions
+ let fun = match ctx.udaf(fn_name) {
+ Ok(udaf) => Ok(WindowFunctionDefinition::AggregateUDF(udaf)),
+ Err(_) => find_df_window_func(fn_name).ok_or_else(|| {
+ not_impl_datafusion_err!(
+ "Window function {} is not supported: function anchor
= {:?}",
+ fn_name,
+ window.function_reference
+ )
+ }),
+ }?;
+
let order_by =
from_substrait_sorts(ctx, &window.sorts, input_schema,
extensions)
.await?;
- // Substrait does not encode WindowFrameUnits so we're using a
simple logic to determine the units
- // If there is no `ORDER BY`, then by default, the frame counts
each row from the lower up to upper boundary
- // If there is `ORDER BY`, then by default, each frame is a range
starting from unbounded preceding to current row
- // TODO: Consider the cases where window frame is specified in
query and is different from default
- let units = if order_by.is_empty() {
- WindowFrameUnits::Rows
- } else {
- WindowFrameUnits::Range
- };
+
+ let bound_units =
+ match BoundsType::try_from(window.bounds_type).map_err(|e| {
+ plan_datafusion_err!("Invalid bound type {}: {e}",
window.bounds_type)
+ })? {
+ BoundsType::Rows => WindowFrameUnits::Rows,
+ BoundsType::Range => WindowFrameUnits::Range,
+ BoundsType::Unspecified => {
+ // If the plan does not specify the bounds type, then
we use a simple logic to determine the units
+ // If there is no `ORDER BY`, then by default, the
frame counts each row from the lower up to upper boundary
+ // If there is `ORDER BY`, then by default, each frame
is a range starting from unbounded preceding to current row
+ if order_by.is_empty() {
Review Comment:
I would personally suggest a `not_impl_err` to avoid silently ignoring
errors, but we can do it as a follow-on PR
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1247,36 +1164,50 @@ pub async fn from_substrait_rex(
None => substrait_err!("Cast expression without output type is not
allowed"),
},
Some(RexType::WindowFunction(window)) => {
- let fun = match extensions.get(&window.function_reference) {
- Some(function_name) => {
- // check udaf
- match ctx.udaf(function_name) {
- Ok(udaf) => {
-
Ok(Some(WindowFunctionDefinition::AggregateUDF(udaf)))
- }
- Err(_) => Ok(find_df_window_func(function_name)),
- }
- }
- None => not_impl_err!(
- "Window function not found: function anchor = {:?}",
- &window.function_reference
- ),
+ let Some(fn_name) = extensions.get(&window.function_reference)
else {
+ return plan_err!(
+ "Window function not found: function reference = {:?}",
+ window.function_reference
+ );
};
+ let fn_name = substrait_fun_name(fn_name);
+
+ // check udaf first, then built-in functions
+ let fun = match ctx.udaf(fn_name) {
+ Ok(udaf) => Ok(WindowFunctionDefinition::AggregateUDF(udaf)),
+ Err(_) => find_df_window_func(fn_name).ok_or_else(|| {
+ not_impl_datafusion_err!(
+ "Window function {} is not supported: function anchor
= {:?}",
+ fn_name,
+ window.function_reference
+ )
+ }),
+ }?;
+
let order_by =
from_substrait_sorts(ctx, &window.sorts, input_schema,
extensions)
.await?;
- // Substrait does not encode WindowFrameUnits so we're using a
simple logic to determine the units
- // If there is no `ORDER BY`, then by default, the frame counts
each row from the lower up to upper boundary
- // If there is `ORDER BY`, then by default, each frame is a range
starting from unbounded preceding to current row
- // TODO: Consider the cases where window frame is specified in
query and is different from default
- let units = if order_by.is_empty() {
- WindowFrameUnits::Rows
- } else {
- WindowFrameUnits::Range
- };
+
+ let bound_units =
+ match BoundsType::try_from(window.bounds_type).map_err(|e| {
+ plan_datafusion_err!("Invalid bound type {}: {e}",
window.bounds_type)
+ })? {
+ BoundsType::Rows => WindowFrameUnits::Rows,
+ BoundsType::Range => WindowFrameUnits::Range,
+ BoundsType::Unspecified => {
+ // If the plan does not specify the bounds type, then
we use a simple logic to determine the units
+ // If there is no `ORDER BY`, then by default, the
frame counts each row from the lower up to upper boundary
+ // If there is `ORDER BY`, then by default, each frame
is a range starting from unbounded preceding to current row
+ if order_by.is_empty() {
Review Comment:
I would personally suggest a `not_impl_err` to avoid silently ignoring
errors, but we can do it as a follow-on PR (or never)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]