alamb commented on code in PR #11163:
URL: https://github.com/apache/datafusion/pull/11163#discussion_r1661072390


##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1247,36 +1164,50 @@ pub async fn from_substrait_rex(
             None => substrait_err!("Cast expression without output type is not 
allowed"),
         },
         Some(RexType::WindowFunction(window)) => {
-            let fun = match extensions.get(&window.function_reference) {
-                Some(function_name) => {
-                    // check udaf
-                    match ctx.udaf(function_name) {
-                        Ok(udaf) => {
-                            
Ok(Some(WindowFunctionDefinition::AggregateUDF(udaf)))
-                        }
-                        Err(_) => Ok(find_df_window_func(function_name)),
-                    }
-                }
-                None => not_impl_err!(
-                    "Window function not found: function anchor = {:?}",
-                    &window.function_reference
-                ),
+            let Some(fn_name) = extensions.get(&window.function_reference) 
else {
+                return plan_err!(
+                    "Window function not found: function reference = {:?}",
+                    window.function_reference
+                );
             };
+            let fn_name = substrait_fun_name(fn_name);
+
+            // check udaf first, then built-in functions
+            let fun = match ctx.udaf(fn_name) {
+                Ok(udaf) => Ok(WindowFunctionDefinition::AggregateUDF(udaf)),
+                Err(_) => find_df_window_func(fn_name).ok_or_else(|| {
+                    not_impl_datafusion_err!(
+                        "Window function {} is not supported: function anchor 
= {:?}",
+                        fn_name,
+                        window.function_reference
+                    )
+                }),
+            }?;
+
             let order_by =
                 from_substrait_sorts(ctx, &window.sorts, input_schema, 
extensions)
                     .await?;
-            // Substrait does not encode WindowFrameUnits so we're using a 
simple logic to determine the units
-            // If there is no `ORDER BY`, then by default, the frame counts 
each row from the lower up to upper boundary
-            // If there is `ORDER BY`, then by default, each frame is a range 
starting from unbounded preceding to current row
-            // TODO: Consider the cases where window frame is specified in 
query and is different from default
-            let units = if order_by.is_empty() {
-                WindowFrameUnits::Rows
-            } else {
-                WindowFrameUnits::Range
-            };
+
+            let bound_units =
+                match BoundsType::try_from(window.bounds_type).map_err(|e| {
+                    plan_datafusion_err!("Invalid bound type {}: {e}", 
window.bounds_type)
+                })? {
+                    BoundsType::Rows => WindowFrameUnits::Rows,
+                    BoundsType::Range => WindowFrameUnits::Range,
+                    BoundsType::Unspecified => {
+                        // If the plan does not specify the bounds type, then 
we use a simple logic to determine the units
+                        // If there is no `ORDER BY`, then by default, the 
frame counts each row from the lower up to upper boundary
+                        // If there is `ORDER BY`, then by default, each frame 
is a range starting from unbounded preceding to current row
+                        if order_by.is_empty() {

Review Comment:
   I would personally suggest a `not_impl_err` to avoid silently ignored 
errors, but we can do it as a follow on PR



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1247,36 +1164,50 @@ pub async fn from_substrait_rex(
             None => substrait_err!("Cast expression without output type is not 
allowed"),
         },
         Some(RexType::WindowFunction(window)) => {
-            let fun = match extensions.get(&window.function_reference) {
-                Some(function_name) => {
-                    // check udaf
-                    match ctx.udaf(function_name) {
-                        Ok(udaf) => {
-                            
Ok(Some(WindowFunctionDefinition::AggregateUDF(udaf)))
-                        }
-                        Err(_) => Ok(find_df_window_func(function_name)),
-                    }
-                }
-                None => not_impl_err!(
-                    "Window function not found: function anchor = {:?}",
-                    &window.function_reference
-                ),
+            let Some(fn_name) = extensions.get(&window.function_reference) 
else {
+                return plan_err!(
+                    "Window function not found: function reference = {:?}",
+                    window.function_reference
+                );
             };
+            let fn_name = substrait_fun_name(fn_name);
+
+            // check udaf first, then built-in functions
+            let fun = match ctx.udaf(fn_name) {
+                Ok(udaf) => Ok(WindowFunctionDefinition::AggregateUDF(udaf)),
+                Err(_) => find_df_window_func(fn_name).ok_or_else(|| {
+                    not_impl_datafusion_err!(
+                        "Window function {} is not supported: function anchor 
= {:?}",
+                        fn_name,
+                        window.function_reference
+                    )
+                }),
+            }?;
+
             let order_by =
                 from_substrait_sorts(ctx, &window.sorts, input_schema, 
extensions)
                     .await?;
-            // Substrait does not encode WindowFrameUnits so we're using a 
simple logic to determine the units
-            // If there is no `ORDER BY`, then by default, the frame counts 
each row from the lower up to upper boundary
-            // If there is `ORDER BY`, then by default, each frame is a range 
starting from unbounded preceding to current row
-            // TODO: Consider the cases where window frame is specified in 
query and is different from default
-            let units = if order_by.is_empty() {
-                WindowFrameUnits::Rows
-            } else {
-                WindowFrameUnits::Range
-            };
+
+            let bound_units =
+                match BoundsType::try_from(window.bounds_type).map_err(|e| {
+                    plan_datafusion_err!("Invalid bound type {}: {e}", 
window.bounds_type)
+                })? {
+                    BoundsType::Rows => WindowFrameUnits::Rows,
+                    BoundsType::Range => WindowFrameUnits::Range,
+                    BoundsType::Unspecified => {
+                        // If the plan does not specify the bounds type, then 
we use a simple logic to determine the units
+                        // If there is no `ORDER BY`, then by default, the 
frame counts each row from the lower up to upper boundary
+                        // If there is `ORDER BY`, then by default, each frame 
is a range starting from unbounded preceding to current row
+                        if order_by.is_empty() {

Review Comment:
   I would personally suggest a `not_impl_err` to avoid silently ignored 
errors, but we can do it as a follow on PR (or never)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to