raminqaf opened a new pull request, #26942:
URL: https://github.com/apache/flink/pull/26942

   ## What is the purpose of the change
   
   This pull request fixes non-deterministic behavior in the MAP function when 
duplicate keys are provided. The MAP function was producing inconsistent 
results across different environments and test runs, causing CI failures and 
breaking reproducibility guarantees.
   
   The root cause was in the code generation logic in 
`ScalarOperatorGens.scala`, where `groupBy` was used to deduplicate keys. The 
iteration order of the subsequent `.keys` and `.values` extraction was 
non-deterministic, which broke the correspondence between the key and value 
arrays in the generated code.
   
   ## Brief change log
   
   - Added `groupByOrdered` utility method in `GenerateUtils` that uses 
`LinkedHashMap` to preserve insertion order during grouping operations
   - Updated MAP function code generation in `ScalarOperatorGens.scala` to use 
deterministic order-preserving deduplication instead of non-deterministic 
`groupBy`
   - Ensured "last value wins" semantics for duplicate keys by taking the last 
occurrence in argument order
   - Fixed key-value array correspondence in generated code to prevent 
mismatched entries
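
   As an illustration of the idea behind the change log above, here is a 
minimal Java sketch of order-preserving grouping with "last value wins" 
deduplication. It is not the code from this pull request: the actual change is 
in the Scala files named above, and the class `OrderedGrouping`, the method 
`dedupeLastWins`, and the signature given to `groupByOrdered` here are 
assumptions made for the example. It also assumes that a deduplicated key keeps 
the position of its first occurrence.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration; not the Flink implementation.
final class OrderedGrouping {

    // Groups items by key. LinkedHashMap iterates in insertion order, so
    // groups come back in the order their keys were first seen.
    static <T, K> Map<K, List<T>> groupByOrdered(List<T> items, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(keyOf.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    // Keeps one pair per key: the last occurrence in argument order.
    // Keys and values stay together in a single entry, so they cannot drift apart.
    static <K, V> List<Map.Entry<K, V>> dedupeLastWins(List<Map.Entry<K, V>> pairs) {
        List<Map.Entry<K, V>> result = new ArrayList<>();
        for (List<Map.Entry<K, V>> group : groupByOrdered(pairs, Map.Entry::getKey).values()) {
            result.add(group.get(group.size() - 1));
        }
        return result;
    }

    public static void main(String[] args) {
        // MAP[1, 1, 1, 2] alternates keys and values: pairs (1, 1) and (1, 2).
        List<Map.Entry<Integer, Integer>> pairs = List.of(Map.entry(1, 1), Map.entry(1, 2));
        System.out.println(dedupeLastWins(pairs));
    }
}
```

   Carrying each key and its value as one entry, rather than reading keys and 
values back from two separate views of a map, is what keeps the two generated 
arrays aligned in this sketch.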
   
   ## Verifying this change
   
   This change is already covered by existing tests:
   
   - **MapFunctionITCase.test()** contains the specific failing test case 
`map(f0, f0, f0, f1)`, which was producing non-deterministic results
   - The previously flaky test `MAP[1, 1, 1, 2] → {1=2}` now passes 
consistently
   - All existing MAP function tests continue to pass
   - Manual verification shows consistent results across multiple test runs and 
environments
   
   The change specifically addresses test failures that occurred when constant 
folding was disabled, ensuring both code paths (optimized and runtime) produce 
consistent results.
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **yes** - 
affects MAP function code generation, but with minimal performance impact (same 
O(n) complexity)
   - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **no**
   - If yes, how is the feature documented? **not applicable** - this is a bug 
fix that

