valkum opened a new issue, #20673:
URL: https://github.com/apache/datafusion/issues/20673

   ### Describe the bug
   
   `FULL OUTER JOIN` panics on tables with >23M rows containing a `FixedSizeListArray` column (e.g. embeddings) when you use a single partition.
   
   Root cause: u32 overflow in `take_value_indices_from_fixed_size_list` (arrow-select's `take.rs`). DataFusion's `HashJoinExec` concatenates all build-side batches into a single `RecordBatch` via `concat_batches`. When the `FixedSizeList` column has more than `u32::MAX / value_length` rows, `take()` overflows internally and panics.
   
   I'm not sure what the correct behavior should be. If failing at this scale is expected, it is underdocumented; either way, a panic rather than a returned error is not a graceful failure mode.
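   The overflow boundary can be checked with plain `u32` arithmetic. A minimal sketch, assuming the kernel builds child indices roughly as `list_index * value_length` in u32 (the actual arrow-select internals may differ in detail):

   ```rust
   // Sketch of the overflow arithmetic, not the actual arrow-select code.
   fn main() {
       let value_length: u32 = 184;

       // Largest row index whose child offset still fits in u32.
       let threshold = u32::MAX / value_length;
       assert_eq!(threshold, 23_342_213);

       // One row past the threshold: the child-index multiplication no
       // longer fits in u32, so wrapping arithmetic would silently
       // produce a bogus index into the child array.
       let row = threshold + 1;
       assert!(row.checked_mul(value_length).is_none());

       println!("overflow threshold for dim {value_length}: {threshold} rows");
   }
   ```

   This matches the 23,342,213-row threshold for `value_length=184` mentioned in the reproducer below.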
   
   ### To Reproduce
   
   main.rs:
   ```rust
   //! Reproducer: FULL OUTER JOIN panics on tables with >23M rows containing a
   //! `FixedSizeListArray` column (e.g. embeddings).
   //!
   //! Root cause: u32 overflow in `take_value_indices_from_fixed_size_list`
   //! (arrow-select take.rs). DataFusion's `HashJoinExec` concatenates all
   //! build-side batches into a single `RecordBatch` via `concat_batches`.
   //! When the `FixedSizeList` column has more than `u32::MAX / value_length`
   //! rows, `take()` overflows internally and panics with:
   //!
   //!     assertion failed: (offset + length) <= self.len()
   //!
   //! For value_length=184, the threshold is 23,342,213 rows.
   //!
   //! Uses FixedSizeList(Boolean, 184) to keep memory at ~600 MB. The same bug
   //! triggers with Float32 embeddings but would require ~17 GB.
   //!
   //! Run:  cargo run --release          (requires ~1.5 GB RAM, panics)
   //!       cargo run --release -- 5000  (5k rows, passes)
   //!       cargo run --release -- 25000000 2  (25M rows, 2 partitions, passes)
   
   use arrow_array::{BooleanArray, FixedSizeListArray, RecordBatch, UInt32Array};
   use arrow_buffer::{BooleanBuffer, MutableBuffer};
   use arrow_schema::{DataType, Field, Schema};
   use datafusion::datasource::MemTable;
   use datafusion::prelude::*;
   use std::sync::Arc;
   
   const DIM: i32 = 184;
   const NUM_ROWS: usize = 24_000_000;
   const CHUNK: usize = 1_000_000;
   
   #[tokio::main]
   async fn main() {
       let n: usize = std::env::args()
           .nth(1)
           .and_then(|s| s.parse().ok())
           .unwrap_or(NUM_ROWS);
       let partitions: usize = std::env::args()
           .nth(2)
           .and_then(|s| s.parse().ok())
           .unwrap_or(1);
   
       let threshold = (u32::MAX as u64) / (DIM as u64);
       eprintln!("rows={n}, dim={DIM}, overflow threshold={threshold}");
   
       // target_partitions=1 ensures all build-side batches are concatenated into
       // a single RecordBatch, matching what happens in practice (e.g. lance's
       // merge_insert v1 path sets target_partitions(1) for the FULL OUTER JOIN).
       let config = SessionConfig::default().with_target_partitions(partitions);
       let ctx = SessionContext::new_with_config(config);
   
       eprintln!("creating {n}-row table...");
       let table = MemTable::try_new(schema(), vec![make_batches(n)]).unwrap();
       ctx.register_table("t", Arc::new(table)).unwrap();
   
       // Self-join: both sides have 24M rows with FixedSizeList(Boolean, 184).
       // Whichever side becomes the HashJoin build side will trigger the overflow.
       eprintln!("running FULL OUTER JOIN...");
       let df = ctx
           .sql("SELECT * FROM t AS a FULL OUTER JOIN t AS b ON a.id = b.id")
           .await
           .unwrap();
       let _results = df.collect().await.unwrap();
   
       eprintln!("join succeeded (bug not triggered at this scale)");
   }
   
   fn schema() -> Arc<Schema> {
       Arc::new(Schema::new(vec![
           Field::new("id", DataType::UInt32, false),
           Field::new(
               "vec",
               DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Boolean, true)), DIM),
               true,
           ),
       ]))
   }
   
   fn make_batch(start: usize, count: usize) -> RecordBatch {
       let ids: Vec<u32> = (start..start + count).map(|i| i as u32).collect();
       let n = count * DIM as usize;
       let buf: arrow_buffer::Buffer = MutableBuffer::from_len_zeroed((n + 7) / 8).into();
       let bools = BooleanArray::new(BooleanBuffer::new(buf, 0, n), None);
       let fsl = FixedSizeListArray::new(
           Arc::new(Field::new("item", DataType::Boolean, true)),
           DIM,
           Arc::new(bools),
           None,
       );
       RecordBatch::try_new(
           schema(),
           vec![Arc::new(UInt32Array::from(ids)), Arc::new(fsl)],
       )
       .unwrap()
   }
   
   fn make_batches(total: usize) -> Vec<RecordBatch> {
       let mut result = Vec::new();
       let mut off = 0;
       while off < total {
           let n = (total - off).min(CHUNK);
           result.push(make_batch(off, n));
           off += n;
       }
       result
   }
   
   ```
   
   Cargo.toml
   ```toml
   [workspace]
   
   [package]
   name = "arrow-take-fsl-overflow"
   version = "0.1.0"
   edition = "2021"
   
   [dependencies]
   arrow-array = "58"
   arrow-buffer = "58"
   arrow-schema = "58"
   datafusion = "52"
   tokio = { version = "1", features = ["full"] }
   ```
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_

