mikedias opened a new pull request, #7865:
URL: https://github.com/apache/paimon/pull/7865

   ### Problem
   
   In partitioned Paimon tables, all partitions share the same bucket count 
defined at the table level. This becomes a bottleneck when data is highly 
skewed: a "hot" partition (e.g., a large tenant) may receive orders of 
magnitude more data than other partitions, yet it is forced to use the same 
number of buckets. The only workaround was to increase the number of buckets for 
the entire table, but that in turn creates too many buckets for the smaller 
partitions, leading to the small-file problem.
   
   ### Solution
   
   This PR introduces **per-partition bucket counts**, allowing individual 
partitions to be independently rescaled. Skewed partitions can be split into 
more buckets without affecting the rest of the table.
   
   The core idea is a new `PartitionBucketMapping` that maintains an explicit 
`partition → bucket count` map alongside a table-level default. Every component 
that needs to assign a bucket to a row (write selectors, key extractors) now 
consults this mapping rather than blindly using `schema().numBuckets()`. Each 
partition's bucket count is derived from the `totalBuckets` field already 
stamped on its data files in the manifest, so no schema migration is required.
   
   ### Changes
   
   #### Core (`paimon-core`)
   
   - **`PartitionBucketMapping`** _(new)_ — Serializable mapping of `BinaryRow 
partition → int bucketCount`, with a `loadFromTable` factory that scans the 
manifest to reconstruct the current per-partition layout and falls back to the 
schema default gracefully.
   - **`SchemaBucketFileStoreTable`** _(new)_ — A lightweight 
`DelegatedFileStoreTable` wrapper used during rescale/overwrite operations. It 
forces all writes to use the new target bucket count (ignoring the 
per-partition map), ensuring the overwrite lands in the right buckets.
   - **`FixedBucketRowKeyExtractor` / `FixedBucketWriteSelector`** — Updated to 
accept a `PartitionBucketMapping` and call `resolveNumBuckets(partition)` per 
row instead of using a fixed global count.
   - **`WriteRestore` / `FileSystemWriteRestore`** — Extended with 
`extractTotalBuckets` logic that correctly handles three cases: non-empty 
buckets (use the value from existing data files), empty buckets on partitioned 
tables (look up the per-partition override), and empty buckets on unpartitioned 
tables (fall back to schema default so the committer-side mismatch check still 
fires).
   - **`PartitionEntry`** — Minor fix for correct behaviour in non-partitioned 
table corner cases.
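
The core pieces above can be modelled together in a small, hypothetical sketch. It is not the PR's code, and the following details are all illustrative assumptions:

- Manifest entries are reduced to an invented `LiveFile(partition, totalBuckets)` class.
- Partitions are strings.
- An empty `Optional` stands for "this bucket has no files".
- All live files of one partition are assumed to carry the same `totalBuckets`; the sketch throws otherwise.

`resolveNumBuckets` with a `rescaleTarget` mimics the role described for `SchemaBucketFileStoreTable`, where the target count wins for every partition. `checkUnpartitionedCommit` is an invented stand-in for the committer-side mismatch check.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical manifest entry reduced to the two fields this sketch needs.
final class LiveFile {
    final String partition;
    final int totalBuckets;

    LiveFile(String partition, int totalBuckets) {
        this.partition = partition;
        this.totalBuckets = totalBuckets;
    }
}

final class BucketLayoutSketch {
    // In the spirit of loadFromTable: rebuild partition -> bucket count from the
    // totalBuckets stamped on live files. Assumes one value per partition.
    static Map<String, Integer> loadFromEntries(List<LiveFile> liveFiles) {
        Map<String, Integer> layout = new HashMap<>();
        for (LiveFile file : liveFiles) {
            Integer previous = layout.putIfAbsent(file.partition, file.totalBuckets);
            if (previous != null && previous != file.totalBuckets) {
                throw new IllegalStateException(
                        "conflicting totalBuckets in partition " + file.partition);
            }
        }
        return layout;
    }

    // Normal writes consult the layout; during a rescale overwrite the target wins.
    static int resolveNumBuckets(
            Map<String, Integer> layout, int schemaNumBuckets,
            OptionalInt rescaleTarget, String partition) {
        if (rescaleTarget.isPresent()) {
            return rescaleTarget.getAsInt();
        }
        return layout.getOrDefault(partition, schemaNumBuckets);
    }

    // The three extractTotalBuckets cases described above.
    static int extractTotalBuckets(
            Optional<Integer> filesTotalBuckets, boolean partitioned,
            String partition, Map<String, Integer> layout, int schemaNumBuckets) {
        if (filesTotalBuckets.isPresent()) {
            return filesTotalBuckets.get();                          // non-empty bucket
        }
        if (partitioned) {
            return layout.getOrDefault(partition, schemaNumBuckets); // empty, partitioned
        }
        return schemaNumBuckets;                                     // empty, unpartitioned
    }

    // Hypothetical committer-side guard for unpartitioned tables.
    static void checkUnpartitionedCommit(int existingTotalBuckets, int writtenTotalBuckets) {
        if (existingTotalBuckets != writtenTotalBuckets) {
            throw new RuntimeException("bucket count mismatch: table has "
                    + existingTotalBuckets + " buckets, writer used " + writtenTotalBuckets);
        }
    }
}

class BucketLayoutDemo {
    public static void main(String[] args) {
        Map<String, Integer> layout = BucketLayoutSketch.loadFromEntries(List.of(
                new LiveFile("tenant_id=123", 32),
                new LiveFile("tenant_id=123", 32),
                new LiveFile("tenant_id=7", 4)));
        System.out.println(layout.get("tenant_id=123")); // 32
        System.out.println(BucketLayoutSketch.extractTotalBuckets(
                Optional.empty(), true, "tenant_id=123", layout, 4)); // 32
        System.out.println(BucketLayoutSketch.resolveNumBuckets(
                layout, 4, OptionalInt.of(64), "tenant_id=7")); // 64
    }
}
```

Consider an unpartitioned table whose bucket option was changed from 4 to 8 without a full rescale. In this model, an empty bucket resolves to the schema default of 8 while the existing files say 4, so `checkUnpartitionedCommit(4, 8)` throws. Falling back to the schema default is what keeps that guard effective.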
   
   #### Flink (`paimon-flink`)
   
   - **`FlinkSinkBuilder`** — Wires `PartitionBucketMapping` into the streaming 
sink pipeline so that per-partition bucket routing is applied at ingest time.
   - **`RescaleAction` / `CompactAction`** — Use `RescaleFileStoreTable` when 
performing rescale/overwrite so the new bucket count is applied only to the 
target partitions.
   - **`RowDataChannelComputer`** — Updated to route rows to the correct 
sub-task using the per-partition bucket count.
   - **`TableWriteCoordinator` / `PostponeFixedBucketChannelComputer`** — Fixed 
to handle the "empty bucket" scenario that can arise in write-restore flows 
when a partition exists in the mapping but has no files yet.
   - **`RowDataKeyAndBucketExtractor`** (deleted) — Test helper class removed 
in favour of using the superclass types directly.
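
Per-partition routing can be sketched as two steps. First, hash the key into a bucket using that partition's own bucket count. Second, map `(partition, bucket)` to a sink sub-task. The channel formula below is our assumption of a typical scheme, not necessarily what `RowDataChannelComputer` does: a partition-dependent start channel plus the bucket, modulo parallelism. `ChannelRoutingSketch` and its methods are hypothetical, with strings standing in for `BinaryRow` partitions.

```java
import java.util.Map;

// Hypothetical router; not the actual RowDataChannelComputer implementation.
final class ChannelRoutingSketch {
    private final Map<String, Integer> bucketsPerPartition;
    private final int defaultNumBuckets;
    private final int numChannels;

    ChannelRoutingSketch(Map<String, Integer> bucketsPerPartition,
                         int defaultNumBuckets, int numChannels) {
        if (defaultNumBuckets <= 0 || numChannels <= 0) {
            throw new IllegalArgumentException("bucket and channel counts must be positive");
        }
        this.bucketsPerPartition = Map.copyOf(bucketsPerPartition);
        this.defaultNumBuckets = defaultNumBuckets;
        this.numChannels = numChannels;
    }

    // Step 1: bucket within the partition, using that partition's own bucket count.
    int bucket(String partition, int keyHash) {
        int numBuckets = bucketsPerPartition.getOrDefault(partition, defaultNumBuckets);
        return Math.abs(keyHash % numBuckets);
    }

    // Step 2 (assumed scheme): offset by a partition-dependent start channel so that
    // bucket 0 of every partition does not pile onto the same sub-task.
    int channel(String partition, int keyHash) {
        int startChannel = Math.abs(partition.hashCode() % numChannels);
        return (startChannel + bucket(partition, keyHash)) % numChannels;
    }
}

class ChannelRoutingDemo {
    public static void main(String[] args) {
        ChannelRoutingSketch router =
                new ChannelRoutingSketch(Map.of("tenant_id=123", 32), 4, 4);
        for (int keyHash = 0; keyHash < 6; keyHash++) {
            System.out.println("tenant_id=123 key " + keyHash + " -> bucket "
                    + router.bucket("tenant_id=123", keyHash) + ", channel "
                    + router.channel("tenant_id=123", keyHash));
        }
    }
}
```

The property this sketch is meant to highlight is that step 1 uses the same per-partition count as the writer. If routing used a single global count, rows of a rescaled partition could presumably be sent to a sub-task other than the one writing their bucket.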
   
   ### Behaviour
   
   - **Partitioned tables**: each partition retains its own bucket count from 
its data files. New partitions use the current table-level default. Existing 
partitions are unaffected until explicitly rescaled.
   - **Unpartitioned tables**: behaviour is unchanged — a full rescale is still 
required before writing with a new bucket count, and a `RuntimeException` is 
thrown if this is violated.
   - **Rescaling a single partition**: use the `rescale` procedure or a manual 
`INSERT OVERWRITE` in batch mode:
     ```sql
     CALL sys.rescale(`table` => 'mydb.orders', `bucket_num` => 32, `partition` => 'tenant_id=123');
     ```
     After the job completes, the rescaled partition uses 32 buckets while all 
other partitions are untouched.
   
   ### Testing
   
   We have been soaking this change in our test environments and are seeing 
good results. We have also added a number of new tests to validate that nothing 
is broken:
   
   - `PartitionBucketMappingTest` — unit tests for mapping resolution and 
`loadFromTable`.
   - `FixedBucketRowKeyExtractorTest` — verifies correct bucket assignment 
with heterogeneous per-partition counts.
   - `FileStoreCommitTest` — integration tests covering rescale commits with 
mixed bucket counts.
   - `FileSystemWriteRestoreTest` — covers the empty-bucket write-restore 
scenario end-to-end, including the non-partitioned corner case.
   - `RescaleBucketITCase` — end-to-end Flink integration tests for 
`INSERT OVERWRITE`-based rescale and streaming restore after rescale.
   - `RescaleActionITCase` — end-to-end tests for the rescale procedure action 
with per-partition targeting.
   - `TableWriteCoordinatorTest` — unit tests for coordinator behaviour under 
the new mapping.

