This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 16906c127 fix: stack overflow when loading large equality deletes (#1915)
16906c127 is described below

commit 16906c127d521395a789a9019350e467cc34d063
Author: Lo <[email protected]>
AuthorDate: Thu Dec 11 18:32:34 2025 +0800

    fix: stack overflow when loading large equality deletes (#1915)
    
    ## Which issue does this PR close?
    
    - Closes #.
    
    ## What changes are included in this PR?
    
    A stack overflow occurs when processing data files that contain a large
    number of equality deletes (e.g., more than 6,000 rows). This happens
    because parse_equality_deletes_record_batch_stream previously
    constructed the final predicate by chaining .and() linearly in a loop:
    ```rust
    result_predicate = result_predicate.and(row_predicate.not());
    ```
    This resulted in a deeply nested, left-skewed tree structure with a
    depth equal to the number of rows (N). When rewrite_not() (which uses a
    recursive visitor
    pattern) was subsequently called on this structure, or when the
    structure was dropped, the call stack limit was exceeded.
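
    To illustrate the failure mode, here is a minimal sketch with a toy
    Expr enum standing in for the real Predicate type (illustrative code
    only, not the iceberg-rust API):
    ```rust
    // Toy expression tree standing in for `Predicate`.
    enum Expr {
        Leaf(i64),
        And(Box<Expr>, Box<Expr>),
    }

    // A recursive visitor needs one stack frame per tree level, so a
    // left-skewed tree built from N leaves needs roughly N frames.
    fn depth(e: &Expr) -> usize {
        match e {
            Expr::Leaf(_) => 1,
            Expr::And(l, r) => 1 + depth(l).max(depth(r)),
        }
    }

    fn main() {
        // Linear accumulation: each `.and()` nests the previous result one
        // level deeper, yielding a left-skewed tree of depth N.
        let mut acc = Expr::Leaf(0);
        for i in 1..6_000 {
            acc = Expr::And(Box::new(acc), Box::new(Expr::Leaf(i)));
        }
        // With a large enough N, a recursive walk like this (or the
        // recursive drop of `acc`) can exceed the stack limit.
        println!("depth = {}", depth(&acc));
    }
    ```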
    
    Changes
    1. Balanced Tree Construction: Refactored the predicate combination
       logic. Instead of linear accumulation, row predicates are collected
       and combined pairwise to build a balanced tree, reducing the tree
       depth from O(N) to O(log N). A sketch of the approach follows this
       list.
    2. Early Rewrite: rewrite_not() is now called immediately on each
       individual row predicate before they are combined. This ensures we
       combine already-simplified predicates and avoids traversing a
       massive unoptimized tree later.
    3. Regression Test: Added
       test_large_equality_delete_batch_stack_overflow, which processes
       20,000 equality delete rows to verify the fix.
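
    A minimal, self-contained sketch of the pairwise reduction (the helper
    name combine_balanced is hypothetical; the actual patch inlines this
    loop and operates on Predicate values already passed through
    rewrite_not()):
    ```rust
    // Repeatedly combine adjacent pairs so the result is a balanced tree
    // of depth O(log N), instead of the O(N) a linear fold produces.
    // Generic over the combiner, so any AND-like operation works.
    fn combine_balanced<T>(
        mut items: Vec<T>,
        mut and: impl FnMut(T, T) -> T,
    ) -> Option<T> {
        while items.len() > 1 {
            let mut next_level = Vec::with_capacity(items.len().div_ceil(2));
            let mut iter = items.into_iter();
            while let Some(p1) = iter.next() {
                match iter.next() {
                    Some(p2) => next_level.push(and(p1, p2)), // pair up
                    None => next_level.push(p1), // odd element carries over
                }
            }
            items = next_level;
        }
        items.pop() // None for empty input; the caller maps that to AlwaysTrue
    }

    fn main() {
        // 20_000 leaves collapse in ceil(log2(20_000)) = 15 rounds.
        let leaves: Vec<String> = (0..20_000).map(|i| format!("p{i}")).collect();
        let combined = combine_balanced(leaves, |a, b| format!("({a} AND {b})"));
        assert!(combined.is_some());
    }
    ```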
    
    ## Are these changes tested?
    - [x] New regression test test_large_equality_delete_batch_stack_overflow
      passed.
    - [x] All existing tests in arrow::caching_delete_file_loader passed.
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 .../src/arrow/caching_delete_file_loader.rs        | 72 +++++++++++++++++++++-
 1 file changed, 69 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 192ca390a..250fc5e8d 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -330,7 +330,7 @@ impl CachingDeleteFileLoader {
         mut stream: ArrowRecordBatchStream,
         equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        let mut result_predicate = AlwaysTrue;
+        let mut row_predicates = Vec::new();
         let mut batch_schema_iceberg: Option<Schema> = None;
         let accessor = EqDelRecordBatchPartnerAccessor;
 
@@ -374,10 +374,29 @@ impl CachingDeleteFileLoader {
                         row_predicate = row_predicate.and(cell_predicate)
                     }
                 }
-                result_predicate = result_predicate.and(row_predicate.not());
+                row_predicates.push(row_predicate.not().rewrite_not());
             }
         }
-        Ok(result_predicate.rewrite_not())
+
+        // All row predicates are combined to a single predicate by creating a balanced binary tree.
+        // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
+        while row_predicates.len() > 1 {
+            let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
+            let mut iter = row_predicates.into_iter();
+            while let Some(p1) = iter.next() {
+                if let Some(p2) = iter.next() {
+                    next_level.push(p1.and(p2));
+                } else {
+                    next_level.push(p1);
+                }
+            }
+            row_predicates = next_level;
+        }
+
+        match row_predicates.pop() {
+            Some(p) => Ok(p),
+            None => Ok(AlwaysTrue),
+        }
     }
 }
 
@@ -912,4 +931,51 @@ mod tests {
             result.err()
         );
     }
+
+    #[tokio::test]
+    async fn test_large_equality_delete_batch_stack_overflow() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        // Create a large batch of equality deletes
+        let num_rows = 20_000;
+        let col_y_vals: Vec<i64> = (0..num_rows).collect();
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let schema = Arc::new(arrow_schema::Schema::new(vec![
+            Field::new("y", arrow_schema::DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let record_batch = RecordBatch::try_new(schema.clone(), vec![col_y]).unwrap();
+
+        // Write to file
+        let path = format!("{}/large-eq-deletes.parquet", &table_location);
+        let file = File::create(&path).unwrap();
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&path)
+            .await
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2]);
+
+        let result = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await;
+
+        assert!(result.is_ok());
+    }
 }
