athlcode commented on code in PR #21720:
URL: https://github.com/apache/datafusion/pull/21720#discussion_r3203258298


##########
datafusion/spark/src/function/map/utils.rs:
##########
@@ -193,43 +216,193 @@ fn map_deduplicate_keys(
                 return exec_err!(
                     "map_deduplicate_keys: keys and values lists in the same 
row must have equal lengths"
                 );
-            } else if num_keys_entries != 0 {
-                let mut seen_keys = HashSet::new();
-
-                for cur_entry_idx in (0..num_keys_entries).rev() {
-                    let key = ScalarValue::try_from_array(
-                        &flat_keys,
-                        cur_keys_offset + cur_entry_idx,
-                    )?
-                    .compacted();
-                    if seen_keys.contains(&key) {
-                        // TODO: implement configuration and logic for 
spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
-                        // exec_err!("invalid argument: duplicate keys in map")
-                        // 
https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
-                    } else {
-                        // This code implements deduplication logic for 
spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
-                        keys_mask_one[cur_entry_idx] = true;
-                        values_mask_one[cur_entry_idx] = true;
-                        seen_keys.insert(key);
-                        new_last_offset += 1;
+            }
+            key_to_output_idx.clear();
+            for cur_entry_idx in 0..num_keys_entries {
+                let key = ScalarValue::try_from_array(
+                    &flat_keys,
+                    cur_keys_offset + cur_entry_idx,
+                )?
+                .compacted();
+                let abs_value_idx = (cur_values_offset + cur_entry_idx) as i32;
+
+                if let Some(&output_idx) = key_to_output_idx.get(&key) {
+                    if last_value_wins {
+                        value_indices[output_idx] = abs_value_idx;
+                        keys_mask_builder.append_value(false);
+                        continue;
                     }
+                    return exec_err!(
+                        "[DUPLICATED_MAP_KEY] Duplicate map key {key} was 
found, \
+                         please check the input data. To allow duplicate keys 
with \
+                         last-value-wins semantics, set \
+                         `datafusion.execution.map_key_dedup_policy` to 
`LAST_WIN`."
+                    );
                 }
+                keys_mask_builder.append_value(true);
+                key_to_output_idx.insert(key, value_indices.len());
+                value_indices.push(abs_value_idx);
+                new_last_offset += 1;
             }
         } else {
-            // the result entry is NULL
-            // both current row offsets are skipped
-            // keys or values in the current row are marked false in the masks
+            // The result entry is NULL — no keys/values emitted. Still pad the
+            // mask so it stays aligned with `flat_keys`.
+            keys_mask_builder.append_n(num_keys_entries, false);
         }
-        keys_mask_builder.append_array(&keys_mask_one.into());
-        values_mask_builder.append_array(&values_mask_one.into());
         new_offsets.push(new_last_offset);
         cur_keys_offset += num_keys_entries;
         cur_values_offset += num_values_entries;
     }
     let keys_mask = keys_mask_builder.finish();
-    let values_mask = values_mask_builder.finish();
     let needed_keys = filter(&flat_keys, &keys_mask)?;
-    let needed_values = filter(&flat_values, &values_mask)?;
+    let value_indices_array = Int32Array::from(value_indices);
+    let needed_values = take(&flat_values, &value_indices_array, None)?;
     let offsets = OffsetBuffer::new(new_offsets.into());
     Ok((needed_keys, needed_values, offsets))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, MapArray, StringArray};
+
+    fn as_map(array: &ArrayRef) -> &MapArray {
+        array.as_any().downcast_ref::<MapArray>().expect("MapArray")
+    }
+
+    fn int32_utf8_inputs(
+        keys: Vec<i32>,
+        values: Vec<Option<&str>>,
+    ) -> (ArrayRef, ArrayRef) {
+        let keys: ArrayRef = Arc::new(Int32Array::from(keys));
+        let values: ArrayRef = Arc::new(StringArray::from(values));
+        (keys, values)
+    }
+
+    #[test]
+    fn parse_policy_accepts_both_values_case_insensitively() {
+        assert!(!parse_map_key_dedup_policy("EXCEPTION").unwrap());
+        assert!(!parse_map_key_dedup_policy("exception").unwrap());
+        assert!(parse_map_key_dedup_policy("LAST_WIN").unwrap());
+        assert!(parse_map_key_dedup_policy("last_win").unwrap());
+    }
+
+    #[test]
+    fn parse_policy_rejects_unknown() {
+        let err = parse_map_key_dedup_policy("BOGUS").unwrap_err().to_string();
+        assert!(err.contains("Unknown map_key_dedup_policy"), "{err}");
+    }
+
+    #[test]
+    fn happy_path_two_rows_no_duplicates() {
+        let (keys, values) =
+            int32_utf8_inputs(vec![1, 2, 3], vec![Some("a"), Some("b"), 
Some("c")]);
+        let offsets = [0i32, 2, 3];
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 2);
+        assert_eq!(map.value_offsets(), &[0, 2, 3]);
+    }
+
+    #[test]
+    fn single_row_duplicate_errors_under_exception() {
+        let (keys, values) =
+            int32_utf8_inputs(vec![1, 2, 1], vec![Some("a"), Some("b"), 
Some("c")]);
+        let offsets = [0i32, 3];
+
+        let err = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap_err()
+        .to_string();
+
+        assert!(err.contains("[DUPLICATED_MAP_KEY]"), "{err}");
+        assert!(err.contains("map_key_dedup_policy"), "{err}");
+    }
+
+    #[test]
+    fn last_win_keeps_final_occurrence() {
+        let (keys, values) = int32_utf8_inputs(
+            vec![1, 2, 1, 3, 2],
+            vec![Some("a"), Some("b"), Some("c"), Some("d"), Some("e")],
+        );
+        let offsets = [0i32, 5];
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, true,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 1);
+        // 5 entries in, 3 unique keys -> offsets [0, 3]
+        assert_eq!(map.value_offsets(), &[0, 3]);
+    }
+
+    #[test]
+    fn duplicate_in_later_row_still_errors() {
+        let (keys, values) = int32_utf8_inputs(
+            vec![1, 2, 1, 1],
+            vec![Some("a"), Some("b"), Some("x"), Some("y")],
+        );
+        let offsets = [0i32, 2, 4];
+
+        let err = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap_err()
+        .to_string();
+
+        assert!(err.contains("[DUPLICATED_MAP_KEY]"), "{err}");
+    }
+
+    #[test]
+    fn empty_row_does_not_trigger_dedup() {
+        let (keys, values) = int32_utf8_inputs(vec![], vec![]);
+        let offsets = [0i32, 0];
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 1);
+        assert_eq!(map.value_offsets(), &[0, 0]);
+    }
+
+    #[test]
+    fn null_row_is_skipped_and_not_checked() {
+        // Row 0 is NULL (keys null). Its duplicate keys should be ignored;
+        // row 1 is a clean row.
+        let (keys, values) = int32_utf8_inputs(
+            vec![1, 1, 2, 3],
+            vec![Some("dup-a"), Some("dup-b"), Some("x"), Some("y")],
+        );
+        let offsets = [0i32, 2, 4];
+        let keys_nulls = NullBuffer::from(vec![false, true]);
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys,
+            &values,
+            &offsets,
+            &offsets,
+            Some(&keys_nulls),
+            None,
+            false,
+        )
+        .unwrap();
+
+        let map = as_map(&result);

Review Comment:
   Thank you for the suggestion — I've pushed the changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to