alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230806847


##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => 
Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, 
name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {
+            let aggregate =
+                udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, 
name)?;
+
+            if !unbounded_window && aggregate.retractable()? {

Review Comment:
   I will make this change -- thank you @mustafasrepo -- I didn't appreciate 
   that there is no way to run the AggregateUDF correctly without 
   retract_batch. Thus I think the correct solution is to return an error when 
   such a combination is attempted. I will update the PR to reflect this.



##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract 
batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = 
TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract 
batch is used

Review Comment:
   Thank you -- I am glad I asked!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to