mhilton opened a new issue, #6914:
URL: https://github.com/apache/arrow-datafusion/issues/6914

   ### Describe the bug
   
   Somewhere in the execution path of user-defined window functions, the results 
are erroneously detected to be non-nullable. Any NULLs in the output then cause an 
`InvalidArgumentError`.
   
   ### To Reproduce
   
   Using this (somewhat contrived) test:
   
   ```
   use datafusion::arrow::array::ArrayRef;
   use datafusion::arrow::datatypes::DataType;
   use datafusion::common::ScalarValue;
   use datafusion::error::Result;
   use datafusion::logical_expr::{
       PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction, 
Signature, Volatility,
       WindowUDF,
   };
   use datafusion::prelude::*;
   use std::ops::Range;
   use std::sync::Arc;
   
   /// Assembles the `test_window_udf` window UDF used by this reproduction.
   ///
   /// The signature accepts exactly one `Int64` argument and is declared
   /// `Volatility::Immutable`. The return type and the evaluator come from the
   /// `return_type` and `partition_evaluator_factory` functions in this file.
   pub fn new_window_udf() -> WindowUDF {
       let signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
       let return_type_fn: ReturnTypeFunction = Arc::new(return_type);
       let evaluator_factory: PartitionEvaluatorFactory = Arc::new(partition_evaluator_factory);

       WindowUDF {
           name: "test_window_udf".into(),
           signature,
           return_type: return_type_fn,
           partition_evaluator_factory: evaluator_factory,
       }
   }
   
   /// Return-type callback for the UDF: reports `UInt64` whatever the argument types are.
   fn return_type(_: &[DataType]) -> Result<Arc<DataType>> {
       let output_type = DataType::UInt64;
       Ok(Arc::new(output_type))
   }
   
   /// Factory callback that returns a freshly boxed `TestPartitionEvaluator`.
   fn partition_evaluator_factory() -> Result<Box<dyn PartitionEvaluator>> {
       let evaluator: Box<dyn PartitionEvaluator> = Box::new(TestPartitionEvaluator {});
       Ok(evaluator)
   }
   
   /// Evaluator type backing `test_window_udf`; it has no fields.
   #[derive(Debug)]
   struct TestPartitionEvaluator {}
   
   impl PartitionEvaluator for TestPartitionEvaluator {
       /// Adds up the non-negative `Int64` values of the first argument column over `range`.
       ///
       /// The accumulator starts out as `ScalarValue::UInt64(None)`. Every value that is
       /// not a non-negative `Int64` (nulls and negatives included) is skipped. Errors
       /// from reading an element or from `ScalarValue::add` are propagated.
       fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
           let column = &values[0];
           let mut sum = ScalarValue::UInt64(None);

           for row in range.start..range.end {
               if let ScalarValue::Int64(Some(n)) = ScalarValue::try_from_array(column, row)? {
                   if n >= 0 {
                       // `n` is known to be non-negative here, so the cast cannot wrap.
                       sum = sum.add(ScalarValue::UInt64(Some(n as u64)))?;
                   }
               }
           }

           Ok(sum)
       }

       /// Always returns `true`.
       fn uses_window_frame(&self) -> bool {
           true
       }
   }
   
   #[cfg(test)]
   mod tests {
       use super::*;
       use datafusion::arrow::array::Int64Array;
       use datafusion::arrow::datatypes::{Field, Schema};
       use datafusion::arrow::record_batch::RecordBatch;
       use datafusion::datasource::{DefaultTableSource, MemTable};
       use datafusion::logical_expr::expr::WindowFunction;
       use datafusion::logical_expr::{
           LogicalPlanBuilder, WindowFrame, WindowFrameBound, WindowFrameUnits,
       };

       /// Runs `test_window_udf` over a ten-row, non-nullable `Int64` column using a
       /// `ROWS` frame of one row preceding to one row following, and expects a single
       /// record batch back.
       #[tokio::test]
       async fn window_udf() {
           let ctx = SessionContext::new();

           // One non-nullable Int64 column holding the values -5..5.
           let mut numbers = Int64Array::builder(10);
           for value in -5..5 {
               numbers.append_value(value);
           }
           let field = Field::new("numbers", DataType::Int64, false);
           let schema = Arc::new(Schema::new(vec![field]));
           let batch =
               RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(numbers.finish())]).unwrap();
           let mem_table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
           ctx.register_table("test_table", Arc::new(mem_table)).unwrap();

           // Scan the registered table through its provider.
           let provider = ctx.table_provider("test_table").await.unwrap();
           let scan = LogicalPlanBuilder::scan(
               "test_table",
               Arc::new(DefaultTableSource::new(Arc::clone(&provider))),
               None,
           )
           .unwrap();

           // Window expression: test_window_udf(numbers) with no partitioning or ordering.
           let udf = datafusion::logical_expr::WindowFunction::WindowUDF(Arc::new(new_window_udf()));
           let frame = WindowFrame {
               units: WindowFrameUnits::Rows,
               start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
               end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
           };
           let window_expr = Expr::WindowFunction(WindowFunction::new(
               udf,
               vec![col("numbers")],
               vec![],
               vec![],
               frame,
           ));

           let plan = scan.window(vec![window_expr]).unwrap().build().unwrap();

           println!("{:?}", plan.schema());

           let dataframe = ctx.execute_logical_plan(plan).await.unwrap();
           let records = dataframe.collect().await.unwrap();
           assert_eq!(records.len(), 1);
       }
   }
   
   ```
   
   The result is:
   
   ```
   thread 'wudf::tests::window_udf' panicked at 'called `Result::unwrap()` on 
an `Err` value: ArrowError(InvalidArgumentError("Column 
'test_window_udf(test_table.numbers)' is declared as non-nullable but contains 
null values"))', src/wudf.rs:118:14
   ```
   
   ### Expected behavior
   
   The user-defined window function should be able to output NULLs without 
causing an `InvalidArgumentError`.
   
   ### Additional context
   
   If one requests the schema from the logical plan before execution it 
produces:
   
   ```
   DFSchema { fields: [DFField { qualifier: Some(Bare { table: "test_table" }), 
field: Field { name: "numbers", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} } }, DFField { qualifier: None, field: 
Field { name: "test_window_udf(test_table.numbers) ROWS BETWEEN 1 PRECEDING AND 
1 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} } }], metadata: {} }
   ```
   
   This suggests that the inconsistency creeps in somewhere in the 
physical-plan or execution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to