manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r786392523
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -124,14 +202,111 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
     return records;
   }
+  /**
+   * Convert commit action metadata to bloom filter records.
+   *
+   * @param commitMetadata - Commit action metadata
+   * @param dataMetaClient - Meta client for the data table
+   * @param instantTime - Action instant time
+   * @return List of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
+                                                                       HoodieTableMetaClient dataMetaClient,
+                                                                       String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
+      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        // No action for delta logs
+        if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
+          return;
+        }
+
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          // Empty partition
+          LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
+          return;
+        }
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
+
+        final String fileName = pathWithPartition.substring(offset);
+        if (!FSUtils.isBaseFile(new Path(fileName))) {
+          return;
+        }
+        ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate files in HoodieCommitMetadata");
+
+        final Path writeFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), writeFilePath);
+          try {
+            final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+            if (fileBloomFilter == null) {
+              LOG.error("Failed to read bloom filter for " + writeFilePath);
+              return;
+            }
+            ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+            HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
+                partition, fileName, instantTime, bloomByteBuffer, false);
+            records.add(record);
+          } catch (Exception e) {
+            LOG.error("Failed to read bloom filter for " + writeFilePath);
+            return;
Review comment:
Missing keys are handled at the caller level by throwing an exception.
This layer only logs the errors.
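
For reference, a minimal caller-side sketch of that contract: this conversion layer stays best-effort (log and skip unreadable files), while the lookup caller fails fast when an expected key is absent. The lookup shape, the (partition, fileName) key type, and the exception choice below are illustrative assumptions, not the exact Hudi API.

import java.util.List;
import java.util.Map;

import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieMetadataException;

public class BloomFilterLookupCheck {
  // Caller-side check (hypothetical): every requested (partition, fileName)
  // key must map to a bloom filter. A missing key throws here, whereas
  // convertMetadataToBloomFilterRecords above only logs bad files and moves on.
  static void validateBloomFilters(List<Pair<String, String>> keys,
                                   Map<Pair<String, String>, BloomFilter> bloomFilters) {
    for (Pair<String, String> key : keys) {
      if (!bloomFilters.containsKey(key)) {
        throw new HoodieMetadataException("Failed to get bloom filter for key: " + key);
      }
    }
  }
}

Splitting the responsibility this way keeps index building resilient to individual bad files without silently serving incomplete bloom filter results to readers.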
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]