EnyMan opened a new pull request, #2943:
URL: https://github.com/apache/iceberg-python/pull/2943
## Summary
This PR improves the performance of the `upsert()` operation, particularly
for large upserts with 10,000+ rows. The changes address three main bottlenecks
in the current implementation.
## Problem
The current upsert implementation has several performance issues that become
significant with large datasets:
1. **Expensive match filter generation**: For composite keys,
`create_match_filter()` generates `Or(And(EqualTo, EqualTo), ...)` expressions
- one `And` clause per unique key combination (see the sketch after this list).
With 10k+ rows this produces large expression trees, on the order of n×m leaves
for n unique keys and m key columns, that are slow to build and evaluate.
2. **Per-batch insert filtering**: The insert logic filters rows using
expression evaluation (`expression_to_pyarrow`) on each batch, which is
inefficient and doesn't leverage PyArrow's join capabilities.
3. **Row-by-row comparison**: `get_rows_to_update()` uses Python loops to
compare rows one at a time (`source_table.slice(source_idx, 1)`), missing the
opportunity for vectorized operations.
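For illustration, the exact-match filter for a two-column composite key has
roughly this shape (a hand-written sketch using PyIceberg's expression classes,
not the literal output of `create_match_filter()`):
```python
from pyiceberg.expressions import And, EqualTo, Or

# One And(...) clause per unique key tuple, OR-ed together. Three key tuples shown here;
# with 10k+ unique keys this tree grows to tens of thousands of leaves and must be bound
# and evaluated for every data file in the scan.
match_filter = Or(
    Or(
        And(EqualTo("dept", "hr"), EqualTo("id", 1)),
        And(EqualTo("dept", "hr"), EqualTo("id", 2)),
    ),
    And(EqualTo("dept", "eng"), EqualTo("id", 1)),
)
```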
## Solution
### 1. Coarse Match Filter for Initial Scan (Biggest Performance Win)
Added `create_coarse_match_filter()` that generates a less precise but much
faster filter for the initial table scan. **This is where the majority of the
performance improvement comes from.**
- For small datasets (< 10,000 unique keys): Uses `And(In(col1, values),
In(col2, values))` instead of exact key-tuple matching
- For large datasets with dense numeric keys (>10% density): Uses range
filters (`col >= min AND col <= max`)
- For large datasets with sparse keys: Returns `AlwaysTrue()` to allow full
scan (exact matching happens downstream anyway)
This is safe because exact key matching occurs in `get_rows_to_update()` via
the join operation.
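A simplified sketch of the selection logic (illustrative only; the actual
thresholds, density computation, and signature live in
`create_coarse_match_filter()`):
```python
import functools

import pyarrow as pa
import pyarrow.compute as pc
from pyiceberg.expressions import AlwaysTrue, And, GreaterThanOrEqual, In, LessThanOrEqual

UNIQUE_KEY_THRESHOLD = 10_000  # switch away from In() predicates above this many unique keys
DENSITY_THRESHOLD = 0.10       # minimum key density to justify a range filter


def coarse_match_filter_sketch(df: pa.Table, join_cols: list[str]):
    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
    if len(unique_keys) < UNIQUE_KEY_THRESHOLD:
        # Small key sets: per-column In() predicates, a cheap superset of the exact key tuples.
        in_filters = [In(col, unique_keys.column(col).to_pylist()) for col in join_cols]
        return functools.reduce(And, in_filters)
    first_col = unique_keys.column(join_cols[0])
    if pa.types.is_integer(first_col.type):
        lo, hi = pc.min(first_col).as_py(), pc.max(first_col).as_py()
        if len(unique_keys) / (hi - lo + 1) > DENSITY_THRESHOLD:
            # Dense numeric keys: a simple range filter on the key column.
            return And(GreaterThanOrEqual(join_cols[0], lo), LessThanOrEqual(join_cols[0], hi))
    # Sparse keys: no scan filter at all; exact matching happens later in the join.
    return AlwaysTrue()
```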
**Key insight - `AlwaysTrue()` is where the biggest win happens:**
The benchmark data was sparse, triggering the `AlwaysTrue()` path.
Counter-intuitively, this is actually the **best case** for performance
improvement. The speedup doesn't come from reading fewer files - it comes from
**avoiding the massive expression tree construction and evaluation**:
- Original: Build `Or(And(...), And(...), ...)` with millions of nodes (8s),
then evaluate during scan (382s)
- Optimized: Return `AlwaysTrue()` instantly (0.07s), scan without filter
overhead (1.4s)
With sparse data, you'd read most/all files anyway, so the "full scan" isn't
a penalty - but avoiding an expression tree with n×m nodes (n keys × m columns)
and evaluating it across f files is a huge win.
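To put rough numbers on that claim (the figures below are hypothetical, not
taken from the benchmark):
```python
# Hypothetical workload: 60k unique keys, 2 key columns, 500 data files.
n_keys, n_key_cols, n_files = 60_000, 2, 500

exact_filter_leaves = n_keys * n_key_cols         # 120,000 EqualTo leaves to build
leaf_evaluations = exact_filter_leaves * n_files  # 60,000,000 leaf evaluations across files
coarse_filter_leaves = 0                          # AlwaysTrue() has nothing to build or evaluate
```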
**When this optimization helps less:**
- Small datasets (< 10k keys): Already fast with In() predicates, minimal
improvement
- Dense numeric keys: Range filter helps, but less dramatic than the sparse
case
- Workloads where the original filter already performed well
### 2. Anti-Join for Insert Filtering
Replaced per-batch expression filtering with a single anti-join operation
after processing all batches:
```python
# Before: Per-batch expression filtering (slow)
for batch in matched_iceberg_record_batches:
    expr_match = create_match_filter(rows, join_cols)
    expr_match_arrow = expression_to_pyarrow(bind(...))
    rows_to_insert = rows_to_insert.filter(~expr_match_arrow)

# After: Single anti-join (fast)
combined_matched_keys = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([])
rows_to_insert = df_keys.join(combined_matched_keys, keys=join_cols, join_type="left anti")
```
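For reference, a self-contained toy example of the anti-join step (data and
variable names here are illustrative):
```python
import pyarrow as pa

join_cols = ["id", "dept"]

# Keys of the incoming rows, and the keys that matched existing rows in each batch.
df_keys = pa.table({"id": [1, 2, 3, 4], "dept": ["hr", "hr", "eng", "eng"]})
matched_target_keys = [
    pa.table({"id": [1], "dept": ["hr"]}),
    pa.table({"id": [3], "dept": ["eng"]}),
    pa.table({"id": [1], "dept": ["hr"]}),  # the same key can match in several batches
]

# Deduplicate the matched keys once, then drop them from the source keys in a single anti-join.
combined_matched_keys = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([])
rows_to_insert_keys = df_keys.join(combined_matched_keys, keys=join_cols, join_type="left anti")
print(rows_to_insert_keys.to_pydict())  # keys (2, "hr") and (4, "eng") remain; row order may vary
```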
**Note on memory usage:** The new approach accumulates matched keys in
memory during batch processing. We only store key columns (not full rows) to
minimize memory footprint, and deduplicate after the loop. For tables with
millions of matching rows, this could increase peak memory usage compared to
the previous approach. A potential future improvement would be incremental
deduplication during the loop.
### 3. Vectorized Row Comparison
Replaced row-by-row Python comparison with vectorized PyArrow operations:
```python
# Before: Python loop with slice()
for source_idx, target_idx in zip(...):
    source_row = source_table.slice(source_idx, 1)
    target_row = target_table.slice(target_idx, 1)
    for key in non_key_cols:
        if source_row.column(key)[0].as_py() != target_row.column(key)[0].as_py():
            to_update_indices.append(source_idx)

# After: Vectorized with take() and compute
matched_source = source_table.take(source_indices)
matched_target = target_table.take(target_indices)
for col in non_key_cols:
    col_diff = _compare_columns_vectorized(matched_source.column(col), matched_target.column(col))
    diff_masks.append(col_diff)
combined_mask = functools.reduce(pc.or_, diff_masks)
```
The `_compare_columns_vectorized()` function handles:
- Primitive types: Uses `pc.not_equal()` with proper null handling
- Struct types: Recursively compares each nested field
- List/Map types: Falls back to Python comparison (still batched)
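A minimal sketch of the primitive-type path, under one reasonable choice of
null semantics (a null on exactly one side counts as a difference, null on both
sides does not); the function name and signature are illustrative:
```python
import pyarrow as pa
import pyarrow.compute as pc


def compare_primitive_columns(source: pa.ChunkedArray, target: pa.ChunkedArray) -> pa.ChunkedArray:
    """Boolean mask that is True where the two columns differ."""
    # not_equal() yields null where either input is null, so treat those as "no value difference"...
    values_differ = pc.fill_null(pc.not_equal(source, target), False)
    # ...and separately flag positions where exactly one side is null.
    null_mismatch = pc.xor(pc.is_null(source), pc.is_null(target))
    return pc.or_(values_differ, null_mismatch)
```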
## Benchmark Results
Ran benchmarks on a table with ~2M rows, doing incremental upserts:
| Run | Table Size | Batches | Original | Optimized | Speedup |
|-----|------------|---------|----------|-----------|---------|
| 2 | 2M rows | 32 | 11.9 min | 2.3 min | **5.1x** |
| 3 | 2M rows | 96 | 31.5 min | 3.9 min | **8.0x** |
| 4 | 2M rows | 160+ | 51.2 min | 5.5 min | **9.3x** |
**Why times increase with each run:** The table uses bucketing, and each
upsert modifies files independently, so the file count grows over time. The
original implementation's large filter expression (`Or(And(...), ...)`) had to
be evaluated against **every file**, so more files meant dramatically more time.
The optimized version avoids this by using `AlwaysTrue()`, so scan time grows
roughly linearly with data size rather than with the product of filter size and
file count.
This file increase could be mitigated with table maintenance (compaction),
which is not yet implemented in PyIceberg.
### Where the Time Went (Run 2: 2M rows, 32 batches)
| Step | Original | Optimized |
|------|----------|-----------|
| Filter creation | 7.9s | 0.07s (114x faster) |
| Table scan | 382.2s | 1.4s (273x faster) |
| Batch processing | 212.0s | 24.5s |
| Insert filtering | (included above) | 0.2s |
The coarse filter approach shows the biggest improvement:
- Original filter complexity: `Or(And(...), And(...), ...)` with millions of
nodes
- Optimized filter: `AlwaysTrue()` or simple `And(In(), In())`
## Incremental Adoption
If the anti-join change is concerning due to memory implications, the coarse
match filter optimization can be contributed separately as it provides the
majority of the performance benefit and doesn't change the memory
characteristics.
**Suggested PR split:**
1. **Coarse match filter for initial scan** (biggest win, minimal risk)
2. Vectorized row comparison in `get_rows_to_update()`
3. Anti-join for insert filtering
## Future Considerations
**Why Rust bindings weren't explored for this PR:**
In ticket #2159, it was suggested to side-step these performance issues by
using the Python bindings of the Rust implementation. However, we would like to
stick with a Python-centric implementation, because our use case requires
mocking datetime with `time-machine` for:
- Creating historical backfills with accurate snapshot timestamps
- Deterministically rerunning failed pipeline runs with the same timestamps
This is why I kept the implementation in pure Python rather than exploring
Rust bindings.
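For context, this is roughly the backfill pattern in question (`table` and
`historical_df` are placeholders for our own objects):
```python
import datetime as dt

import time_machine

# Re-run a historical load so that the committed snapshot carries the original event
# date's timestamp instead of "now". This only works while snapshot creation stays in Python.
with time_machine.travel(dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc)):
    table.upsert(historical_df)
```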
**Potential hybrid approach:**
The data processing (filtering, joins, comparisons) is where most of the
time is spent and could benefit from Rust bindings. However - and I'll be
selfish here - snapshot creation and metadata operations should remain in
Python to preserve the ability to mock time. Without this, our backfill and
replay workflows would break.
A future optimization could:
- Move scan filtering and row comparison to Rust for performance
- Keep snapshot/commit operations in Python for datetime mocking flexibility
I'd happily trade some performance for keeping our time-mocking capability
intact.
## Testing
Added comprehensive tests for:
- `create_coarse_match_filter()` behavior across different dataset sizes and
types
- Threshold boundary conditions (< 10k, = 10k, > 10k unique keys)
- Density calculations for range filter decisions
- `_compare_columns_vectorized()` with primitives, nulls, structs, nested
structs, and lists
- Edge cases: empty datasets, single values, negative numbers, composite keys
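An illustrative test shape for the sparse-key fallback (the import path and
signature are assumptions, not necessarily the PR's exact tests):
```python
import pyarrow as pa
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.table.upsert_util import create_coarse_match_filter  # assumed location


def test_sparse_large_key_set_falls_back_to_always_true() -> None:
    # 20k unique integer keys spread over a 2M range: above the 10k threshold, ~1% density.
    df = pa.table({"id": list(range(0, 20_000 * 100, 100))})
    assert create_coarse_match_filter(df, ["id"]) == AlwaysTrue()
```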
## Breaking Changes
None. The API remains unchanged; this is purely an internal optimization.
## Files Changed
- `pyiceberg/table/__init__.py` - upsert method optimizations
- `pyiceberg/table/upsert_util.py` - new `create_coarse_match_filter()`,
vectorized comparison functions
- `tests/table/test_upsert.py` - new tests for optimization functions
---
**Note:** This code was co-written with the help of an AI agent (Claude),
primarily to speed up exploration and understanding of the PyIceberg codebase.
All the speed-up ideas are mine. The benchmark results are from our real-world
production data that we actively use and store. I have reviewed all the
generated code. All related tests pass.