This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 16906c127 fix: stack overflow when loading large equality deletes (#1915)
16906c127 is described below
commit 16906c127d521395a789a9019350e467cc34d063
Author: Lo <[email protected]>
AuthorDate: Thu Dec 11 18:32:34 2025 +0800
fix: stack overflow when loading large equality deletes (#1915)
## Which issue does this PR close?
- Closes #.
## What changes are included in this PR?
A stack overflow occurs when processing data files containing a large number of equality deletes (e.g., > 6000 rows). This happens because `parse_equality_deletes_record_batch_stream` previously constructed the final predicate by linearly calling `.and()` in a loop:
```rust
result_predicate = result_predicate.and(row_predicate.not());
```
This resulted in a deeply nested, left-skewed tree whose depth equals the number of rows (N). When `rewrite_not()` (which uses a recursive visitor pattern) was subsequently called on this structure, or when the structure was dropped, the call stack limit was exceeded.
### Changes
1. Balanced tree construction: Refactored the predicate combination logic. Instead of linear accumulation, row predicates are collected and combined pairwise to build a balanced tree, reducing the tree depth from O(N) to O(log N) (see the sketch after this list).
2. Early rewrite: `rewrite_not()` is now called immediately on each individual row predicate before they are combined. This ensures we combine already-simplified predicates and avoids traversing a massive unoptimized tree later.
3. Regression test: Added `test_large_equality_delete_batch_stack_overflow`, which processes 20,000 equality delete rows to verify the fix.
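The pairwise reduction from item 1 can be sketched generically as below. This is a minimal sketch, not the iceberg-rust API: the `combine_balanced` helper is illustrative, and subtree depth stands in for real `Predicate` values.

```rust
// Illustrative sketch: repeatedly combine adjacent pairs until at most one
// value remains. With N inputs this builds a tree of depth O(log N).
fn combine_balanced<T>(mut items: Vec<T>, mut and: impl FnMut(T, T) -> T) -> Option<T> {
    while items.len() > 1 {
        let mut next = Vec::with_capacity(items.len().div_ceil(2));
        let mut iter = items.into_iter();
        while let Some(a) = iter.next() {
            match iter.next() {
                Some(b) => next.push(and(a, b)),
                // An odd element out carries unchanged to the next level.
                None => next.push(a),
            }
        }
        items = next;
    }
    items.pop()
}

fn main() {
    // Track tree depth instead of building predicates: a leaf has depth 1,
    // and combining two subtrees adds one level.
    let leaves = vec![1u32; 20_000];
    let depth = combine_balanced(leaves, |a, b| 1 + a.max(b)).unwrap();
    println!("depth for 20_000 leaves: {depth}"); // prints 16, versus 20_000 for a linear fold
}
```

An empty input yields `None`, which the patch maps to `AlwaysTrue`, preserving the old behavior when no delete rows are present.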
## Are these changes tested?
- [x] New regression test `test_large_equality_delete_batch_stack_overflow` passed.
- [x] All existing tests in `arrow::caching_delete_file_loader` passed.
Co-authored-by: Renjie Liu <[email protected]>
---
.../src/arrow/caching_delete_file_loader.rs | 72 +++++++++++++++++++++-
1 file changed, 69 insertions(+), 3 deletions(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 192ca390a..250fc5e8d 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -330,7 +330,7 @@ impl CachingDeleteFileLoader {
mut stream: ArrowRecordBatchStream,
equality_ids: HashSet<i32>,
) -> Result<Predicate> {
- let mut result_predicate = AlwaysTrue;
+ let mut row_predicates = Vec::new();
let mut batch_schema_iceberg: Option<Schema> = None;
let accessor = EqDelRecordBatchPartnerAccessor;
@@ -374,10 +374,29 @@ impl CachingDeleteFileLoader {
row_predicate = row_predicate.and(cell_predicate)
}
}
- result_predicate = result_predicate.and(row_predicate.not());
+ row_predicates.push(row_predicate.not().rewrite_not());
}
}
- Ok(result_predicate.rewrite_not())
+
+ // All row predicates are combined to a single predicate by creating a balanced binary tree.
+ // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
+ while row_predicates.len() > 1 {
+ let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
+ let mut iter = row_predicates.into_iter();
+ while let Some(p1) = iter.next() {
+ if let Some(p2) = iter.next() {
+ next_level.push(p1.and(p2));
+ } else {
+ next_level.push(p1);
+ }
+ }
+ row_predicates = next_level;
+ }
+
+ match row_predicates.pop() {
+ Some(p) => Ok(p),
+ None => Ok(AlwaysTrue),
+ }
}
}
@@ -912,4 +931,51 @@ mod tests {
result.err()
);
}
+
+ #[tokio::test]
+ async fn test_large_equality_delete_batch_stack_overflow() {
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+ let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+ // Create a large batch of equality deletes
+ let num_rows = 20_000;
+ let col_y_vals: Vec<i64> = (0..num_rows).collect();
+ let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+ let schema = Arc::new(arrow_schema::Schema::new(vec![
+ Field::new("y", arrow_schema::DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ ]));
+
+ let record_batch = RecordBatch::try_new(schema.clone(), vec![col_y]).unwrap();
+
+ // Write to file
+ let path = format!("{}/large-eq-deletes.parquet", &table_location);
+ let file = File::create(&path).unwrap();
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+ let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
+ writer.write(&record_batch).unwrap();
+ writer.close().unwrap();
+
+ let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+ let record_batch_stream = basic_delete_file_loader
+ .parquet_to_batch_stream(&path)
+ .await
+ .expect("could not get batch stream");
+
+ let eq_ids = HashSet::from_iter(vec![2]);
+
+ let result = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+ record_batch_stream,
+ eq_ids,
+ )
+ .await;
+
+ assert!(result.is_ok());
+ }
}