nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r798098988
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -215,19 +215,16 @@ public HoodieMetadataPayload
preCombine(HoodieMetadataPayload previousRecord) {
if (filesystemMetadata != null) {
filesystemMetadata.forEach((filename, fileInfo) -> {
- // If the filename wasnt present then we carry it forward
- if (!combinedFileInfo.containsKey(filename)) {
- combinedFileInfo.put(filename, fileInfo);
+ if (fileInfo.getIsDeleted()) {
+ combinedFileInfo.remove(filename);
} else {
- if (fileInfo.getIsDeleted()) {
- // file deletion
- combinedFileInfo.remove(filename);
- } else {
- // file appends.
- combinedFileInfo.merge(filename, fileInfo, (oldFileInfo,
newFileInfo) -> {
- return new HoodieMetadataFileInfo(oldFileInfo.getSize() +
newFileInfo.getSize(), false);
- });
- }
+ // NOTE: There are 2 possible cases here:
+ // - New file is created: in that case we're simply adding its
info
+ // - File is appended to (only log-files of MOR tables on
supported FS): in that case
+ // we simply pick the info w/ largest file-size as the most
recent one, since file's
+ // sizes are increasing monotonically (meaning that the larger
file-size is more recent one)
+ combinedFileInfo.merge(filename, fileInfo, (oldFileInfo,
newFileInfo) ->
Review comment:
merge func takes care of adding an entry for the first time and hence
remove L219 and 220 ?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -87,40 +89,58 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
* @return a list of metadata table records
*/
public static List<HoodieRecord>
convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String
instantTime) {
- List<HoodieRecord> records = new LinkedList<>();
- List<String> allPartitions = new LinkedList<>();
- commitMetadata.getPartitionToWriteStats().forEach((partitionStatName,
writeStats) -> {
- final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME)
? NON_PARTITIONED_NAME : partitionStatName;
- allPartitions.add(partition);
-
- Map<String, Long> newFiles = new HashMap<>(writeStats.size());
- writeStats.forEach(hoodieWriteStat -> {
- String pathWithPartition = hoodieWriteStat.getPath();
- if (pathWithPartition == null) {
- // Empty partition
- LOG.warn("Unable to find path in write stat to update metadata table
" + hoodieWriteStat);
- return;
- }
-
- int offset = partition.equals(NON_PARTITIONED_NAME) ?
(pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
- String filename = pathWithPartition.substring(offset);
- long totalWriteBytes = newFiles.containsKey(filename)
- ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
- : hoodieWriteStat.getTotalWriteBytes();
- newFiles.put(filename, totalWriteBytes);
- });
- // New files added to a partition
- HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
- partition, Option.of(newFiles), Option.empty());
- records.add(record);
- });
+ List<HoodieRecord> records = new
ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
+
+ // Add record bearing partitions list
+ ArrayList<String> partitionsList = new
ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+
+
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+
+ // New files added to a partition
+ List<HoodieRecord<HoodieMetadataPayload>> updatedFilesRecords =
+ commitMetadata.getPartitionToWriteStats().entrySet()
+ .stream()
+ .map(entry -> {
+ String partitionStatName = entry.getKey();
+ List<HoodieWriteStat> writeStats = entry.getValue();
+
+ String partition =
partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME :
partitionStatName;
+
+ HashMap<String, Long> updatedFilesToSizesMapping =
+ writeStats.stream().reduce(new HashMap<>(writeStats.size()),
+ (map, stat) -> {
+ String pathWithPartition = stat.getPath();
+ if (pathWithPartition == null) {
+ // Empty partition
+ LOG.warn("Unable to find path in write stat to
update metadata table " + stat);
+ return map;
+ }
+
+ int offset = partition.equals(NON_PARTITIONED_NAME)
+ ? (pathWithPartition.startsWith("/") ? 1 : 0)
+ : partition.length() + 1;
+ String filename = pathWithPartition.substring(offset);
+
+ // Since write-stats are coming in no particular
order, if the same
+ // file have previously been appended to w/in the txn,
we simply pick max
+ // of the sizes as reported after every write, since
file-sizes are
+ // monotonically increasing (ie file-size never goes
down, unless deleted)
+ map.merge(filename, stat.getFileSizeInBytes(),
Math::max);
Review comment:
I am not sure if fileSizeInBytes will contain the entire file size.
Can you add some extra assertions to existing tests in TestHoodieLogFormat.
we can be certain the fileSizeIbBytes always contain full file size.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]