the-other-tim-brown commented on code in PR #13617:
URL: https://github.com/apache/hudi/pull/13617#discussion_r2230020766


##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -44,12 +48,66 @@ public class HoodieDataUtils {
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> 
pairData) {
     // Deduplicate locally before shuffling to reduce data movement
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    // Filter out null keys before reduceByKey as it cannot handle null keys
+    pairData.persist("MEMORY_AND_DISK_SER");
+    Map<K, V> res;
+    try {
+      // Reduce by key could not handle null for some of the engines like 
spark. Handle non-null key first.
+      res = pairData.filter((key, value) -> key != null)
+          .reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
+          .collectAsList()
+          .stream()
+          .collect(HashMap::new,
+              (map, pair) -> map.put(pair.getKey(), pair.getValue()),
+              HashMap::putAll);
+      // Get values for the null key. Add a random one to the result map.
+      List<V> valuesForNullKey = pairData.filter((key, value) -> key == 
null).values().distinct().collectAsList();
+      if (!valuesForNullKey.isEmpty()) {
+        res.put(null, valuesForNullKey.get(0));
+      }
+    } finally {
+      pairData.unpersist();
+    }
+    return res;
   }
-} 
\ No newline at end of file
+
+  /**
+   * Collects results of the pair data into a {@link Map<K, Set<V>>} where 
values with the same key
+   * are grouped into a set.
+   *
+   * @param pairData Hoodie Pair Data to be collected
+   * @param <K> type of the key
+   * @param <V> type of the value
+   * @return a Map containing keys mapped to sets of values
+   */
+  public static <K, V> Map<K, Set<V>> 
dedupeAndCollectAsMapOfSet(HoodiePairData<K, V> pairData) {
+    // Deduplicate locally before shuffling to reduce data movement
+    // If there are multiple entries sharing the same key, use the incoming one
+    // Filter out null keys before reduceByKey as it cannot handle null keys
+    pairData.persist("MEMORY_AND_DISK_SER");
+    Map<K, Set<V>> res;
+    try {
+      // Reduce by key could not handle null for some of the engines like 
spark. Handle non-null key first.
+      res = pairData.filter((key, value) -> key != null)

Review Comment:
   If you map the key to `Option(key)`, can you get around the issue of null 
keys without requiring multiple iterations over the dataset?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -44,12 +48,66 @@ public class HoodieDataUtils {
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> 
pairData) {
     // Deduplicate locally before shuffling to reduce data movement
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    // Filter out null keys before reduceByKey as it cannot handle null keys
+    pairData.persist("MEMORY_AND_DISK_SER");
+    Map<K, V> res;
+    try {
+      // Reduce by key could not handle null for some of the engines like 
spark. Handle non-null key first.
+      res = pairData.filter((key, value) -> key != null)
+          .reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
+          .collectAsList()
+          .stream()
+          .collect(HashMap::new,
+              (map, pair) -> map.put(pair.getKey(), pair.getValue()),
+              HashMap::putAll);
+      // Get values for the null key. Add a random one to the result map.
+      List<V> valuesForNullKey = pairData.filter((key, value) -> key == 
null).values().distinct().collectAsList();
+      if (!valuesForNullKey.isEmpty()) {
+        res.put(null, valuesForNullKey.get(0));
+      }
+    } finally {
+      pairData.unpersist();
+    }
+    return res;
   }
-} 
\ No newline at end of file
+
+  /**
+   * Collects results of the pair data into a {@link Map<K, Set<V>>} where 
values with the same key
+   * are grouped into a set.
+   *
+   * @param pairData Hoodie Pair Data to be collected
+   * @param <K> type of the key
+   * @param <V> type of the value
+   * @return a Map containing keys mapped to sets of values
+   */
+  public static <K, V> Map<K, Set<V>> 
dedupeAndCollectAsMapOfSet(HoodiePairData<K, V> pairData) {

Review Comment:
   This is just a reduce-by-key with a custom combiner, right? Why do you call 
it "dedupe"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to