gruuya opened a new issue, #21292:
URL: https://github.com/apache/datafusion/issues/21292

   ### Describe the bug
   
   We've been seeing a panic during streaming of a large query (over sizable tables with high cardinality):
   ```
    Column 'val' is declared as non-nullable but contains null values
   ```
   This is unexpected, as the plan seems fine: the column is nullable from the leaf nodes all the way up to the root, so this doesn't appear to be a planning error.
   
   The query itself has a union operator where one child has a nullable schema (a column projection) while the other child has a non-nullable schema (a literal projection), followed by some sorting operators (among many other things).
   
   The problem seems to occur when there's spilling and the input batches with mixed nullability arrive in a specific order:
   
https://github.com/apache/datafusion/blob/bc2b36cf56846e0c697b0f8b98619f346a72a9bf/datafusion/physical-plan/src/spill/in_progress_spill_file.rs#L52-L65
   
   Note that the IPC writer's schema is inherited from the first batch rather than from the `spill_writer` (the `SpillManager`), whose schema is the canonical one.
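   For context, the panic message itself comes from Arrow's `RecordBatch` validation: building a batch whose declared schema marks a column non-nullable while the data contains nulls is rejected. A minimal standalone sketch with plain arrow-rs (outside DataFusion) illustrating the check:
   ```rust
   use std::sync::Arc;
   
   use arrow::array::Int64Array;
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   
   // Attempts to build a batch whose schema declares `val` non-nullable while the
   // data contains a null (as batches from the nullable UNION child do), and
   // returns the resulting validation error message.
   fn null_in_non_nullable_column() -> String {
       let schema = Arc::new(Schema::new(vec![Field::new(
           "val",
           DataType::Int64,
           false, // non-nullable
       )]));
       let vals = Int64Array::from(vec![Some(1), None, Some(3)]);
       RecordBatch::try_new(schema, vec![Arc::new(vals)])
           .unwrap_err()
           .to_string()
   }
   
   fn main() {
       // This is the same validation that fires when spilled batches are read
       // back under the first batch's (non-nullable) schema.
       println!("{}", null_in_non_nullable_column());
   }
   ```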
   
   ### To Reproduce
   
   It's quite hard to reproduce this at a higher level (i.e. with plain SQL or the DataFrame API), but the best reproduction I could put together (with Claude's help) is this
   ```rust
   const NUM_BATCHES: usize = 200;
   const ROWS_PER_BATCH: usize = 10;
   
   fn non_nullable_schema() -> Arc<Schema> {
       Arc::new(Schema::new(vec![
           Field::new("key", DataType::Int64, false),
           Field::new("val", DataType::Int64, false),
       ]))
   }
   
   fn nullable_schema() -> Arc<Schema> {
       Arc::new(Schema::new(vec![
           Field::new("key", DataType::Int64, false),
           Field::new("val", DataType::Int64, true),
       ]))
   }
   
   fn non_nullable_batches() -> Vec<RecordBatch> {
       (0..NUM_BATCHES)
           .map(|i| {
               let start = (i * ROWS_PER_BATCH) as i64;
                let keys: Vec<i64> = (start..start + ROWS_PER_BATCH as i64).collect();
               RecordBatch::try_new(
                   non_nullable_schema(),
                   vec![
                       Arc::new(Int64Array::from(keys)),
                       Arc::new(Int64Array::from(vec![0i64; ROWS_PER_BATCH])),
                   ],
               )
               .unwrap()
           })
           .collect()
   }
   
   fn nullable_batches() -> Vec<RecordBatch> {
       (0..NUM_BATCHES)
           .map(|i| {
               let start = (i * ROWS_PER_BATCH) as i64;
                let keys: Vec<i64> = (start..start + ROWS_PER_BATCH as i64).collect();
               let vals: Vec<Option<i64>> = (0..ROWS_PER_BATCH)
                   .map(|j| if j % 3 == 1 { None } else { Some(j as i64) })
                   .collect();
               RecordBatch::try_new(
                   nullable_schema(),
                   vec![
                       Arc::new(Int64Array::from(keys)),
                       Arc::new(Int64Array::from(vals)),
                   ],
               )
               .unwrap()
           })
           .collect()
   }
   
    fn build_task_ctx(pool_size: usize) -> Arc<datafusion_execution::TaskContext> {
       let session_config = SessionConfig::new().with_batch_size(2);
       let runtime = RuntimeEnvBuilder::new()
           .with_memory_pool(Arc::new(FairSpillPool::new(pool_size)))
           .build_arc()
           .unwrap();
       Arc::new(
           datafusion_execution::TaskContext::default()
               .with_session_config(session_config)
               .with_runtime(runtime),
       )
   }
   
    /// Exercises spilling through UnionExec -> RepartitionExec where union children
    /// have mismatched nullability (one child's `val` is non-nullable, the other's
    /// is nullable with NULLs). A tiny FairSpillPool forces all batches to spill.
    ///
    /// UnionExec returns child streams without schema coercion, so batches from
    /// different children carry different per-field nullability into the shared
    /// SpillPool. The IPC writer must use the SpillManager's canonical (nullable)
    /// schema, not the first batch's schema, so readback batches are valid.
    ///
    /// Otherwise, sort_batch will panic with
    /// `Column 'val' is declared as non-nullable but contains null values`
   #[tokio::test]
   async fn test_sort_union_repartition_spill_mixed_nullability() {
       let non_nullable_exec = MemorySourceConfig::try_new_exec(
           &[non_nullable_batches()],
           non_nullable_schema(),
           None,
       )
           .unwrap();
   
        let nullable_exec =
            MemorySourceConfig::try_new_exec(&[nullable_batches()], nullable_schema(), None)
                .unwrap();
   
       let union_exec =
           UnionExec::try_new(vec![non_nullable_exec, nullable_exec]).unwrap();
       assert!(union_exec.schema().field(1).is_nullable());
   
        let repartition = Arc::new(
            RepartitionExec::try_new(union_exec, Partitioning::RoundRobinBatch(1)).unwrap(),
        );
   
       let task_ctx = build_task_ctx(200);
       let mut stream = repartition.execute(0, task_ctx).unwrap();
   
       let sort_expr = LexOrdering::new(vec![PhysicalSortExpr {
           expr: col("key", &nullable_schema()).unwrap(),
           options: SortOptions::default(),
       }])
           .unwrap();
   
       let mut total_rows = 0usize;
       let mut total_nulls = 0usize;
       while let Some(result) = stream.next().await {
           let batch = result.unwrap();
   
           let batch = sort_batch(&batch, &sort_expr, None).unwrap();
   
           total_rows += batch.num_rows();
           total_nulls += batch.column(1).null_count();
       }
   
       assert_eq!(
           total_rows,
           NUM_BATCHES * ROWS_PER_BATCH * 2,
           "All rows from both UNION branches should be present"
       );
        assert!(
            total_nulls > 0,
            "Expected some null values in output (i.e. nullable batches were processed)"
        );
   }
   ```
   
   This test fails with `Column 'val' is declared as non-nullable but contains 
null values`.
   
   On the other hand, the test does pass with the following change:
   ```diff
    diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
   index 2666ab882..362768ee5 100644
   --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
   +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
   @@ -62,7 +62,11 @@ impl InProgressSpillFile {
                ));
            }
            if self.writer.is_none() {
    -            let schema = batch.schema();
    +            // Use the SpillManager's declared schema rather than the batch's schema.
    +            // Individual batches may have different schemas (e.g., different nullability)
    +            // when they come from different branches of a UnionExec. The SpillManager's
    +            // schema represents the canonical schema that all batches should conform to.
    +            let schema = self.spill_writer.schema();
                if let Some(in_progress_file) = &mut self.in_progress_file {
                    self.writer = Some(IPCStreamWriter::new(
                        in_progress_file.path(),
   ```
   
   
   ### Expected behavior
   
   There should be no streaming errors.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

