voonhous commented on code in PR #18532:
URL: https://github.com/apache/hudi/pull/18532#discussion_r3409036396
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -479,31 +479,29 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue();
- HashMap<String, Long> updatedFilesToSizesMapping =
- writeStats.stream().reduce(new HashMap<>(writeStats.size()),
- (map, stat) -> {
- String pathWithPartition = stat.getPath();
- if (pathWithPartition == null) {
- // Empty partition
- log.warn("Unable to find path in write stat to update metadata table {}", stat);
- return map;
- }
-
- String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);
-
- // Since write-stats are coming in no particular order, if the same
- // file have previously been appended to w/in the txn, we simply pick max
- // of the sizes as reported after every write, since file-sizes are
- // monotonically increasing (ie file-size never goes down, unless deleted)
- map.merge(fileName, stat.getFileSizeInBytes(), Math::max);
-
- Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
- if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
- cdcPathAndSizes.forEach((key, value) -> map.put(FSUtils.getFileName(key, partitionStatName), value));
- }
- return map;
- },
- CollectionUtils::combine);
+ HashMap<String, Long> updatedFilesToSizesMapping = new HashMap<>(writeStats.size());
Review Comment:
> flatMap sounds good to me, did you ever try this?

Yeah, gave it a go. `#flatMap` does clear point 1 (exploding each stat into its main data-file entry plus 0..N CDC entries), but it doesn't get us past point 2, so it ends up being more moving parts than the loop rather than fewer.

The catch is the single `#toMap` merge function. Once everything is flattened into one stream, there's still only one merge fn, so it forces CDC into `Math::max` as well:
```java
HashMap<String, Long> updatedFilesToSizesMapping = writeStats.stream()
    .filter(stat -> {
      if (stat.getPath() == null) {
        log.warn("Unable to find path in write stat to update metadata table {}", stat);
        return false; // side-effect in a filter
      }
      return true;
    })
    .flatMap(stat -> {
      Stream<Map.Entry<String, Long>> dataEntry = Stream.of(Map.entry(
          FSUtils.getFileName(stat.getPath(), partitionStatName),
          stat.getFileSizeInBytes()));
      Map<String, Long> cdc = stat.getCdcStats();
      return (cdc == null || cdc.isEmpty()) ? dataEntry
          : Stream.concat(dataEntry, cdc.entrySet().stream()
              .map(e -> Map.entry(FSUtils.getFileName(e.getKey(), partitionStatName), e.getValue())));
    })
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Math::max, HashMap::new));
```
That flips CDC from last-wins (`put`) to `max`. CDC entries don't share keys with other entries in prod IIUC, so it's likely a no-op in practice, but it's still a silent behavior change I'd rather not bury in a readability refactor.

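
To make the flip concrete, here is a tiny standalone check of the two merge policies when the same key is reported twice, larger size first. The file name and sizes are hypothetical; only `Map#put` and `Map#merge` with `Math::max` are the real calls in play.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergePolicyDemo {
  // Overlay with put: the last report for a key wins (today's CDC behavior).
  static Map<String, Long> lastWins(List<Map.Entry<String, Long>> reports) {
    Map<String, Long> out = new HashMap<>();
    for (Map.Entry<String, Long> r : reports) {
      out.put(r.getKey(), r.getValue());
    }
    return out;
  }

  // Merge with Math::max: the largest reported size wins (today's data-file behavior).
  static Map<String, Long> maxWins(List<Map.Entry<String, Long>> reports) {
    Map<String, Long> out = new HashMap<>();
    for (Map.Entry<String, Long> r : reports) {
      out.merge(r.getKey(), r.getValue(), Math::max);
    }
    return out;
  }

  public static void main(String[] args) {
    // Hypothetical: the same CDC file reported twice, 200 bytes then 150 bytes.
    List<Map.Entry<String, Long>> reports = List.of(
        Map.entry("f1.cdc", 200L),
        Map.entry("f1.cdc", 150L));
    System.out.println("put -> " + lastWins(reports).get("f1.cdc")); // put -> 150
    System.out.println("max -> " + maxWins(reports).get("f1.cdc")); // max -> 200
  }
}
```

With no colliding keys, both maps come out identical, which is why the switch is only observable if a collision ever happens.
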
Keeping the current semantics faithfully means splitting into two collects
(one per merge policy) and overlaying CDC:
```java
// Pass 1: data-file sizes, merged by max.
HashMap<String, Long> updatedFilesToSizesMapping = writeStats.stream()
    .filter(stat -> stat.getPath() != null)
    .collect(Collectors.toMap(
        stat -> FSUtils.getFileName(stat.getPath(), partitionStatName),
        HoodieWriteStat::getFileSizeInBytes, Math::max, () -> new HashMap<>(writeStats.size())));
// Pass 2: overlay CDC entries with put, so the last report wins.
writeStats.stream()
    .filter(stat -> stat.getPath() != null)
    .map(HoodieWriteStat::getCdcStats)
    .filter(cdc -> cdc != null && !cdc.isEmpty())
    .forEach(cdc -> cdc.forEach((k, v) ->
        updatedFilesToSizesMapping.put(FSUtils.getFileName(k, partitionStatName), v)));
```
That's two passes + a side-effecting filter for the warn + a `forEach` overlay (the CDC part is really a `put`; it doesn't want to be a collect). Net, it's more complex than the single loop, with no readability or perf win, so I'd keep the for-loop here.

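
For comparison, a minimal self-contained sketch of the single-loop shape, assuming it keeps the removed `reduce` body's per-stat logic (skip null paths, `merge` data sizes by `max`, `put` CDC entries). `Stat` is a hypothetical stand-in for `HoodieWriteStat`, and `fileName` is a hypothetical simplification of `FSUtils.getFileName`; this is not the PR's actual code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FilesToSizesLoop {
  // Hypothetical stand-in for HoodieWriteStat: only the fields the loop reads.
  static final class Stat {
    final String path;                // null for an empty partition
    final long fileSizeInBytes;
    final Map<String, Long> cdcStats; // CDC path -> size, may be null

    Stat(String path, long fileSizeInBytes, Map<String, Long> cdcStats) {
      this.path = path;
      this.fileSizeInBytes = fileSizeInBytes;
      this.cdcStats = cdcStats;
    }
  }

  // Hypothetical simplification of FSUtils.getFileName: keep the last path segment.
  static String fileName(String pathWithPartition) {
    return pathWithPartition.substring(pathWithPartition.lastIndexOf('/') + 1);
  }

  static HashMap<String, Long> filesToSizes(List<Stat> writeStats) {
    HashMap<String, Long> updated = new HashMap<>(writeStats.size());
    for (Stat stat : writeStats) {
      if (stat.path == null) {
        // Empty partition: the original code logs a warning and skips the stat.
        continue;
      }
      // Data files: sizes only grow within a txn, so keep the max reported size.
      updated.merge(fileName(stat.path), stat.fileSizeInBytes, Math::max);
      // CDC files: overlay with put, so the last report wins.
      if (stat.cdcStats != null && !stat.cdcStats.isEmpty()) {
        stat.cdcStats.forEach((k, v) -> updated.put(fileName(k), v));
      }
    }
    return updated;
  }

  public static void main(String[] args) {
    HashMap<String, Long> result = filesToSizes(List.of(
        new Stat("p1/a.parquet", 100L, null),
        new Stat("p1/a.parquet", 80L, Map.of("p1/a.cdc", 10L)),
        new Stat(null, 0L, null)));
    System.out.println(result.get("a.parquet")); // 100
    System.out.println(result.get("a.cdc"));     // 10
    System.out.println(result.size());           // 2
  }
}
```

Both merge policies sit side by side in one pass, which is the readability argument for keeping the loop.
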
If you'd actually prefer CDC merged by `max` too (defensible, since CDC log sizes are monotonic as well), I'm happy to ship the single `#flatMap` + `#toMap` version, just as an explicit behavior change rather than slipping it in. WDYT?
--
This is an automated message from the Apache Git Service.