This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 612d02b35a0d2236a0be0d6e94d09fe8d0962c5e
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Aug 11 09:37:19 2023 -0700

    [HUDI-6553] Speedup column stats and bloom index creation on large datasets. (#9223)
    
    * [HUDI-6553] Speedup column stats and bloom index creation on large datasets.
    
    * addressing feedback
    
    * Fix log message
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   6 +
 hudi-common/pom.xml                                |   7 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 160 +++++++++------------
 3 files changed, 84 insertions(+), 89 deletions(-)
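
Not part of the patch: a minimal, self-contained Java sketch of the approach the diff below takes. Instead of building two separate lists (deleted files, appended files), distributing each, and unioning the resulting record sets, the patch builds a single list of (partition, filename, isDeleted) triplets via fetchPartitionFileInfoTriplets, distributes it once with engineContext.parallelize using a parallelism clamped to [1, configured index parallelism], and emits records in one flatMap pass, returning an empty iterator for files that cannot be indexed. The Triplet record, fetchTriplets helper, and the string "records" below are hypothetical stand-ins for scala.Tuple3, fetchPartitionFileInfoTriplets, and HoodieRecord; plain java.util.stream stands in for HoodieData.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Stream;

    public class TripletSketch {
      // (partition, filename, isDeleted), analogous to the scala.Tuple3 used in the patch
      record Triplet(String partition, String filename, boolean isDeleted) {}

      // Mirror of fetchPartitionFileInfoTriplets: one combined list for deletes and appends
      static List<Triplet> fetchTriplets(Map<String, List<String>> deleted,
                                         Map<String, Map<String, Long>> appended) {
        List<Triplet> out = new ArrayList<>();
        deleted.forEach((partition, files) ->
            files.forEach(f -> out.add(new Triplet(partition, f, true))));
        appended.forEach((partition, files) ->
            files.keySet().forEach(f -> out.add(new Triplet(partition, f, false))));
        return out;
      }

      public static void main(String[] args) {
        List<Triplet> triplets = fetchTriplets(
            Map.of("2023/08/11", List.of("old.parquet")),
            Map.of("2023/08/11", Map.of("new.parquet", 1024L, "new.log.1", 512L)));
        // Single pass over both deletes and appends; files that cannot be indexed are
        // skipped by returning an empty stream, replacing the old map + filter(nonNull) + union.
        triplets.stream()
            .flatMap(t -> t.filename().endsWith(".parquet")
                ? Stream.of(String.format("record(partition=%s, file=%s, deleted=%b)",
                    t.partition(), t.filename(), t.isDeleted()))
                : Stream.<String>empty())
            .forEach(System.out::println);
      }
    }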

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 74d8ae16176..e99ec493558 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -431,6 +431,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
             + " bootstrap failed for " + metadataMetaClient.getBasePath(), e);
       }
 
+      LOG.info(String.format("Initializing %s index with %d mappings and %d 
file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(),
+          fileGroupCountAndRecordsPair.getValue().count()));
+      HoodieTimer partitionInitTimer = HoodieTimer.start();
+
       // Generate the file groups
       final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
       ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType.name() + " should be > 0");
@@ -443,6 +447,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
       // initialize the metadata reader again so the MDT partition can be read after initialization
       initMetadataReader();
+      long totalInitTime = partitionInitTimer.endTimer();
+      LOG.info(String.format("Initializing %s index in metadata table took %d ms", partitionType.name(), totalInitTime));
     }
 
     return true;
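
The hunk above wraps MDT partition initialization in a HoodieTimer and logs the elapsed time. Purely illustrative and not from the patch: a self-contained stand-in for the start()/endTimer() pair using only JDK types (the sleep and the COLUMN_STATS label are placeholders for the real initialization work and partition name).

    import java.util.concurrent.TimeUnit;

    public class TimerSketch {
      public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for HoodieTimer.start()
        long start = System.nanoTime();
        TimeUnit.MILLISECONDS.sleep(50); // placeholder for partition initialization work
        // Hypothetical stand-in for HoodieTimer.endTimer()
        long totalInitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(String.format("Initializing %s index in metadata table took %d ms",
            "COLUMN_STATS", totalInitTimeMs));
      }
    }
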
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 2b4eb2829b8..71f7cf85ab9 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -103,6 +103,13 @@
   </build>
 
   <dependencies>
+    <!-- Scala -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.openjdk.jol</groupId>
       <artifactId>jol-core</artifactId>
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b50ff114250..08fc663fbad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -87,7 +88,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -95,6 +95,8 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import scala.Tuple3;
+
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
@@ -787,61 +789,39 @@ public class HoodieTableMetadataUtil {
                                                                           Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                           MetadataRecordsGenerationParams recordsGenerationParams,
                                                                           String instantTime) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
-    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
-
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, String>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.map(partitionToDeletedFilePair -> {
-      String partitionName = partitionToDeletedFilePair.getLeft();
-      String deletedFile = partitionToDeletedFilePair.getRight();
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return null;
-      }
-      final String partition = getPartitionIdentifier(partitionName);
-      return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-    }).filter(Objects::nonNull);
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
-    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
-
-    parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, String>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.map(partitionToAppendedFilesPair -> {
-      String partitionName = partitionToAppendedFilesPair.getLeft();
-      String appendedFile = partitionToAppendedFilesPair.getRight();
-      String partition = getPartitionIdentifier(partitionName);
-      if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-        return null;
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
+
+    // Create records for the MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+    return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+      final String partitionName = partitionFileFlagTuple._1();
+      final String filename = partitionFileFlagTuple._2();
+      final boolean isDeleted = partitionFileFlagTuple._3();
+      if (!FSUtils.isBaseFile(new Path(filename))) {
+        LOG.warn(String.format("Ignoring file %s as it is not a base file", filename));
+        return Stream.<HoodieRecord>empty().iterator();
       }
-      final String pathWithPartition = partitionName + "/" + appendedFile;
-      final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
-      try (HoodieFileReader fileReader =
-               HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
-        final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-        if (fileBloomFilter == null) {
-          LOG.error("Failed to read bloom filter for " + appendedFilePath);
-          return null;
+
+      // Read the bloom filter from the base file if the file is being added
+      ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0);
+      if (!isDeleted) {
+        final String pathWithPartition = partitionName + "/" + filename;
+        final Path addedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        bloomFilterBuffer = readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), addedFilePath);
+
+        // If reading the bloom filter failed then do not add a record for this file
+        if (bloomFilterBuffer == null) {
+          LOG.error("Failed to read bloom filter from " + addedFilePath);
+          return Stream.<HoodieRecord>empty().iterator();
         }
-        ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
-        return (HoodieRecord) (HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false));
-      } catch (IOException e) {
-        LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
       }
-      return null;
-    }).filter(Objects::nonNull);
-    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
 
-    return allRecordsRDD;
+      final String partition = getPartitionIdentifier(partitionName);
+      return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              partition, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, partitionFileFlagTuple._3()))
+          .iterator();
+    });
   }
 
   /**
@@ -851,59 +831,61 @@ public class HoodieTableMetadataUtil {
                                                                           Map<String, List<String>> partitionToDeletedFiles,
                                                                           Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                           MetadataRecordsGenerationParams recordsGenerationParams) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    // Find the columns to index
     HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
-
     final List<String> columnsToIndex =
         getColumnsToIndex(recordsGenerationParams,
             Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }
 
-    List<Pair<String, String>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
-
-    int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, String>> partitionToDeletedFilesRDD =
-        engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      String partitionPath = partitionToDeletedFilesPair.getLeft();
-      String partitionId = getPartitionIdentifier(partitionPath);
-      String deletedFile = partitionToDeletedFilesPair.getRight();
-      String filePathWithPartition = partitionPath + "/" + deletedFile;
-      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true).iterator();
-    });
-
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+    LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));
 
-    List<Pair<String, String>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream().flatMap(entry -> {
-      return entry.getValue().keySet().stream().map(file -> Pair.of(entry.getKey(), file));
-    }).collect(Collectors.toList());
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
 
-    int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, String>> partitionToAppendedFilesRDD =
-        engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      String partitionPath = partitionToAppendedFilesPair.getLeft();
-      String partitionId = getPartitionIdentifier(partitionPath);
-      String appendedFile = partitionToAppendedFilesPair.getRight();
-      if (!FSUtils.isBaseFile(new Path(appendedFile))
-          || !appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+    // Create records for the MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+      final String partitionName = partitionFileFlagTuple._1();
+      final String filename = partitionFileFlagTuple._2();
+      final boolean isDeleted = partitionFileFlagTuple._3();
+      if (!FSUtils.isBaseFile(new Path(filename)) || !filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", filename));
         return Stream.<HoodieRecord>empty().iterator();
       }
-      final String filePathWithPartition = partitionPath + "/" + appendedFile;
-      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false).iterator();
+
+      final String filePathWithPartition = partitionName + "/" + filename;
+      final String partitionId = getPartitionIdentifier(partitionName);
+      return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator();
     });
+  }
 
-    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+  private static ByteBuffer readBloomFilter(Configuration conf, Path filePath) throws IOException {
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(conf, filePath)) {
+      final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+      if (fileBloomFilter == null) {
+        return null;
+      }
+      return ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+    }
+  }
 
-    return allRecordsRDD;
+  private static List<Tuple3<String, String, Boolean>> fetchPartitionFileInfoTriplets(Map<String, List<String>> partitionToDeletedFiles,
+                                                                                      Map<String, Map<String, Long>> partitionToAppendedFiles) {
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    return partitionFileFlagTupleList;
   }
 
   /**
