voonhous commented on code in PR #18532:
URL: https://github.com/apache/hudi/pull/18532#discussion_r3409036396


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -479,31 +479,29 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
               String partitionStatName = entry.getKey();
               List<HoodieWriteStat> writeStats = entry.getValue();
 
-              HashMap<String, Long> updatedFilesToSizesMapping =
-                  writeStats.stream().reduce(new HashMap<>(writeStats.size()),
-                      (map, stat) -> {
-                        String pathWithPartition = stat.getPath();
-                        if (pathWithPartition == null) {
-                          // Empty partition
-                          log.warn("Unable to find path in write stat to update metadata table {}", stat);
-                          return map;
-                        }
-
-                        String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);
-
-                        // Since write-stats are coming in no particular order, if the same
-                        // file have previously been appended to w/in the txn, we simply pick max
-                        // of the sizes as reported after every write, since file-sizes are
-                        // monotonically increasing (ie file-size never goes down, unless deleted)
-                        map.merge(fileName, stat.getFileSizeInBytes(), Math::max);
-
-                        Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
-                        if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
-                          cdcPathAndSizes.forEach((key, value) -> map.put(FSUtils.getFileName(key, partitionStatName), value));
-                        }
-                        return map;
-                      },
-                      CollectionUtils::combine);
+              HashMap<String, Long> updatedFilesToSizesMapping = new HashMap<>(writeStats.size());

Review Comment:
   > flatMap sounds good to me, did you ever try this?
   
   Yeah, gave it a go. `#flatMap` does clear point 1 (exploding each stat into 
its main + 0..N CDC entries), but it doesn't get us past point 2, so it ends up 
being more moving parts than the loop rather than fewer.
   
   The catch is the single `#toMap` merge function. Once everything is 
flattened into one stream there's still only one merge fn, so it forces CDC 
into `Math::max` as well:
   
   ```java
   HashMap<String, Long> updatedFilesToSizesMapping = writeStats.stream()
       .filter(stat -> {
         if (stat.getPath() == null) {
           log.warn("Unable to find path in write stat to update metadata table {}", stat);
           return false;                       // side-effect in a filter
         }
         return true;
       })
       .flatMap(stat -> {
         Stream<Map.Entry<String, Long>> dataEntry = Stream.of(Map.entry(
             FSUtils.getFileName(stat.getPath(), partitionStatName), stat.getFileSizeInBytes()));
         Map<String, Long> cdc = stat.getCdcStats();
         return (cdc == null || cdc.isEmpty()) ? dataEntry
             : Stream.concat(dataEntry, cdc.entrySet().stream()
                 .map(e -> Map.entry(FSUtils.getFileName(e.getKey(), partitionStatName), e.getValue())));
       })
       .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Math::max, HashMap::new));
   ```
   
   That flips CDC from last-wins (`put`) to `max`. They don't share keys in 
prod IIUC so it's likely a no-op in practice, but it's still a silent behavior 
change I'd rather not bury in a readability refactor.
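   
   To make the difference concrete, here is a minimal standalone sketch of what happens when a key repeats and the smaller value arrives last. It is not Hudi code: `MergePolicyDemo`, `entry`, `lastWins` and `maxWins` are made-up names, and the file name and sizes are invented for illustration.
   
   ```java
   import java.util.AbstractMap;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   class MergePolicyDemo {

     // Hypothetical helper: builds an immutable (file name, size) pair.
     static Map.Entry<String, Long> entry(String key, long value) {
       return new AbstractMap.SimpleImmutableEntry<>(key, value);
     }

     // Last-wins: what repeated Map#put calls do when a key shows up again.
     static Map<String, Long> lastWins(List<Map.Entry<String, Long>> entries) {
       Map<String, Long> out = new HashMap<>();
       for (Map.Entry<String, Long> e : entries) {
         out.put(e.getKey(), e.getValue());
       }
       return out;
     }

     // Max-wins: what a single toMap merge function of Math::max applies to every key.
     static Map<String, Long> maxWins(List<Map.Entry<String, Long>> entries) {
       return entries.stream().collect(
           Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Math::max, HashMap::new));
     }

     public static void main(String[] args) {
       // Invented input: the same CDC file reported twice, smaller size last.
       List<Map.Entry<String, Long>> entries =
           Arrays.asList(entry("a.cdc", 200L), entry("a.cdc", 150L));
       System.out.println(lastWins(entries)); // {a.cdc=150}
       System.out.println(maxWins(entries));  // {a.cdc=200}
     }
   }
   ```
   
   The two policies only diverge when a key repeats with a non-increasing value, which is why the change would be easy to miss in review.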
   
   Keeping the current semantics faithfully means splitting into two collects 
(one per merge policy) and overlaying CDC:
   
   ```java
   HashMap<String, Long> updatedFilesToSizesMapping = writeStats.stream()
       .filter(stat -> stat.getPath() != null)
       .collect(Collectors.toMap(
           stat -> FSUtils.getFileName(stat.getPath(), partitionStatName),
           HoodieWriteStat::getFileSizeInBytes, Math::max, () -> new HashMap<>(writeStats.size())));
   
   writeStats.stream()
       .filter(stat -> stat.getPath() != null)
       .map(HoodieWriteStat::getCdcStats)
       .filter(cdc -> cdc != null && !cdc.isEmpty())
       .forEach(cdc -> cdc.forEach((k, v) ->
           updatedFilesToSizesMapping.put(FSUtils.getFileName(k, partitionStatName), v)));
   ```
   
   That's two passes + a `forEach` overlay (the CDC part is really a `put`, it doesn't want to be a collect), and we'd still need a side-effecting filter on top to keep the warn, which the snippet above drops. Net more complex than the single loop with no readability or perf win, so I'd keep the for-loop here.
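   
   For comparison, a rough self-contained sketch of a single loop that applies both merge policies in one pass. This is not the exact code in the PR: `Stat` and `fileName` are hypothetical stand-ins for `HoodieWriteStat` and `FSUtils.getFileName`, and the paths and sizes are invented.
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   class SingleLoopSketch {

     // Hypothetical stand-in for HoodieWriteStat: only the fields the loop reads.
     static final class Stat {
       final String path;
       final long sizeInBytes;
       final Map<String, Long> cdcStats;

       Stat(String path, long sizeInBytes, Map<String, Long> cdcStats) {
         this.path = path;
         this.sizeInBytes = sizeInBytes;
         this.cdcStats = cdcStats;
       }
     }

     // Hypothetical stand-in for FSUtils.getFileName: keeps the last path segment.
     static String fileName(String path) {
       return path.substring(path.lastIndexOf('/') + 1);
     }

     static HashMap<String, Long> filesToSizes(List<Stat> stats) {
       HashMap<String, Long> sizes = new HashMap<>(stats.size());
       for (Stat stat : stats) {
         if (stat.path == null) {
           // The real code logs a warning here; the sketch only skips the stat.
           continue;
         }
         // Data file: keep the largest size reported so far.
         sizes.merge(fileName(stat.path), stat.sizeInBytes, Math::max);
         // CDC files: plain put, so the last reported size wins.
         if (stat.cdcStats != null && !stat.cdcStats.isEmpty()) {
           stat.cdcStats.forEach((k, v) -> sizes.put(fileName(k), v));
         }
       }
       return sizes;
     }

     public static void main(String[] args) {
       List<Stat> stats = Arrays.asList(
           new Stat("p1/f1.parquet", 100L, Collections.singletonMap("p1/c1.cdc", 40L)),
           new Stat("p1/f1.parquet", 80L, Collections.singletonMap("p1/c1.cdc", 30L)),
           new Stat(null, 0L, null));
       // f1.parquet maps to 100 (max), c1.cdc maps to 30 (last write).
       System.out.println(filesToSizes(stats));
     }
   }
   ```
   
   Both policies sit side by side in one loop body, and the null-path skip is an ordinary statement rather than a side effect inside a stream stage.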
   
   If you'd actually prefer CDC merged by `max` too (defensible, CDC log sizes 
are monotonic as well), I'm happy to ship the single `#flatMap` + `#toMap` 
version, just as an explicit behavior change rather than slipping it in. WDYT?



