[ https://issues.apache.org/jira/browse/FLINK-38286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ramin Gharib updated FLINK-38286:
---------------------------------
    Description:
*Summary*
The MAP function exhibits non-deterministic behavior when duplicate keys are provided, returning different results across environments and test runs. This breaks reproducibility and can cause CI failures.

h3. *Description*
h4. Problem
The `MAP` function in Flink Table API/SQL produces inconsistent results when duplicate keys are provided. For example, `MAP[f0, f0, f0, f1]` where `f0=1` and `f1=2` should deterministically return `\{1=2}` (last value wins), but sometimes returns `\{1=1}` instead.

h4. Root Cause
The issue lies in the code generation logic in `ScalarOperatorGens.scala` (lines ~1510-1530). The current implementation uses:
{code:java}
val keyElements = elements
  .grouped(2)
  .map { case Seq(key, value) => (key, value) }
  .toSeq
  .groupBy(_._1)
  .map(_._2.last)
  .keys
  .toSeq
{code}
The problem is that `groupBy` returns a `Map`, and when we extract `.keys` and `.values`, the iteration order is {_}non-deterministic{_}. This breaks the correspondence between `keyArray[i]` and `valueArray[i]` in the generated code.

h3. Steps to Reproduce
1. Run the `MapFunctionITCase` test with constant folding disabled
2. Execute the specific test case: `map(f0, f0, f0, f1)` where `f0=1, f1=2`
3. Observe that results vary between runs/environments

*Test Code:*
{code:java}
// In MapFunctionITCase.java
resultSpec(
    map($("f0"), $("f0"), $("f0"), $("f1")),
    "MAP[f0, f1]",
    Collections.singletonMap(1, 2), // Expected: {1=2}
    DataTypes.MAP(INT().notNull(), INT().notNull()).notNull()
)
{code}

*Expected Behavior*
- `MAP[1, 1, 1, 2]` should consistently return `\{1=2}` (last value wins)
- Results should be deterministic across all environments

*Actual Behavior*
- Sometimes returns `\{1=2}` ✅
- Sometimes returns `\{1=1}` ❌
- Non-deterministic failures in CI environments

> MAP function with duplicate keys produces non-deterministic results
> -------------------------------------------------------------------
>
>                 Key: FLINK-38286
>                 URL: https://issues.apache.org/jira/browse/FLINK-38286
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: Ramin Gharib
>            Priority: Major
>              Labels: pull-request-available

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
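
The ordering problem described under Root Cause can be illustrated outside the planner. The following is a minimal sketch in plain Java, not the code in `ScalarOperatorGens.scala` and not necessarily the fix that was applied there. The class name `MapDedupSketch` and the helper `dedupLastWins` are hypothetical names chosen for this example. It assumes the desired semantics are the ones stated in the issue (last value wins) and shows one way to keep `keyArray[i]` aligned with `valueArray[i]`: collect the pairs into an insertion-ordered map and derive both arrays from a single pass over its entries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names exist in Flink.
class MapDedupSketch {

    /**
     * Collapses a flat [k0, v0, k1, v1, ...] argument list so that the last
     * value supplied for each key wins, keeping keys in first-seen order.
     */
    static <T> Map<T, T> dedupLastWins(List<T> elements) {
        if (elements.size() % 2 != 0) {
            throw new IllegalArgumentException("expected an even number of elements");
        }
        // LinkedHashMap iterates in insertion order; putting an existing key
        // again replaces its value but keeps its original position.
        Map<T, T> pairs = new LinkedHashMap<>();
        for (int i = 0; i < elements.size(); i += 2) {
            pairs.put(elements.get(i), elements.get(i + 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // MAP[f0, f0, f0, f1] with f0 = 1 and f1 = 2
        Map<Integer, Integer> pairs = dedupLastWins(List.of(1, 1, 1, 2));

        // Build both arrays from one pass over the entries, so the key at
        // index i always belongs to the value at index i.
        List<Integer> keyArray = new ArrayList<>();
        List<Integer> valueArray = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : pairs.entrySet()) {
            keyArray.add(entry.getKey());
            valueArray.add(entry.getValue());
        }

        System.out.println(pairs); // prints {1=2}
    }
}
```

The point of the sketch is that keys and values are never taken from two independently ordered collections: both come from the same entry iteration, and that iteration order is defined by the input rather than by hashing.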