codope commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r820839507
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -187,94 +178,90 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
/**
* Convert commit action metadata to bloom filter records.
*
- * @param commitMetadata - Commit action metadata
- * @param dataMetaClient - Meta client for the data table
- * @param instantTime - Action instant time
- * @return List of metadata table records
+ * @param context - Engine context to use
+ * @param commitMetadata - Commit action metadata
+ * @param instantTime - Action instant time
+ * @param recordsGenerationParams - Parameters for bloom filter record generation
+ * @return HoodieData of metadata table records
*/
- public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
- HoodieTableMetaClient dataMetaClient,
- String instantTime) {
- List<HoodieRecord> records = new LinkedList<>();
- commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
- final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
- Map<String, Long> newFiles = new HashMap<>(writeStats.size());
- writeStats.forEach(hoodieWriteStat -> {
- // No action for delta logs
- if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
- return;
- }
+ public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
+ HoodieEngineContext context, HoodieCommitMetadata commitMetadata,
+ String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
+ final List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return context.emptyHoodieData();
+ }
- String pathWithPartition = hoodieWriteStat.getPath();
- if (pathWithPartition == null) {
- // Empty partition
- LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
- return;
- }
- int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) :
- partition.length() + 1;
+ HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats,
+ Math.max(recordsGenerationParams.getBloomIndexParallelism(), allWriteStats.size()));
Review comment:
Done.