kosiew commented on code in PR #20922:
URL: https://github.com/apache/datafusion/pull/20922#discussion_r2959548579


##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -126,49 +129,97 @@ impl BatchBuilder {
         &self.schema
     }
 
+    /// Try to interleave all columns using the given index slice.
+    fn try_interleave_columns(
+        &self,
+        indices: &[(usize, usize)],
+    ) -> Result<Vec<Arc<dyn arrow::array::Array>>> {
+        (0..self.schema.fields.len())
+            .map(|column_idx| {
+                let arrays: Vec<_> = self
+                    .batches
+                    .iter()
+                    .map(|(_, batch)| batch.column(column_idx).as_ref())
+                    .collect();
+                // Arrow's interleave panics on i32 offset overflow with
+                // `.expect("overflow")`. Catch that panic so the caller
+                // can retry with fewer rows.
+                match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) {
+                    Ok(result) => Ok(result?),
+                    Err(panic_payload) => {
+                        if is_overflow_panic(&panic_payload) {
+                            Err(DataFusionError::ArrowError(
+                                Box::new(ArrowError::OffsetOverflowError(0)),
+                                None,
+                            ))
+                        } else {
+                            std::panic::resume_unwind(panic_payload);
+                        }
+                    }
+                }
+            })
+            .collect::<Result<Vec<_>>>()
+    }
+
    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
     ///
-    /// Will then drop any batches for which all rows have been yielded to the output
+    /// Will then drop any batches for which all rows have been yielded to the output.
+    /// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
+    /// retries with progressively fewer rows until it succeeds.
     ///
     /// Returns `None` if no pending rows
     pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
         if self.is_empty() {
             return Ok(None);
         }
 
-        let columns = (0..self.schema.fields.len())
-            .map(|column_idx| {
-                let arrays: Vec<_> = self
-                    .batches
-                    .iter()
-                    .map(|(_, batch)| batch.column(column_idx).as_ref())
-                    .collect();
-                Ok(interleave(&arrays, &self.indices)?)
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        self.indices.clear();
-
-        // New cursors are only created once the previous cursor for the stream
-        // is finished. This means all remaining rows from all but the last batch
-        // for each stream have been yielded to the newly created record batch
-        //
-        // We can therefore drop all but the last batch for each stream
-        let mut batch_idx = 0;
-        let mut retained = 0;
-        self.batches.retain(|(stream_idx, batch)| {
-            let stream_cursor = &mut self.cursors[*stream_idx];
-            let retain = stream_cursor.batch_idx == batch_idx;
-            batch_idx += 1;
-
-            if retain {
-                stream_cursor.batch_idx = retained;
-                retained += 1;
-            } else {
-                self.batches_mem_used -= get_record_batch_memory_size(batch);
+        // Try interleaving all indices. On offset overflow, halve and retry.
+        let mut end = self.indices.len();

Review Comment:
    The retry loop is clear, but `end` is really "rows_to_emit". Renaming that variable, or extracting a helper like `build_partial_record_batch`, would make the control flow easier to scan now that `build_record_batch` has to coordinate retry, draining, and delayed cleanup.



##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -126,49 +129,97 @@ impl BatchBuilder {
         &self.schema
     }
 
+    /// Try to interleave all columns using the given index slice.
+    fn try_interleave_columns(
+        &self,
+        indices: &[(usize, usize)],
+    ) -> Result<Vec<Arc<dyn arrow::array::Array>>> {
+        (0..self.schema.fields.len())
+            .map(|column_idx| {
+                let arrays: Vec<_> = self
+                    .batches
+                    .iter()
+                    .map(|(_, batch)| batch.column(column_idx).as_ref())
+                    .collect();
+                // Arrow's interleave panics on i32 offset overflow with
+                // `.expect("overflow")`. Catch that panic so the caller
+                // can retry with fewer rows.
+                match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) {
+                    Ok(result) => Ok(result?),
+                    Err(panic_payload) => {
+                        if is_overflow_panic(&panic_payload) {

Review Comment:
    Catching any panic whose message merely contains `"overflow"` is too broad for a recovery path in the merge operator.

    This converts unrelated bugs such as Rust arithmetic overflows (`"attempt to multiply with overflow"`) or allocation failures like `"capacity overflow"` into a synthetic `OffsetOverflowError`, causing the stream to silently split batches instead of surfacing the real defect.
    Since this code is on the hot path and intentionally swallows panics, I think we need a tighter discriminator before merging. Ideally the overflow detection should match the specific Arrow panic we expect, or be isolated behind a smaller helper/API so we are not turning arbitrary panics into data-dependent control flow.
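One way to tighten the discriminator, sketched with std only. The exact panic string is an assumption (from the `.expect("overflow")` call site) and should be verified against the arrow version in use:

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Assumed message of Arrow's interleave offset-overflow panic; verify
// against the pinned arrow-rs version before relying on it.
const ARROW_OFFSET_OVERFLOW_MSG: &str = "overflow";

/// Exact-equality check on the panic payload, so unrelated panics like
/// "attempt to multiply with overflow" or "capacity overflow" no longer match.
fn is_arrow_offset_overflow_panic(payload: &(dyn std::any::Any + Send)) -> bool {
    let msg = payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str));
    msg == Some(ARROW_OFFSET_OVERFLOW_MSG)
}

fn main() {
    // Silence the default hook so the expected panic does not spam stderr.
    std::panic::set_hook(Box::new(|_| {}));

    // Mimics Arrow's `.expect("overflow")` panic.
    let caught = catch_unwind(AssertUnwindSafe(|| {
        Option::<i32>::None.expect("overflow");
    }))
    .unwrap_err();
    assert!(is_arrow_offset_overflow_panic(caught.as_ref()));

    // An unrelated panic that merely *contains* "overflow" is rejected.
    let unrelated = catch_unwind(AssertUnwindSafe(|| {
        panic!("attempt to multiply with overflow");
    }))
    .unwrap_err();
    assert!(!is_arrow_offset_overflow_panic(unrelated.as_ref()));
}
```

A sturdier alternative would be for arrow-rs to return `ArrowError::OffsetOverflowError` from `interleave` instead of panicking, which would remove the `catch_unwind` entirely.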



##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -208,6 +217,19 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Result<RecordBatch>>> {
         if self.done {
+            // When `build_record_batch()` hits an i32 offset overflow (e.g.

Review Comment:
    The `done` branch and the normal emit path both repeat the same `before = len(); build_record_batch(); produced += ...` bookkeeping.

    This feels like it wants a small helper on `SortPreservingMergeStream` or `BatchBuilder` so the overflow/drain behavior stays in one place.



##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -200,3 +251,75 @@ pub(crate) fn try_grow_reservation_to_at_least(
     }
     Ok(())
 }
+
+/// Returns true if the error is an Arrow offset overflow.
+fn is_offset_overflow(e: &DataFusionError) -> bool {
+    matches!(
+        e,
+        DataFusionError::ArrowError(boxed, _)
+            if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_))
+    )
+}
+
+/// Returns true if a caught panic payload looks like an Arrow offset overflow.
+fn is_overflow_panic(payload: &Box<dyn std::any::Any + Send>) -> bool {
+    if let Some(msg) = payload.downcast_ref::<&str>() {
+        return msg.contains("overflow");
+    }
+    if let Some(msg) = payload.downcast_ref::<String>() {
+        return msg.contains("overflow");
+    }
+    false
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::StringArray;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_execution::memory_pool::{
+        MemoryConsumer, MemoryPool, UnboundedMemoryPool,
+    };
+
+    /// Test that interleaving string columns whose combined byte length
+    /// exceeds i32::MAX does not panic. Arrow's `interleave` panics with
+    /// `.expect("overflow")` in this case; `BatchBuilder` catches the
+    /// panic and retries with fewer rows until the output fits in i32
+    /// offsets.
+    #[test]
+    fn test_interleave_overflow_is_caught() {

Review Comment:
    This test and `test_sort_merge_fetch_interleave_overflow` allocate enormous strings (`768 * 1024 * 1024` bytes each) and then materialize them into multiple `StringArray`s.

    In practice that means several gigabytes of heap allocation per test, which is likely to make CI flaky or OOM outright.

    The coverage is important, but I think these tests would be better replaced with a lower-memory reproduction, for example by constructing the overflow condition with a purpose-built array fixture/helper instead of copying multi-GB payloads into `StringArray`s.
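To illustrate why such a fixture can be cheap: `interleave` indices may reference the same source row repeatedly, so the *output* string offsets can exceed `i32::MAX` while only one modest string is ever allocated. A std-only sketch of the offset arithmetic (the 1 MiB row size is an arbitrary example, not taken from the PR):

```rust
fn main() {
    // One hypothetical 1 MiB string, allocated once and referenced by
    // every interleave index.
    let row_len: i32 = 1024 * 1024;

    // Accumulate output offsets the way an i32-offset StringArray would.
    let mut offset: i32 = 0;
    let mut repeats = 0u32;
    while let Some(next) = offset.checked_add(row_len) {
        offset = next;
        repeats += 1;
    }

    // i32::MAX / 1 MiB = 2047, so the 2048th repeat is the one that
    // overflows: ~2k indices into a single 1 MiB row suffice to hit the
    // condition, with total allocation measured in megabytes.
    assert_eq!(repeats, 2047);
    println!("overflows on repeat {} of a 1 MiB row", repeats + 1);
}
```

Whether arrow's `interleave` short-circuits repeated indices is worth checking, but if it copies per index, a fixture along these lines would exercise the same `.expect("overflow")` path with a tiny fraction of the memory.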



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

