Davis-Zhang-Onehouse commented on code in PR #13617:
URL: https://github.com/apache/hudi/pull/13617#discussion_r2231909004
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -44,12 +48,66 @@ public class HoodieDataUtils {
public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V>
pairData) {
// Deduplicate locally before shuffling to reduce data movement
// If there are multiple entries sharing the same key, use the incoming one
- return pairData.reduceByKey((existing, incoming) -> incoming,
pairData.deduceNumPartitions())
- .collectAsList()
- .stream()
- .collect(Collectors.toMap(
- Pair::getKey,
- Pair::getValue
- ));
+ // Filter out null keys before reduceByKey as it cannot handle null keys
+ pairData.persist("MEMORY_AND_DISK_SER");
+ Map<K, V> res;
+ try {
+ // Reduce by key could not handle null for some of the engines like
spark. Handle non-null key first.
+ res = pairData.filter((key, value) -> key != null)
+ .reduceByKey((existing, incoming) -> incoming,
pairData.deduceNumPartitions())
+ .collectAsList()
+ .stream()
+ .collect(HashMap::new,
+ (map, pair) -> map.put(pair.getKey(), pair.getValue()),
+ HashMap::putAll);
+ // Get values for the null key. Add a random one to the result map.
+ List<V> valuesForNullKey = pairData.filter((key, value) -> key ==
null).values().distinct().collectAsList();
+ if (!valuesForNullKey.isEmpty()) {
+ res.put(null, valuesForNullKey.get(0));
+ }
+ } finally {
+ pairData.unpersist();
+ }
+ return res;
}
-}
\ No newline at end of file
+
+ /**
+ * Collects results of the pair data into a {@link Map<K, Set<V>>} where
values with the same key
+ * are grouped into a set.
+ *
+ * @param pairData Hoodie Pair Data to be collected
+ * @param <K> type of the key
+ * @param <V> type of the value
+ * @return a Map containing keys mapped to sets of values
+ */
+ public static <K, V> Map<K, Set<V>>
dedupeAndCollectAsMapOfSet(HoodiePairData<K, V> pairData) {
+ // Deduplicate locally before shuffling to reduce data movement
+ // If there are multiple entries sharing the same key, use the incoming one
+ // Filter out null keys before reduceByKey as it cannot handle null keys
+ pairData.persist("MEMORY_AND_DISK_SER");
+ Map<K, Set<V>> res;
+ try {
+ // Reduce by key could not handle null for some of the engines like
spark. Handle non-null key first.
+ res = pairData.filter((key, value) -> key != null)
Review Comment:
   Thanks for the pointer! This is definitely a better approach — I've revised the code accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]