valkum opened a new issue, #20673:
URL: https://github.com/apache/datafusion/issues/20673
### Describe the bug
`FULL OUTER JOIN` panics on tables with >23M rows containing a
`FixedSizeListArray` column (e.g. embeddings) when you use a single partition.
Root cause: u32 overflow in `take_value_indices_from_fixed_size_list`
(arrow-select take.rs). DataFusion's `HashJoinExec` concatenates all build-side
batches into a single `RecordBatch` via `concat_batches`.
When the `FixedSizeList` column has more than `u32::MAX / value_length`
rows, `take()` overflows internally and panics.
I'm not sure what the correct behavior should be here. If this limit is intentional, it is underdocumented, and the resulting panic deep inside `take()` is not a graceful failure mode.
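For reference, here is a minimal sketch of the overflow arithmetic (my reading of the code; the exact expression inside `take_value_indices_from_fixed_size_list` may differ):

```rust
// Sketch (assumption): value indices for a FixedSizeList take are derived
// from roughly `row_index * value_length`. If that product is computed in
// u32, it wraps once rows exceed u32::MAX / value_length.
fn main() {
    let value_length: u32 = 184;
    let threshold = u32::MAX / value_length;
    assert_eq!(threshold, 23_342_213); // matches the reproducer's threshold

    // One row past the threshold: the product no longer fits in u32.
    let row = threshold + 1;
    assert_eq!(row.checked_mul(value_length), None); // wraps in u32 arithmetic

    // The same product is fine in u64, which is why wider indices would fix it.
    assert!((row as u64) * (value_length as u64) > u32::MAX as u64);

    println!("threshold rows = {threshold}");
}
```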
### To Reproduce
main.rs:
```rust
//! Reproducer: FULL OUTER JOIN panics on tables with >23M rows containing a
//! `FixedSizeListArray` column (e.g. embeddings).
//!
//! Root cause: u32 overflow in `take_value_indices_from_fixed_size_list`
//! (arrow-select take.rs). DataFusion's `HashJoinExec` concatenates all
//! build-side batches into a single `RecordBatch` via `concat_batches`.
//! When the `FixedSizeList` column has more than `u32::MAX / value_length`
//! rows, `take()` overflows internally and panics with:
//!
//! assertion failed: (offset + length) <= self.len()
//!
//! For value_length=184, the threshold is 23,342,213 rows.
//!
//! Uses FixedSizeList(Boolean, 184) to keep memory at ~600 MB. The same bug
//! triggers with Float32 embeddings but would require ~17 GB.
//!
//! Run: cargo run --release                (requires ~1.5 GB RAM, panics)
//!      cargo run --release -- 5000        (5k rows, passes)
//!      cargo run --release -- 25000000 2  (25M rows, 2 partitions, passes)
use arrow_array::{BooleanArray, FixedSizeListArray, RecordBatch, UInt32Array};
use arrow_buffer::{BooleanBuffer, MutableBuffer};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

const DIM: i32 = 184;
const NUM_ROWS: usize = 24_000_000;
const CHUNK: usize = 1_000_000;

#[tokio::main]
async fn main() {
    let n: usize = std::env::args()
        .nth(1)
        .and_then(|s| s.parse().ok())
        .unwrap_or(NUM_ROWS);
    let partitions: usize = std::env::args()
        .nth(2)
        .and_then(|s| s.parse().ok())
        .unwrap_or(1);
    let threshold = (u32::MAX as u64) / (DIM as u64);
    eprintln!("rows={n}, dim={DIM}, overflow threshold={threshold}");

    // target_partitions=1 ensures all build-side batches are concatenated into
    // a single RecordBatch, matching what happens in practice (e.g. lance's
    // merge_insert v1 path sets target_partitions(1) for the FULL OUTER JOIN).
    let config = SessionConfig::default().with_target_partitions(partitions);
    let ctx = SessionContext::new_with_config(config);

    eprintln!("creating {n}-row table...");
    let table = MemTable::try_new(schema(), vec![make_batches(n)]).unwrap();
    ctx.register_table("t", Arc::new(table)).unwrap();

    // Self-join: both sides have 24M rows with FixedSizeList(Boolean, 184).
    // Whichever side becomes the HashJoin build side will trigger the overflow.
    eprintln!("running FULL OUTER JOIN...");
    let df = ctx
        .sql("SELECT * FROM t AS a FULL OUTER JOIN t AS b ON a.id = b.id")
        .await
        .unwrap();
    let _results = df.collect().await.unwrap();
    eprintln!("join succeeded (bug not triggered at this scale)");
}

fn schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("id", DataType::UInt32, false),
        Field::new(
            "vec",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Boolean, true)),
                DIM,
            ),
            true,
        ),
    ]))
}

fn make_batch(start: usize, count: usize) -> RecordBatch {
    let ids: Vec<u32> = (start..start + count).map(|i| i as u32).collect();
    let n = count * DIM as usize;
    // Zeroed validity-free boolean values: (n + 7) / 8 bytes hold n bits.
    let buf: arrow_buffer::Buffer = MutableBuffer::from_len_zeroed((n + 7) / 8).into();
    let bools = BooleanArray::new(BooleanBuffer::new(buf, 0, n), None);
    let fsl = FixedSizeListArray::new(
        Arc::new(Field::new("item", DataType::Boolean, true)),
        DIM,
        Arc::new(bools),
        None,
    );
    RecordBatch::try_new(
        schema(),
        vec![Arc::new(UInt32Array::from(ids)), Arc::new(fsl)],
    )
    .unwrap()
}

fn make_batches(total: usize) -> Vec<RecordBatch> {
    let mut result = Vec::new();
    let mut off = 0;
    while off < total {
        let n = (total - off).min(CHUNK);
        result.push(make_batch(off, n));
        off += n;
    }
    result
}
```
Cargo.toml:
```
[workspace]
[package]
name = "arrow-take-fsl-overflow"
version = "0.1.0"
edition = "2021"
[dependencies]
arrow-array = "58"
arrow-buffer = "58"
arrow-schema = "58"
datafusion = "52"
tokio = { version = "1", features = ["full"] }
```
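As a sanity check on why the 2-partition run passes (a back-of-the-envelope sketch, not a claim about HashJoinExec's exact batch sizes): splitting the build side roughly halves the rows per concatenated batch, which keeps each below the overflow threshold.

```rust
// Rough per-partition row counts vs. the u32 overflow threshold for
// value_length = 184. Assumes rows split about evenly across partitions.
fn main() {
    let dim: u64 = 184;
    let threshold = (u32::MAX as u64) / dim; // 23_342_213 rows

    let total_rows: u64 = 25_000_000;
    assert!(total_rows > threshold); // single partition: over the limit, panics

    let per_partition = total_rows / 2; // approximate split across 2 partitions
    assert!(per_partition < threshold); // each partition stays under the limit

    println!("threshold={threshold}, per_partition={per_partition}");
}
```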
### Expected behavior
_No response_
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]