Below0 opened a new issue, #15731:
URL: https://github.com/apache/iceberg/issues/15731

   ### Apache Iceberg version
   
   1.10.1 (latest release)
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   `HashKeyGenerator.SelectorKey` does not include `writeParallelism` or 
`distributionMode` in its cache key, so runtime changes to these fields are 
silently ignored after the first record for a given table is processed.
   
   **Root cause**
   
   `SelectorKey` fields (`HashKeyGenerator.java:317-341`):
   
   ```java
   tableName, branch, schemaId, specId, schema, spec, equalityFields
   // writeParallelism — MISSING
   // distributionMode — MISSING
   ```
   
   `computeIfAbsent` at line 92 creates a new `KeySelector` only on a cache 
miss. Because `writeParallelism` is not part of the key, subsequent records 
with a different `writeParallelism` for the same table hit the cache and reuse 
the stale selector.
   
   The `TargetLimitedKeySelector.distinctKeys` array is fixed at construction 
time (`new int[writeParallelism]`), so the old parallelism value is baked in 
permanently.
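   
   The caching behavior described above can be reproduced in isolation. The 
following is a minimal sketch, not the Iceberg code: `StaleSelectorDemo`, 
`TableOnlyKey`, `FixedSizeSelector`, and `selectorWidth` are hypothetical 
stand-ins for `HashKeyGenerator`, `SelectorKey`, and `TargetLimitedKeySelector`, 
and the key is reduced to two fields for brevity.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Objects;

   public class StaleSelectorDemo {

     // Hypothetical cache key: identifies the table but omits writeParallelism.
     static final class TableOnlyKey {
       private final String tableName;
       private final String branch;

       TableOnlyKey(String tableName, String branch) {
         this.tableName = tableName;
         this.branch = branch;
       }

       @Override
       public boolean equals(Object o) {
         if (this == o) {
           return true;
         }
         if (!(o instanceof TableOnlyKey)) {
           return false;
         }
         TableOnlyKey other = (TableOnlyKey) o;
         return tableName.equals(other.tableName) && branch.equals(other.branch);
       }

       @Override
       public int hashCode() {
         return Objects.hash(tableName, branch);
       }
     }

     // Hypothetical selector whose array size is fixed at construction time.
     static final class FixedSizeSelector {
       final int[] distinctKeys;

       FixedSizeSelector(int writeParallelism) {
         this.distinctKeys = new int[writeParallelism];
       }
     }

     private final Map<TableOnlyKey, FixedSizeSelector> cache = new HashMap<>();

     // Returns the array size of the selector actually used for this record.
     public int selectorWidth(String tableName, String branch, int writeParallelism) {
       FixedSizeSelector selector =
           cache.computeIfAbsent(
               new TableOnlyKey(tableName, branch),
               key -> new FixedSizeSelector(writeParallelism));
       return selector.distinctKeys.length;
     }

     public static void main(String[] args) {
       StaleSelectorDemo demo = new StaleSelectorDemo();
       System.out.println(demo.selectorWidth("tableA", "main", 4)); // prints 4
       System.out.println(demo.selectorWidth("tableA", "main", 8)); // prints 4 (stale)
     }
   }
   ```
   
   The second call asks for a parallelism of 8, but the key is equal to the 
first one, so the mapping function never runs and the selector sized for 4 is 
returned.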
   
   **Expected behavior**
   
   When `writeParallelism` or `distributionMode` changes for a table at 
runtime, a new `KeySelector` should be created reflecting the updated values.
   
   **Actual behavior**
   
   | Scenario | Expected | Actual |
   |----------|----------|--------|
   | Table A: first record `writeParallelism=4`, later changed to `8` | 8 subtasks | Still 4 subtasks |
   | Table A: first record `distributionMode=NONE`, later changed to `HASH` | Hash-based routing | Still round-robin |
   
   This contradicts the class-level Javadoc (lines 55-56):
   
   > "Caching ensures that a new key selector is also created when … the 
user-provided metadata changes (e.g. distribution mode, write parallelism)."
   
   **Suggested fix**
   
   Add `writeParallelism` and `distributionMode` to `SelectorKey` fields, 
`equals()`, and `hashCode()`:
   
   ```java
   static class SelectorKey {
       // existing fields ...
       private final int writeParallelism;              // ADD
       private final DistributionMode distributionMode; // ADD
   }
   ```
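   
   A fuller sketch of what the change could look like, including `equals()` 
and `hashCode()`, is shown below. It is an illustration under assumptions 
rather than a patch: `SelectorKeySketch` is a hypothetical wrapper, the single 
`tableName` field stands in for all existing key fields, and the nested 
`DistributionMode` enum is a local stand-in for Iceberg's enum so that the 
example compiles on its own.
   
   ```java
   import java.util.Objects;

   public class SelectorKeySketch {

     // Local stand-in for Iceberg's DistributionMode enum (assumption for this sketch).
     enum DistributionMode {
       NONE,
       HASH,
       RANGE
     }

     static final class SelectorKey {
       private final String tableName; // stands in for all existing key fields
       private final int writeParallelism; // added
       private final DistributionMode distributionMode; // added

       SelectorKey(String tableName, int writeParallelism, DistributionMode distributionMode) {
         this.tableName = tableName;
         this.writeParallelism = writeParallelism;
         this.distributionMode = distributionMode;
       }

       @Override
       public boolean equals(Object o) {
         if (this == o) {
           return true;
         }
         if (!(o instanceof SelectorKey)) {
           return false;
         }
         SelectorKey other = (SelectorKey) o;
         // The two new fields take part in equality, so a change yields a cache miss.
         return writeParallelism == other.writeParallelism
             && distributionMode == other.distributionMode
             && Objects.equals(tableName, other.tableName);
       }

       @Override
       public int hashCode() {
         return Objects.hash(tableName, writeParallelism, distributionMode);
       }
     }

     public static void main(String[] args) {
       SelectorKey before = new SelectorKey("tableA", 4, DistributionMode.NONE);
       SelectorKey after = new SelectorKey("tableA", 8, DistributionMode.NONE);
       SelectorKey same = new SelectorKey("tableA", 4, DistributionMode.NONE);
       System.out.println(before.equals(after)); // prints false
       System.out.println(before.equals(same)); // prints true
     }
   }
   ```
   
   With keys like these, a changed `writeParallelism` or `distributionMode` 
produces a key that is not equal to the cached one, so `computeIfAbsent` would 
build a new selector instead of returning the old one.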
   
   ### Willingness to contribute
   
   - [x] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

