This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new b15a4d632e5 [HUDI-7696] Refactoring functions in HoodieTableMetadataUtil (#11823)
b15a4d632e5 is described below
commit b15a4d632e5109d481135f088011511b7968d6aa
Author: Bibhu <[email protected]>
AuthorDate: Fri Aug 30 06:23:39 2024 +0530
[HUDI-7696] Refactoring functions in HoodieTableMetadataUtil (#11823)
Refactor `HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords`
and `convertMetadataToPartitionStatsRecords` for reusability.
---------
Co-authored-by: Bibhu Pala <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
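
The logic the new helper centralizes is a flatten, group-by-column, merge pipeline over per-file column ranges. Below is a minimal, self-contained Java sketch of that pattern; ColumnRange, mergeRanges, and aggregate are hypothetical stand-ins for HoodieColumnRangeMetadata, FileFormatUtils.getColumnRangeInPartition, and the extracted collectAndProcessColumnMetadata, not actual Hudi APIs.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionStatsSketch {

  // Hypothetical stand-in for HoodieColumnRangeMetadata<Comparable>.
  record ColumnRange(String columnName, int min, int max) {}

  // Hypothetical stand-in for FileFormatUtils.getColumnRangeInPartition:
  // collapses one column's per-file ranges into a single partition-level range.
  static ColumnRange mergeRanges(List<ColumnRange> ranges) {
    String name = ranges.get(0).columnName();
    int min = ranges.stream().mapToInt(ColumnRange::min).min().orElseThrow();
    int max = ranges.stream().mapToInt(ColumnRange::max).max().orElseThrow();
    return new ColumnRange(name, min, max);
  }

  // Mirrors the shape of the extracted helper: one inner list per data file.
  static List<ColumnRange> aggregate(List<List<ColumnRange>> perFileRanges) {
    // Step 1: Flatten and group by column name.
    Map<String, List<ColumnRange>> byColumn = perFileRanges.stream()
        .flatMap(List::stream)
        .collect(Collectors.groupingBy(ColumnRange::columnName, Collectors.toList()));
    // Step 2: Merge each column's per-file ranges into one partition-level range.
    return byColumn.values().stream()
        .map(PartitionStatsSketch::mergeRanges)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<List<ColumnRange>> perFile = Arrays.asList(
        Arrays.asList(new ColumnRange("a", 1, 5), new ColumnRange("b", 10, 20)),
        Arrays.asList(new ColumnRange("a", 3, 9)));
    // Prints the partition-level ranges: a -> [1, 9], b -> [10, 20].
    aggregate(perFile).forEach(r ->
        System.out.printf("%s -> [%d, %d]%n", r.columnName(), r.min(), r.max()));
  }
}

With the pipeline centralized, the two converters in the diff below differ only in how they produce the per-file ranges (file listings vs. write stats), while the grouping and merge semantics live in one place.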
---
.../hudi/metadata/HoodieTableMetadataUtil.java | 45 ++++++++++++----------
1 file changed, 25 insertions(+), 20 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cbfb42d104a..015dd368081 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2041,6 +2041,23 @@ public class HoodieTableMetadataUtil {
};
}
+  private static Stream<HoodieRecord> collectAndProcessColumnMetadata(
+      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata,
+      String partitionPath) {
+
+    // Step 1: Flatten and Group by Column Name
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.stream()
+        .flatMap(List::stream)
+        .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, Collectors.toList()));
+
+    // Step 2: Aggregate Column Ranges
+    Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
+        .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
+
+    // Create Partition Stats Records
+    return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(Collectors.toList()), false);
+  }
+
  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
                                                                             List<DirectoryInfo> partitionInfoList,
                                                                             HoodieMetadataConfig metadataConfig,
@@ -2056,16 +2073,10 @@ public class HoodieTableMetadataUtil {
final String partitionPath = partitionInfo.getRelativePath();
// Step 1: Collect Column Metadata for Each File
      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
-          .map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
-          .collect(toList());
-      // Step 2: Flatten and Group by Column Name
-      Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.stream()
-          .flatMap(List::stream)
-          .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name
-      // Step 3: Aggregate Column Ranges
-      Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-          .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
-      return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(toList()), false).iterator();
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
+          .collect(Collectors.toList());
+
+      return collectAndProcessColumnMetadata(fileColumnMetadata, partitionPath).iterator();
});
}
@@ -2120,16 +2131,10 @@ public class HoodieTableMetadataUtil {
      final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
// Step 1: Collect Column Metadata for Each File
      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionedWriteStat.stream()
-          .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex))
-          .collect(toList());
-      // Step 2: Flatten and Group by Column Name
-      Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.stream()
-          .flatMap(List::stream)
-          .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name
-      // Step 3: Aggregate Column Ranges
-      Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-          .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionName, entry.getValue()));
-      return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, partitionStatsRangeMetadata.collect(toList()), false).iterator();
+          .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex))
+          .collect(Collectors.toList());
+
+      return collectAndProcessColumnMetadata(fileColumnMetadata, partitionName).iterator();
});
} catch (Exception e) {
      throw new HoodieException("Failed to generate column stats records for metadata table", e);