toutane opened a new pull request, #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298
## Which issue does this PR close?
- Closes: #2220
- Related:
  - #1604 (EPIC: sub-file parallelism, long-term direction)
  - #128 (size-based planning)
## What changes are included in this PR?
### Approach
Rather than introducing new types (`IcebergPartitionedScan`,
`IcebergPartitionedTableProvider` as originally proposed), this PR extends the
existing `IcebergTableProvider` / `IcebergTableScan` with an eager mode where
file scan tasks are planned at `scan()` time and distributed into buckets, one
bucket per DataFusion partition.
The main motivation is to let DataFusion schedule file reads concurrently.
Previously all files streamed through a single partition
(`UnknownPartitioning(1)`); now `IcebergTableProvider::scan` distributes tasks
across `min(target_partitions, n_files)` partitions (at least one, so an empty
table still yields a single empty partition) and declares `Partitioning::Hash`
when the data is identity-partitioned.
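The partition count can be sketched in a few lines of dependency-free Rust. This is an illustration, not the PR's code: `bucket_count` and `round_robin` are hypothetical names, the floor of one bucket follows from the empty-table unit test described in this PR, and spreading tasks round-robin when no hash layout applies is an assumption of the sketch.

```rust
/// Number of buckets (one per DataFusion partition) for a scan: one per
/// file, capped at `target_partitions`, never fewer than one, so that
/// `execute(0)` is valid even for an empty table.
fn bucket_count(target_partitions: usize, n_files: usize) -> usize {
    target_partitions.min(n_files).max(1)
}

/// Spreads planned tasks across buckets round-robin. The round-robin
/// strategy is an assumption of this sketch, not taken from the PR.
fn round_robin<T>(tasks: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    let n = bucket_count(target_partitions, tasks.len());
    let mut buckets: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        buckets[i % n].push(task);
    }
    buckets
}

fn main() {
    // 10 files with 4 target partitions: four buckets.
    let sizes: Vec<usize> = round_robin((0..10).collect::<Vec<u32>>(), 4)
        .iter()
        .map(Vec::len)
        .collect();
    println!("{:?}", sizes); // [3, 3, 2, 2]

    // Fewer files than target partitions: capped at the file count.
    println!("{}", bucket_count(16, 3)); // 3

    // Empty table: still one (empty) bucket.
    println!("{:?}", round_robin(Vec::<u32>::new(), 8)); // [[]]
}
```

With `target_partitions = 1` every task lands in the single bucket, which reproduces the original single-partition behavior.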
### Key changes
- **`TableScan::to_arrow_from_tasks`** - New public method on `TableScan`
that accepts a pre-collected `FileScanTaskStream` instead of calling
`plan_files()` internally. This is the hook used by
`IcebergTableScan::execute(i)` to replay each bucket through the Arrow reader
while preserving all reader-side configuration (concurrency limit, row-group
filtering, batch size). Tasks must come from a `TableScan` with the same
projection and filters as `self`: predicates are baked into each task at
planning time, and the reader applies each task's stored predicate rather than
re-deriving it from `self`. The doc comment makes this contract explicit.
- **`IcebergTableScan` is now `pub`** - Previously `pub(crate)`. Made public
so that downstream integrations that need to inspect or wrap the physical plan
can do so without going through the table provider.
- **`with_new_children` now returns an error** - `IcebergTableScan` is a
leaf node and does not support children. Previously the implementation silently
dropped any children passed to it; it now returns `DataFusionError::Internal`
when `children` is non-empty, matching the contract of `IcebergCommitExec`.
- **Eager task planning in `IcebergTableProvider::scan`** - `plan_files()`
is now called at planning time (inside `TableProvider::scan`) rather than at
execution time (inside `ExecutionPlan::execute`). The collected tasks are
distributed into `min(target_partitions, n_files)` buckets by
`bucketing::bucket_tasks` and stored in the scan. Each `execute(i)` call then
fetches its pre-assigned bucket and streams it through `to_arrow_from_tasks` -
no redundant metadata reads per partition.
- **`bucketing` module** - Handles bucket assignment and `Partitioning`
declaration. For tables with a single partition spec using only identity
transforms, tasks are hashed on their partition values using DataFusion's
`create_hashes` + `REPARTITION_RANDOM_STATE`, and the scan declares
`Partitioning::Hash`. This lets DataFusion recognize that the output is already
hash-partitioned and skip a downstream `RepartitionExec`. Non-identity
transforms (`bucket`, `truncate`, `year`/`month`/`day`/`hour`) are lossy: the
partition value in task metadata does not match what DataFusion would compute
by hashing the actual column values, so those cases fall back to
`UnknownPartitioning`. If any task cannot be fully hashed with the identity
key (unsupported literal type, null partition value), the whole scan also
falls back, since the declared partitioning must hold for every bucket.
_Credit: This bucketing solution was proposed by @timsaucer._
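A minimal, std-only sketch of the bucketing decision described above. Everything here is hypothetical (`Transform`, `Partitioning`, `bucket_tasks`, and partition values modeled as `Option<i64>`), and it swaps DataFusion's `create_hashes` + `REPARTITION_RANDOM_STATE` for `std`'s `DefaultHasher`, so its bucket indices would not line up with a real `RepartitionExec`. It only shows the shape of the logic: hash identity partition values modulo the bucket count, and fall back to `UnknownPartitioning` for multiple specs, non-identity transforms, projected-away columns, or null values.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical subset of Iceberg partition transforms.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Transform { Identity, Bucket(u32), Truncate(u32), Year, Month, Day, Hour }

/// Local stand-in for DataFusion's `Partitioning` declaration.
#[derive(Debug, PartialEq, Eq)]
enum Partitioning {
    Hash { columns: Vec<String>, partitions: usize },
    Unknown(usize),
}

/// Bucket index per task, or `None` if any task has a null partition value.
fn identity_assignment(task_values: &[Vec<Option<i64>>], n: usize) -> Option<Vec<usize>> {
    task_values
        .iter()
        .map(|values| {
            if values.iter().any(Option::is_none) {
                return None;
            }
            // Stand-in hash: a real implementation must use the same hash
            // as `RepartitionExec` for the declaration to be meaningful.
            let mut hasher = DefaultHasher::new();
            values.hash(&mut hasher);
            Some((hasher.finish() % n as u64) as usize)
        })
        .collect()
}

/// Assigns task indices to buckets and picks the partitioning to declare.
/// `task_values[t]` holds task `t`'s values, one per partition field.
fn bucket_tasks(
    spec_count: usize,
    fields: &[(&str, Transform)],
    projected: &[&str],
    task_values: &[Vec<Option<i64>>],
    target_partitions: usize,
) -> (Partitioning, Vec<Vec<usize>>) {
    let n = target_partitions.min(task_values.len()).max(1);
    let mut buckets: Vec<Vec<usize>> = vec![Vec::new(); n];

    // Only a single spec of identity transforms whose source columns
    // survive the projection can be declared as hash-partitioned.
    let identity_only = spec_count == 1
        && !fields.is_empty()
        && fields
            .iter()
            .all(|(col, t)| *t == Transform::Identity && projected.contains(col));

    if identity_only {
        if let Some(assignment) = identity_assignment(task_values, n) {
            for (task, bucket) in assignment.into_iter().enumerate() {
                buckets[bucket].push(task);
            }
            let columns = fields.iter().map(|(c, _)| c.to_string()).collect();
            return (Partitioning::Hash { columns, partitions: n }, buckets);
        }
    }

    // Fallback: spread tasks round-robin and promise nothing about layout.
    for task in 0..task_values.len() {
        buckets[task % n].push(task);
    }
    (Partitioning::Unknown(n), buckets)
}

fn main() {
    let fields = [("region", Transform::Identity)];
    let tasks = vec![vec![Some(1)], vec![Some(2)], vec![Some(1)], vec![Some(3)]];
    let (p, buckets) = bucket_tasks(1, &fields, &["region", "amount"], &tasks, 2);
    println!("{:?} {:?}", p, buckets);
}
```

Falling back for the whole scan on a single unhashable task is deliberately conservative: `Partitioning::Hash` is a promise about every bucket, so one task placed by a different rule would break it.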
### Design choices - planning at `scan()` time vs. at `execute()` time
Planning eagerly at `scan()` time is a deliberate trade-off:
- **Pro:** Tasks are computed once and shared across all partitions; the
plan is reproducible; `execute(i)` is pure I/O with no catalog round-trips.
- **Con:** `TableProvider::scan` now does network I/O (catalog + metadata
reads), which is unusual for a planning-phase method. An alternative design,
planning lazily at execute time, would keep `scan()` cheap but would require a
redundant `plan_files()` call per partition. A future extension could expose
lazy planning as an option for use cases where snapshot staleness matters more
than plan reproducibility.
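To make the trade-off concrete, here is a simplified model of the eager leaf node, with `std` types standing in for DataFusion's (`BucketedScan`, `Task`, and `PlanError` are hypothetical). It shows two properties discussed in this PR: `execute(i)` only looks up a pre-assigned bucket, and `with_new_children` rejects a non-empty child list instead of silently dropping it.

```rust
/// Hypothetical stand-in for a planned `FileScanTask`.
#[derive(Debug)]
struct Task {
    path: String,
}

/// Local stand-in for `DataFusionError::Internal`.
#[derive(Debug)]
enum PlanError {
    Internal(String),
}

/// Simplified eager leaf scan: tasks were planned once, at `scan()` time,
/// and each output partition owns exactly one bucket.
struct BucketedScan {
    buckets: Vec<Vec<Task>>,
}

impl BucketedScan {
    fn output_partitions(&self) -> usize {
        self.buckets.len()
    }

    /// Only looks up the pre-assigned bucket: no catalog or metadata reads.
    /// The real node would stream these tasks through the Arrow reader.
    fn execute(&self, partition: usize) -> Result<&[Task], PlanError> {
        self.buckets
            .get(partition)
            .map(|bucket| bucket.as_slice())
            .ok_or_else(|| {
                PlanError::Internal(format!(
                    "partition {partition} out of range ({} buckets)",
                    self.buckets.len()
                ))
            })
    }

    /// A leaf node has no children: accept an empty list, reject the rest.
    fn with_new_children(self, children: Vec<BucketedScan>) -> Result<Self, PlanError> {
        if children.is_empty() {
            Ok(self)
        } else {
            Err(PlanError::Internal(format!(
                "BucketedScan is a leaf node but got {} children",
                children.len()
            )))
        }
    }
}

fn main() {
    let scan = BucketedScan {
        buckets: vec![
            vec![Task { path: "f0.parquet".into() }, Task { path: "f2.parquet".into() }],
            vec![Task { path: "f1.parquet".into() }],
        ],
    };
    for i in 0..scan.output_partitions() {
        let paths: Vec<&str> = scan.execute(i).unwrap().iter().map(|t| t.path.as_str()).collect();
        println!("partition {i}: {paths:?}");
    }
    println!("{:?}", scan.with_new_children(vec![BucketedScan { buckets: vec![] }]).err());
}
```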
### Known limitations
- **Limited type support for `Partitioning::Hash`** - `literal_to_array`
supports seven Arrow types (`Bool`, `Int32`, `Int64`, `Float32`,
`Float64`, `Utf8`, `Date32`). Timestamps, `Decimal128`, `LargeUtf8`, etc. are
not yet covered; any unsupported type forces fallback to `UnknownPartitioning`.
- **Spec evolution disables `Partitioning::Hash`** - If the table has more
than one historical partition spec, the bucketing module conservatively returns
`UnknownPartitioning` to avoid mismatches between old and new partition tuple
layouts.
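Both limitations reduce to a pure eligibility check, sketched below. `Literal`, `ArrowType`, `arrow_type_for`, and `can_declare_hash` are hypothetical stand-ins, not the signature of `literal_to_array`; the point is that a single unsupported value anywhere, or more than one partition spec, forces `UnknownPartitioning`.

```rust
/// Hypothetical subset of Iceberg partition literals.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Literal {
    Boolean(bool),
    Int(i32),
    Long(i64),
    Float(f32),
    Double(f64),
    String(String),
    Date(i32),
    Timestamp(i64),
    Decimal(i128),
}

/// The seven Arrow types listed above, as a local enum.
#[derive(Debug, PartialEq)]
enum ArrowType { Bool, Int32, Int64, Float32, Float64, Utf8, Date32 }

/// `Some` if the literal can be turned into a one-element array for hashing.
fn arrow_type_for(lit: &Literal) -> Option<ArrowType> {
    match lit {
        Literal::Boolean(_) => Some(ArrowType::Bool),
        Literal::Int(_) => Some(ArrowType::Int32),
        Literal::Long(_) => Some(ArrowType::Int64),
        Literal::Float(_) => Some(ArrowType::Float32),
        Literal::Double(_) => Some(ArrowType::Float64),
        Literal::String(_) => Some(ArrowType::Utf8),
        Literal::Date(_) => Some(ArrowType::Date32),
        // Not yet covered: forces the fallback.
        Literal::Timestamp(_) | Literal::Decimal(_) => None,
    }
}

/// Hash partitioning is declared only for a single spec and only when every
/// partition value of every task has a supported type.
fn can_declare_hash(spec_count: usize, task_values: &[Vec<Literal>]) -> bool {
    spec_count == 1
        && task_values.iter().flatten().all(|lit| arrow_type_for(lit).is_some())
}

fn main() {
    let ok = vec![vec![Literal::Int(1), Literal::String("eu".to_string())]];
    let ts = vec![vec![Literal::Timestamp(1_700_000_000_000_000)]];
    println!("{}", can_declare_hash(1, &ok)); // true
    println!("{}", can_declare_hash(1, &ts)); // false: unsupported type
    println!("{}", can_declare_hash(2, &ok)); // false: spec evolution
}
```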
## Are these changes tested?
**Unit tests** in `table/mod.rs` covering the new bucketed scan path:
- `test_empty_table_single_empty_bucket` - Empty table produces one empty
bucket, guarding against out-of-bounds panic on `execute(0)`.
- `test_unpartitioned_falls_back_to_unknown` - Unpartitioned table declares
`UnknownPartitioning`.
- `test_bucket_count_capped_at_file_count` - When `target_partitions >
n_files`, bucket count is capped at `n_files`.
- `test_single_target_partition_single_bucket` - `target_partitions=1`
produces a single bucket regardless of file count, reproducing the original
single-threaded behavior.
- `test_identity_partitioned_declares_hash` - Identity-partitioned table
declares `Partitioning::Hash` referencing the partition column.
- `test_projection_without_partition_col_falls_back_to_unknown` - Projecting
away an identity column falls back to `UnknownPartitioning`.
Additional `IcebergTableProvider` tests cover limit pushdown, insert behavior,
and schema consistency, guarding against regressions in existing
functionality.
**SQL logic tests** - `EXPLAIN` snapshots are updated to reflect the new
`buckets:[N] file_count:[M]` display format and the correct `input_partitions`
counts.
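For reading the updated snapshots, a sketch of how the `buckets:[N] file_count:[M]` fragment could be derived from the bucket layout: N is the number of buckets and M the total number of files across them. `BucketLayout` is hypothetical, counting one task per file is an assumption, and any other fields the real display prints are omitted.

```rust
use std::fmt;

/// Hypothetical, simplified view of the scan's bucket layout.
struct BucketLayout {
    buckets: Vec<Vec<String>>, // file paths per bucket
}

impl fmt::Display for BucketLayout {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let file_count: usize = self.buckets.iter().map(Vec::len).sum();
        write!(f, "buckets:[{}] file_count:[{}]", self.buckets.len(), file_count)
    }
}

fn main() {
    let layout = BucketLayout {
        buckets: vec![
            vec!["a.parquet".to_string(), "c.parquet".to_string()],
            vec!["b.parquet".to_string()],
        ],
    };
    println!("{layout}"); // buckets:[2] file_count:[3]
}
```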
**Production validation** - We plan to test these changes in our
infrastructure by shadowing real-world queries.
## Follow-up work
- **Redundant `FilterExec`** - @timsaucer reports that
`supports_filters_pushdown` returns `Inexact` for all filters, causing
DataFusion to insert a `FilterExec` above `IcebergTableScan` even though the
Arrow reader already applies the predicate via `ArrowPredicateFn`. Returning
`Exact` for losslessly converted filters would eliminate this redundant
re-evaluation.
He proposed a solution in earlier commits, but those have been reverted
as they are out of scope for this PR. This issue is tracked in #2363.
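A sketch of the proposed classification, under stated assumptions: `FilterPushDown` redefines the three variants of DataFusion's `TableProviderFilterPushDown` locally so the example has no dependencies; `Expr` and `conversion` are hypothetical; the rule that an `AND` keeps whichever side converts, and mapping wholly unconvertible filters to `Unsupported`, are choices of the sketch. Returning `Exact` is only sound if the reader is guaranteed to evaluate the predicate on every row, since DataFusion then drops its own `FilterExec`.

```rust
/// Local mirror of DataFusion's `TableProviderFilterPushDown` variants.
#[derive(Debug, PartialEq)]
enum FilterPushDown {
    Unsupported,
    Inexact,
    Exact,
}

/// Hypothetical, heavily simplified filter expression.
#[allow(dead_code)]
enum Expr {
    Eq(String, i64),           // column = literal: converts losslessly
    And(Box<Expr>, Box<Expr>), // conjunction
    Opaque(String),            // e.g. a UDF with no Iceberg equivalent
}

/// How faithfully an expression maps to an Iceberg predicate.
enum Conversion {
    Lossless,
    Lossy,
    Impossible,
}

fn conversion(expr: &Expr) -> Conversion {
    match expr {
        Expr::Eq(_, _) => Conversion::Lossless,
        Expr::Opaque(_) => Conversion::Impossible,
        Expr::And(a, b) => match (conversion(a), conversion(b)) {
            (Conversion::Lossless, Conversion::Lossless) => Conversion::Lossless,
            (Conversion::Impossible, Conversion::Impossible) => Conversion::Impossible,
            // Assumed: AND keeps its convertible side as a weaker predicate.
            _ => Conversion::Lossy,
        },
    }
}

fn pushdown(expr: &Expr) -> FilterPushDown {
    match conversion(expr) {
        // The reader enforces it row by row: no FilterExec needed.
        Conversion::Lossless => FilterPushDown::Exact,
        // Useful for pruning, but DataFusion must still re-check rows.
        Conversion::Lossy => FilterPushDown::Inexact,
        Conversion::Impossible => FilterPushDown::Unsupported,
    }
}

fn main() {
    let eq = Expr::Eq("region".to_string(), 1);
    let udf = Expr::Opaque("my_udf(x)".to_string());
    println!("{:?}", pushdown(&eq)); // Exact
    println!("{:?}", pushdown(&Expr::And(Box::new(eq), Box::new(udf)))); // Inexact
}
```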
## Note
`IcebergStaticTableProvider` is unchanged - it still uses
`IcebergTableScan::new` (lazy, single-partition). Static snapshots do not
benefit from eager planning because the task list is fixed by construction.
--
This is an automated message from the Apache Git Service.