mhilton opened a new issue, #6914:
URL: https://github.com/apache/arrow-datafusion/issues/6914
### Describe the bug
Somewhere in the execution path of user-defined window functions the results
are erroneously detected to be non-nullable. Any NULLs in the output then cause an
`ArgumentError`.
### To Reproduce
Using this (somewhat contrived) test:
```
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::logical_expr::{
PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction,
Signature, Volatility,
WindowUDF,
};
use datafusion::prelude::*;
use std::ops::Range;
use std::sync::Arc;
/// Builds the `test_window_udf` window function used by the reproduction.
///
/// The UDF is declared with an exact single-`Int64` signature and immutable
/// volatility, and is wired to the `return_type` and
/// `partition_evaluator_factory` functions defined alongside it.
pub fn new_window_udf() -> WindowUDF {
    let signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
    let output_type_fn: ReturnTypeFunction = Arc::new(return_type);
    let evaluator_fn: PartitionEvaluatorFactory = Arc::new(partition_evaluator_factory);

    WindowUDF {
        name: "test_window_udf".into(),
        signature,
        return_type: output_type_fn,
        partition_evaluator_factory: evaluator_fn,
    }
}
/// Return-type callback for the UDF: the result is always `UInt64`,
/// whatever argument types are passed in.
fn return_type(_: &[DataType]) -> Result<Arc<DataType>> {
    let output_type = Arc::new(DataType::UInt64);
    Ok(output_type)
}
/// Factory callback for the UDF: hands out a fresh, boxed
/// `TestPartitionEvaluator` on every call.
fn partition_evaluator_factory() -> Result<Box<dyn PartitionEvaluator>> {
    let evaluator: Box<dyn PartitionEvaluator> = Box::new(TestPartitionEvaluator {});
    Ok(evaluator)
}
#[derive(Debug)]
struct TestPartitionEvaluator {}
impl PartitionEvaluator for TestPartitionEvaluator {
fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) ->
Result<ScalarValue> {
let array = Arc::clone(&values[0]);
let mut total = ScalarValue::UInt64(None);
for idx in range.clone() {
let v = ScalarValue::try_from_array(&Arc::clone(&array), idx)?;
match v {
ScalarValue::Int64(Some(n)) if n >= 0 => {
total = total.add(ScalarValue::UInt64(Some(n as u64)))?
}
_ => {}
}
}
Ok(total)
}
fn uses_window_frame(&self) -> bool {
true
}
}
// Reproduction test for the issue: runs `test_window_udf` over a
// non-nullable Int64 column and collects the result batches.
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{DefaultTableSource, MemTable};
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::{
LogicalPlanBuilder, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
// Builds an in-memory table, applies the UDF over a sliding ROWS frame,
// executes the plan and expects a single record batch back.
#[tokio::test]
async fn window_udf() {
let ctx = SessionContext::new();
// Fill the column with the ten values -5..=4; no NULLs are appended.
let mut builder = Int64Array::builder(10);
for i in -5..5 {
builder.append_value(i);
}
// Single "numbers" column, declared non-nullable (the `false` argument;
// the logical schema printed in the issue shows `nullable: false`).
let schema = Arc::new(Schema::new(vec![Field::new(
"numbers",
DataType::Int64,
false,
)]));
let data =
RecordBatch::try_new(Arc::clone(&schema),
vec![Arc::new(builder.finish())]).unwrap();
// Register the batch as an in-memory table named "test_table".
let table = MemTable::try_new(schema, vec![vec![data]]).unwrap();
ctx.register_table("test_table", Arc::new(table)).unwrap();
let table_provider = ctx.table_provider("test_table").await.unwrap();
// Scan the table and add one window expression that calls the UDF on
// the "numbers" column.
let plan = LogicalPlanBuilder::scan(
"test_table",
Arc::new(DefaultTableSource::new(Arc::clone(&table_provider))),
None,
)
.unwrap()
.window(vec![Expr::WindowFunction(WindowFunction::new(
datafusion::logical_expr::WindowFunction::WindowUDF(Arc::new(new_window_udf())),
vec![col("numbers")],
// NOTE(review): presumably the partition-by and order-by lists, both
// empty — confirm against `WindowFunction::new`.
vec![],
vec![],
// Frame: ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING.
WindowFrame {
units: WindowFrameUnits::Rows,
start_bound:
WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
end_bound:
WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
},
))])
.unwrap()
.build()
.unwrap();
// Print the logical schema; the issue text shows the UDF column reported
// as nullable at this stage.
println!("{:?}", plan.schema());
// Execute and collect. The issue reports the panic from one of the
// unwraps in this chain ("declared as non-nullable but contains null
// values").
let records = ctx
.execute_logical_plan(plan)
.await
.unwrap()
.collect()
.await
.unwrap();
assert_eq!(records.len(), 1);
}
}
```
The result is:
```
thread 'wudf::tests::window_udf' panicked at 'called `Result::unwrap()` on
an `Err` value: ArrowError(InvalidArgumentError("Column
'test_window_udf(test_table.numbers)' is declared as non-nullable but contains
null values"))', src/wudf.rs:118:14
```
### Expected behavior
The user-defined window function should be able to output NULLs without
causing an `ArgumentError`.
### Additional context
If one requests the schema from the logical plan before execution, it
produces:
```
DFSchema { fields: [DFField { qualifier: Some(Bare { table: "test_table" }),
field: Field { name: "numbers", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} } }, DFField { qualifier: None, field:
Field { name: "test_window_udf(test_table.numbers) ROWS BETWEEN 1 PRECEDING AND
1 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} } }], metadata: {} }
```
This suggests that the inconsistency creeps in somewhere in the
physical-plan or execution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]