jaimeferj commented on issue #548:
URL: https://github.com/apache/iceberg-python/issues/548#issuecomment-3411533044
Hi there! I'm encountering a similar use case and would like to revive this
discussion.
## Use Case: Efficient Partition-Aware Joins in Polars
I have multiple large Iceberg tables that are all partitioned identically
using `bucket(user_id, N)`. I need to join these tables in Polars, but they
are too large to load into memory all at once.
**What I want to achieve:**
- Join the tables partition by partition to avoid loading all data into memory
- Leverage the fact that both tables share the same bucketing scheme on the
join key (`user_id`); the sketch below illustrates why this makes bucket-wise
joins safe
- Process each bucket independently, which should be far more memory-efficient
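For context on why bucket-wise joins are safe: Iceberg's bucket transform is
deterministic, so any given `user_id` hashes to the same bucket in every table
that uses `bucket(user_id, N)` with the same `N`. A minimal sketch using
PyIceberg's public transforms (assuming `user_id` is a long; `N = 8` here is
just an example):
```python
from pyiceberg.transforms import BucketTransform
from pyiceberg.types import LongType

# bucket(user_id, 8): the transform is a pure function of the value,
# so equal join keys always land in equal bucket ids across tables.
bucket8 = BucketTransform(num_buckets=8).transform(LongType())
assert bucket8(12345) == bucket8(12345)
print(bucket8(12345))  # some bucket id in [0, 8)
```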
**Current workaround:**
Based on @goalzz85's solution above, I can manually group data files by
partition:
```python
import polars as pl
from collections import defaultdict

# Group data files by their partition tuple
def get_files_by_partition(table):
    files_by_partition = defaultdict(list)
    for task in table.scan().plan_files():
        partition_key = tuple(task.file.partition) if task.file.partition else ()
        files_by_partition[partition_key].append(task.file.file_path)
    return files_by_partition

# Get partitioned files for both tables
files1_by_partition = get_files_by_partition(table1)
files2_by_partition = get_files_by_partition(table2)

# Process partition by partition, joining only matching buckets
results = []
for partition_key in files1_by_partition:
    if partition_key in files2_by_partition:
        lf1 = pl.scan_parquet(files1_by_partition[partition_key])
        lf2 = pl.scan_parquet(files2_by_partition[partition_key])
        results.append(lf1.join(lf2, on="user_id").collect())

final = pl.concat(results)
```
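A side note on memory: the loop above collects each per-bucket join eagerly,
so peak memory is roughly one bucket of data. If individual buckets are still
too large, the joins can stay lazy and be concatenated before a single
`collect()` so Polars can plan across buckets (a sketch; whether this actually
lowers peak memory depends on the Polars version and its streaming engine):
```python
# Sketch: keep every per-bucket join lazy and collect once at the end.
lazy_joins = [
    pl.scan_parquet(files1_by_partition[key]).join(
        pl.scan_parquet(files2_by_partition[key]), on="user_id"
    )
    for key in files1_by_partition
    if key in files2_by_partition
]
final = pl.concat(lazy_joins).collect()
```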
**The problem:**
This workaround requires manually extracting file paths and bypassing
PyIceberg's scan interface entirely (losing projection and delete-file
handling along the way). Native support for partition-aware scans would be
much cleaner.
## Feature Request
It would be extremely valuable to have one of the following:
1. **Expose partition values in row filters**: Allow filtering by hidden
partition columns, e.g.:
```python
table.scan(row_filter=PartitionEquals("user_id_bucket", 5))
```
2. **Add a method to scan specific partitions directly**:
```python
table.scan(partition_filter={"user_id_bucket": 5}).to_polars()
```
3. **Add an iterator over partitions**:
```python
for partition_value, scan in table.scan_partitions():
df = scan.to_polars()
# process partition
```
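For what it's worth, option 3 can be approximated in user land today on top of
`plan_files()`. A rough sketch of the helper I have in mind (the
`scan_partitions` name is hypothetical, and reading the Parquet files directly
inherits all the caveats of the workaround above):
```python
import polars as pl
from collections import defaultdict

def scan_partitions(table):
    """Hypothetical stand-in for option 3: yield one lazy Polars scan
    per partition value, built from the files plan_files() reports."""
    files_by_partition = defaultdict(list)
    for task in table.scan().plan_files():
        key = tuple(task.file.partition) if task.file.partition else ()
        files_by_partition[key].append(task.file.file_path)
    for key, paths in files_by_partition.items():
        yield key, pl.scan_parquet(paths)

# Usage mirroring the proposed API:
for partition_value, scan in scan_partitions(table1):
    df = scan.collect()
    # process one bucket at a time
```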
This would enable memory-efficient, partition-aware joins and other
partition-level processing patterns that are common in engines like Spark but
are currently difficult to implement efficiently with PyIceberg + Polars.
Would love to hear thoughts on this! Happy to contribute if there's interest
in adding this functionality.