gruuya opened a new issue, #21292: URL: https://github.com/apache/datafusion/issues/21292
### Describe the bug

We've been seeing a panic during streaming for a large query (on sizable tables with high cardinality):

```
Column 'val' is declared as non-nullable but contains null values
```

This is unexpected as the plan seems fine, i.e. the column is nullable from the leaf nodes up to the root node, so this doesn't appear to be a planning error.

The query itself has a union operator where one child has a nullable schema (column projection), while the other child has a non-nullable schema (literal projection), followed by some sorting operators (among many other things).

The problem seems to occur when there's spilling and the input batches with mixed nullability arrive in a specific order:

https://github.com/apache/datafusion/blob/bc2b36cf56846e0c697b0f8b98619f346a72a9bf/datafusion/physical-plan/src/spill/in_progress_spill_file.rs#L52-L65

Note that the schema is inherited from the first batch, instead of from the spill_writer.

### To Reproduce

It's quite hard to reproduce this at a higher level (i.e.
with plain SQL or DataFrame API), but the best I could Claude up is this:

```rust
const NUM_BATCHES: usize = 200;
const ROWS_PER_BATCH: usize = 10;

fn non_nullable_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("val", DataType::Int64, false),
    ]))
}

fn nullable_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("val", DataType::Int64, true),
    ]))
}

fn non_nullable_batches() -> Vec<RecordBatch> {
    (0..NUM_BATCHES)
        .map(|i| {
            let start = (i * ROWS_PER_BATCH) as i64;
            let keys: Vec<i64> = (start..start + ROWS_PER_BATCH as i64).collect();
            RecordBatch::try_new(
                non_nullable_schema(),
                vec![
                    Arc::new(Int64Array::from(keys)),
                    Arc::new(Int64Array::from(vec![0i64; ROWS_PER_BATCH])),
                ],
            )
            .unwrap()
        })
        .collect()
}

fn nullable_batches() -> Vec<RecordBatch> {
    (0..NUM_BATCHES)
        .map(|i| {
            let start = (i * ROWS_PER_BATCH) as i64;
            let keys: Vec<i64> = (start..start + ROWS_PER_BATCH as i64).collect();
            let vals: Vec<Option<i64>> = (0..ROWS_PER_BATCH)
                .map(|j| if j % 3 == 1 { None } else { Some(j as i64) })
                .collect();
            RecordBatch::try_new(
                nullable_schema(),
                vec![
                    Arc::new(Int64Array::from(keys)),
                    Arc::new(Int64Array::from(vals)),
                ],
            )
            .unwrap()
        })
        .collect()
}

fn build_task_ctx(pool_size: usize) -> Arc<datafusion_execution::TaskContext> {
    let session_config = SessionConfig::new().with_batch_size(2);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(pool_size)))
        .build_arc()
        .unwrap();
    Arc::new(
        datafusion_execution::TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    )
}

/// Exercises spilling through UnionExec -> RepartitionExec where union children
/// have mismatched nullability (one child's `val` is non-nullable, the other's
/// is nullable with NULLs). A tiny FairSpillPool forces all batches to spill.
///
/// UnionExec returns child streams without schema coercion, so batches from
/// different children carry different per-field nullability into the shared
/// SpillPool. The IPC writer must use the SpillManager's canonical (nullable)
/// schema, not the first batch's schema, so readback batches are valid.
///
/// Otherwise, sort_batch will panic with
/// `Column 'val' is declared as non-nullable but contains null values`
#[tokio::test]
async fn test_sort_union_repartition_spill_mixed_nullability() {
    let non_nullable_exec = MemorySourceConfig::try_new_exec(
        &[non_nullable_batches()],
        non_nullable_schema(),
        None,
    )
    .unwrap();
    let nullable_exec =
        MemorySourceConfig::try_new_exec(&[nullable_batches()], nullable_schema(), None)
            .unwrap();

    let union_exec = UnionExec::try_new(vec![non_nullable_exec, nullable_exec]).unwrap();
    assert!(union_exec.schema().field(1).is_nullable());

    let repartition = Arc::new(
        RepartitionExec::try_new(union_exec, Partitioning::RoundRobinBatch(1)).unwrap(),
    );

    let task_ctx = build_task_ctx(200);
    let mut stream = repartition.execute(0, task_ctx).unwrap();

    let sort_expr = LexOrdering::new(vec![PhysicalSortExpr {
        expr: col("key", &nullable_schema()).unwrap(),
        options: SortOptions::default(),
    }])
    .unwrap();

    let mut total_rows = 0usize;
    let mut total_nulls = 0usize;
    while let Some(result) = stream.next().await {
        let batch = result.unwrap();
        let batch = sort_batch(&batch, &sort_expr, None).unwrap();
        total_rows += batch.num_rows();
        total_nulls += batch.column(1).null_count();
    }

    assert_eq!(
        total_rows,
        NUM_BATCHES * ROWS_PER_BATCH * 2,
        "All rows from both UNION branches should be present"
    );
    assert!(
        total_nulls > 0,
        "Expected some null values in output (i.e. nullable batches were processed)"
    );
}
```

This test fails with `Column 'val' is declared as non-nullable but contains null values`.
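
To make the suspected mechanism concrete outside DataFusion, here is a minimal std-only sketch. Every name in it (`Field`, `Batch`, `ToySpillFile`, `SchemaSource`, `spill_mixed`) is a hypothetical stand-in, not the Arrow or DataFusion API. It also validates nullability on readback for simplicity, whereas the panic reported here surfaces later, in `sort_batch`. The toy file fixes a single schema at the first append, either from that batch or from the schema it was declared with:

```rust
// Toy model only: these types are hypothetical stand-ins, NOT Arrow/DataFusion types.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: &'static str,
    nullable: bool,
}

type Schema = Vec<Field>;

/// A single-column batch that carries its own schema.
#[derive(Clone, Debug)]
struct Batch {
    schema: Schema,
    val: Vec<Option<i64>>,
}

/// Where the toy spill file takes its one-and-only schema from.
enum SchemaSource {
    FirstBatch,
    Declared,
}

struct ToySpillFile {
    declared: Schema,
    source: SchemaSource,
    file_schema: Option<Schema>,
    rows: Vec<Vec<Option<i64>>>,
}

impl ToySpillFile {
    fn new(declared: Schema, source: SchemaSource) -> Self {
        Self { declared, source, file_schema: None, rows: Vec::new() }
    }

    /// The schema is fixed once, when the first batch is appended.
    fn append(&mut self, batch: &Batch) {
        if self.file_schema.is_none() {
            self.file_schema = Some(match self.source {
                SchemaSource::FirstBatch => batch.schema.clone(),
                SchemaSource::Declared => self.declared.clone(),
            });
        }
        self.rows.push(batch.val.clone());
    }

    /// Every batch read back is stamped with the file's single schema.
    fn read_back(&self) -> Result<Vec<Batch>, String> {
        let schema = self
            .file_schema
            .clone()
            .ok_or_else(|| "nothing was spilled".to_string())?;
        let mut out = Vec::new();
        for val in &self.rows {
            let field = &schema[0];
            if !field.nullable && val.iter().any(|v| v.is_none()) {
                return Err(format!(
                    "Column '{}' is declared as non-nullable but contains null values",
                    field.name
                ));
            }
            out.push(Batch { schema: schema.clone(), val: val.clone() });
        }
        Ok(out)
    }
}

/// Spill a non-nullable batch first, then a nullable batch that contains a NULL.
fn spill_mixed(source: SchemaSource) -> Result<Vec<Batch>, String> {
    let non_nullable = vec![Field { name: "val", nullable: false }];
    let nullable = vec![Field { name: "val", nullable: true }];
    let mut file = ToySpillFile::new(nullable.clone(), source);
    file.append(&Batch { schema: non_nullable, val: vec![Some(0), Some(0)] });
    file.append(&Batch { schema: nullable, val: vec![Some(1), None] });
    file.read_back()
}
```

Under these assumptions, `spill_mixed(SchemaSource::FirstBatch)` returns the nullability error, because the non-nullable batch arrived first and its schema was reused for the later batch with NULLs, while `spill_mixed(SchemaSource::Declared)` returns both batches intact.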

On the other hand, the test does pass with the following change:

```diff
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index 2666ab882..362768ee5 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -62,7 +62,11 @@ impl InProgressSpillFile {
             ));
         }
         if self.writer.is_none() {
-            let schema = batch.schema();
+            // Use the SpillManager's declared schema rather than the batch's schema.
+            // Individual batches may have different schemas (e.g., different nullability)
+            // when they come from different branches of a UnionExec. The SpillManager's
+            // schema represents the canonical schema that all batches should conform to.
+            let schema = self.spill_writer.schema();
             if let Some(in_progress_file) = &mut self.in_progress_file {
                 self.writer = Some(IPCStreamWriter::new(
                     in_progress_file.path(),
```

### Expected behavior

There should be no streaming errors.

### Additional context

_No response_
