Davis-Zhang-Onehouse commented on code in PR #13617:
URL: https://github.com/apache/hudi/pull/13617#discussion_r2231908648


##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -44,12 +48,66 @@ public class HoodieDataUtils {
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> pairData) {
     // Deduplicate locally before shuffling to reduce data movement
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    // Filter out null keys before reduceByKey as it cannot handle null keys
+    pairData.persist("MEMORY_AND_DISK_SER");
+    Map<K, V> res;
+    try {
+      // Reduce by key could not handle null for some of the engines like spark. Handle non-null key first.
+      res = pairData.filter((key, value) -> key != null)
+          .reduceByKey((existing, incoming) -> incoming, pairData.deduceNumPartitions())
+          .collectAsList()
+          .stream()
+          .collect(HashMap::new,
+              (map, pair) -> map.put(pair.getKey(), pair.getValue()),
+              HashMap::putAll);
+      // Get values for the null key. Add a random one to the result map.
+      List<V> valuesForNullKey = pairData.filter((key, value) -> key == null).values().distinct().collectAsList();
+      if (!valuesForNullKey.isEmpty()) {
+        res.put(null, valuesForNullKey.get(0));
+      }
+    } finally {
+      pairData.unpersist();
+    }
+    return res;
   }
-} 
\ No newline at end of file
+
+  /**
+   * Collects results of the pair data into a {@link Map<K, Set<V>>} where values with the same key
+   * are grouped into a set.
+   *
+   * @param pairData Hoodie Pair Data to be collected
+   * @param <K> type of the key
+   * @param <V> type of the value
+   * @return a Map containing keys mapped to sets of values
+   */
+  public static <K, V> Map<K, Set<V>> dedupeAndCollectAsMapOfSet(HoodiePairData<K, V> pairData) {

Review Comment:
   Sorry, I haven't self-reviewed this yet; I've marked it as a draft.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to