manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r786389763



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
+        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, 
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted);
+    HoodieMetadataPayload metadataPayload = new 
HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload 
previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = 
combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, 
combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, 
combineColumnStats(previousRecord));

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to