TheR1sing3un opened a new pull request, #7813:
URL: https://github.com/apache/paimon/pull/7813
## Purpose
When `write_paimon` is given a Ray Dataset, Ray's default round-robin
block distribution scatters rows that share the same `(partition,
bucket)` across many Ray tasks. Each task opens its own writer and emits
its own data file, so the write can produce up to
`partitions × buckets × ray_tasks` files instead of the
`partitions × buckets` the writer would naturally produce.
Spark and Flink already cluster rows by `(partition, bucket)` before
writing — see `PaimonSparkWriter.repartitionByPartitionsAndBucket` and
the `RowAssignerChannelComputer` / `RowWithBucketChannelComputer` chain.
This PR brings the same pre-clustering to the Ray path.
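
To make the file-count arithmetic above concrete, here is a minimal pure-Python simulation. It does not use Ray or pypaimon; the sizes, the `count_files` helper, and the interleaved row layout are illustrative assumptions, chosen so that round-robin distribution reaches the worst case.

```python
from itertools import product

PARTITIONS, BUCKETS, TASKS = 3, 4, 5   # hypothetical sizes
ROWS_PER_KEY = 10                      # hypothetical rows per (partition, bucket)

# Interleave the keys so consecutive rows belong to different
# (partition, bucket) pairs, as they might in an unsorted input.
keys = list(product(range(PARTITIONS), range(BUCKETS)))
rows = [key for _ in range(ROWS_PER_KEY) for key in keys]


def count_files(assign_task):
    """Count distinct (task, (partition, bucket)) pairs.

    Each pair stands for one writer, and therefore one data file.
    """
    return len({(assign_task(i, key), key) for i, key in enumerate(rows)})


# Round-robin: row i goes to task i % TASKS, regardless of its key.
round_robin_files = count_files(lambda i, key: i % TASKS)

# Pre-clustered: the task depends only on the key, so every key has one writer.
clustered_files = count_files(lambda i, key: (key[0] * BUCKETS + key[1]) % TASKS)

print(round_robin_files, clustered_files)  # 60 12
```

With 3 partitions, 4 buckets and 5 tasks, the scattered layout yields 3 × 4 × 5 = 60 files, while the clustered layout yields 3 × 4 = 12.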
## Linked Issue
N/A. Kwai-internal change being upstreamed.
## Effect
Two new keyword-only parameters on `write_paimon`:
- **`shuffle: bool = False`** — for HASH_FIXED tables, group rows by
`(partition_keys..., bucket)` via Ray's `groupby` / `map_groups` so
each `(partition, bucket)` lands in one Ray task. Bucket assignment
is computed with `FixedBucketRowKeyExtractor`, the same extractor the
writer uses, so the shuffle-time bucket is byte-equivalent to the
writer's. Non-HASH_FIXED tables log a warning and write as before.
- **`num_blocks: Optional[int] = None`** — optional Ray output block
count. With `shuffle=True` it is a parallelism hint for the groupby;
with `shuffle=False` it triggers a plain Ray block rebalance.
Defaults preserve the previous behaviour, so no existing caller is
affected.
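
The way the two parameters interact can be summarised as a small decision function. This is only a sketch of the behaviour described above, not the code in this PR: `resolve_write_plan`, the returned labels, and the string form of the bucket mode are hypothetical. Two points are assumptions rather than statements from this description: negative `num_blocks` values are rejected along with `0`, and `num_blocks` is ignored on the soft-fallback path.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def resolve_write_plan(bucket_mode: str, *, shuffle: bool = False,
                       num_blocks: Optional[int] = None) -> str:
    """Return a label for the pre-write step (hypothetical helper)."""
    # The description states that num_blocks=0 raises ValueError;
    # rejecting negative values too is an assumption of this sketch.
    if num_blocks is not None and num_blocks <= 0:
        raise ValueError(f"num_blocks must be a positive integer, got {num_blocks}")

    if shuffle:
        if bucket_mode == "HASH_FIXED":
            # Group rows by (partition_keys..., bucket); num_blocks is only a hint.
            return "groupby"
        # Soft fallback: warn and write as before.
        # Assumption: num_blocks is not applied on this path.
        logger.warning("shuffle=True is ignored for bucket mode %s", bucket_mode)
        return "passthrough"

    if num_blocks is not None:
        # No shuffle requested: plain block rebalance.
        return "rebalance"

    # Defaults: previous behaviour, no extra step.
    return "passthrough"
```

For example, `resolve_write_plan("HASH_FIXED", shuffle=True, num_blocks=8)` returns `"groupby"`, and calling it with only the bucket mode returns `"passthrough"`.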
## Tests
- `pypaimon/tests/test_ray_shuffle_helper.py` — 8 unit tests covering
the bucket-key UDF (column type, empty input, multi-chunk combine)
and every no-op / soft-fallback branch.
- `pypaimon/tests/ray_repartition_test.py` — 7 end-to-end tests:
  - default `shuffle=False` roundtrip equality
  - `shuffle=True` roundtrip on a HASH_FIXED PK table
  - `shuffle=True` on a partitioned HASH_FIXED PK table (post-groupby
    schema integrity check)
  - file-count reduction on a multi-block HASH_FIXED write
  - soft fallback for BUCKET_UNAWARE + warning emitted
  - `num_blocks=0` raises `ValueError`
  - `num_blocks`-only plain block rebalance
Existing Ray tests (`ray_integration_test.py`, `ray_data_test.py`)
remain green.
## API & Format Impact
- Public API: adds two new keyword-only parameters with safe defaults
to `pypaimon.ray.write_paimon`. No signature break.
- File format: unchanged. The transient `__paimon_bucket__` column is
stripped before the sink sees the dataset, so on-disk layout is
unaffected.
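
The tag, group, strip sequence behind the transient column can be illustrated on plain Python rows. This is a sketch under stated assumptions, not the PR's implementation: `toy_bucket` is a stand-in hash that does not match Paimon's bucket assignment, `cluster_and_strip` is a hypothetical helper, and rows are modelled as dicts rather than Ray blocks.

```python
from collections import defaultdict

BUCKET_COL = "__paimon_bucket__"  # transient column name taken from this PR


def toy_bucket(row, bucket_keys, num_buckets):
    """Stand-in bucket function (NOT Paimon's hash), deterministic across runs."""
    return sum(ord(c) for k in bucket_keys for c in str(row[k])) % num_buckets


def cluster_and_strip(rows, partition_keys, bucket_keys, num_buckets):
    """Group rows by (partition_keys..., bucket), then drop the helper column."""
    groups = defaultdict(list)
    for row in rows:
        # Tag a copy of the row with its bucket; the input row is left untouched.
        tagged = {**row, BUCKET_COL: toy_bucket(row, bucket_keys, num_buckets)}
        key = tuple(tagged[k] for k in partition_keys) + (tagged[BUCKET_COL],)
        groups[key].append(tagged)
    # Strip the transient column so downstream consumers see the original schema.
    return {
        key: [{c: v for c, v in r.items() if c != BUCKET_COL} for r in group]
        for key, group in groups.items()
    }


rows = [{"dt": "2024-01-01", "id": i} for i in range(6)] + \
       [{"dt": "2024-01-02", "id": i} for i in range(6)]
clustered = cluster_and_strip(rows, partition_keys=["dt"], bucket_keys=["id"], num_buckets=2)
```

Every group in `clustered` holds rows for exactly one `(dt, bucket)` pair, and each row carries only the original `dt` and `id` columns.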
## Documentation
- `docs/content/pypaimon/ray-data.md` — new section explaining the
small-file problem and the `shuffle` / `num_blocks` options.
## Generative AI Disclosure
Drafted with Claude Code assistance, reviewed and tested by the author.