This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cdfacba1bd [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
5cdfacba1bd is described below

commit 5cdfacba1bddfddb7b2b6a75a3ff0e3b6766392b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Feb 3 17:59:18 2023 -0800

    [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)

    - Optimizing instantiation of metadata table for a fresh table by avoiding file listing
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +
 .../metadata/HoodieBackedTableMetadataWriter.java  | 64 ++++++++++++----------
 .../SparkHoodieBackedTableMetadataWriter.java      |  1 +
 .../internal/HoodieDataSourceInternalWriter.java   |  4 +-
 4 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c3260914bd5..17956479762 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -519,6 +519,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+      context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table "
+          + config.getTableName());
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
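
For readers skimming the diff: the two setJobStatus calls added in this commit (here and in SparkHoodieBackedTableMetadataWriter below) attach a human-readable description to the engine's jobs, so the marker cleanup and the metadata-table commit show up labeled in the Spark UI instead of as anonymous stages. A rough, self-contained sketch of the Spark-level mechanism these calls rely on follows; it uses plain JavaSparkContext.setJobGroup rather than Hudi's HoodieEngineContext wrapper, and the module name, instant time, and table name are made up for illustration.

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JobStatusSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("job-status-sketch");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          // Similar in spirit to context.setJobStatus(module, activity): any job
          // triggered after this call carries the group id and description in the UI.
          jsc.setJobGroup("BaseHoodieWriteClient",
              "Cleaning up marker directories for commit 20230203175918000 in table hypothetical_table");
          long count = jsc.parallelize(Arrays.asList(1, 2, 3)).count();
          System.out.println("count=" + count);
        }
      }
    }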
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8356ff9c71..5e8367e2095 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1086,39 +1086,45 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
 
-    List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
-    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-          return Pair.of(partitionName, p.getFileNameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-    List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-    if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-      // Record which saves the list of all partitions
-      HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
-      HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
-      ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
-      partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
-    }
+    // Skip file system listing to populate metadata records if it's a fresh table.
+    // This is applicable only if the table already has N commits and metadata is enabled at a later point in time.
+    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in the MDT for a fresh table.
+      // If not, the last completed commit in the data table will be chosen as the initial commit time.
+      LOG.info("Triggering empty Commit to metadata to initialize");
+    } else {
+      List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
+      Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+          .map(p -> {
+            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+            return Pair.of(partitionName, p.getFileNameToSizeMap());
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
+      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
+
+      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
+        // Record which saves the list of all partitions
+        HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+        HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
+        ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
+        partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
+      }
 
-    if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
-      partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
-    }
+      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
+      }
 
-    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      }
+      LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
     }
 
-    LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
-
     commit(createInstantTime, partitionToRecordsMap, false);
   }
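
The heart of the change is the guard at the top of this hunk: when the metadata table is bootstrapped together with a brand-new data table (signalled by SOLO_COMMIT_TIMESTAMP), there is nothing on storage to list, so the writer commits an empty record map instead of running the recursive partition scan. Below is a minimal, self-contained sketch of the same pattern with simplified stand-in types; the real code builds HoodieRecords for the FILES/BLOOM_FILTERS/COLUMN_STATS partitions, and the constant's value here is illustrative only.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FreshTableShortCircuit {

      // Stand-in for Hudi's SOLO_COMMIT_TIMESTAMP, the bootstrap instant of a
      // brand-new metadata table; the real constant's value may differ.
      static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";

      static Map<String, List<String>> initialRecords(String createInstantTime) {
        Map<String, List<String>> partitionToRecords = new HashMap<>();
        if (SOLO_COMMIT_TIMESTAMP.equals(createInstantTime)) {
          // Fresh table: nothing exists on storage yet, so skip the recursive
          // listing entirely and let the caller make an empty bootstrap commit.
          System.out.println("Triggering empty commit to metadata to initialize");
        } else {
          // Existing table: fall back to the full scan to seed the FILES
          // (and optionally BLOOM_FILTERS / COLUMN_STATS) partitions.
          partitionToRecords.put("files", listAllPartitionsExpensively());
        }
        return partitionToRecords;
      }

      // Stand-in for listAllPartitions(dataMetaClient), the costly recursive scan.
      static List<String> listAllPartitionsExpensively() {
        return Collections.singletonList("partition=2023-02-03/some-file.parquet");
      }

      public static void main(String[] args) {
        System.out.println(initialRecords(SOLO_COMMIT_TIMESTAMP)); // {}
        System.out.println(initialRecords("20230203175918000"));   // {files=[...]}
      }
    }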
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 81526c25bcc..23537f6f798 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -133,6 +133,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords);
 
+    engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       // rollback partially failed writes if any.
       if (writeClient.rollbackFailedWrites()) {
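
As with the marker-directory cleanup above, the status is set just before the write client is created and any jobs are submitted, so the description is attached to the metadata-table commit's stages from the start; the setJobGroup sketch after the first hunk shows the same mechanism in isolation.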
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index c4b21483e8f..11f5d5030b4 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -34,7 +34,6 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -51,7 +50,6 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
   private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
   private final boolean populateMetaFields;
   private final Boolean arePartitionRecordsSorted;
-  private Map<String, String> extraMetadataMap = new HashMap<>();
 
   public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
                                         SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
@@ -61,7 +59,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
     this.structType = structType;
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-    this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
+    Map<String, String> extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
     this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
         sparkSession, configuration, extraMetadataMap);
   }
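
The last hunk is a small lifetime cleanup: extraMetadataMap was only ever consumed while constructing the helper, so holding it as an instance field kept a reference (and the HashMap import) around for no benefit. A generic before/after sketch of the refactor, with hypothetical class and field names:

    import java.util.Collections;
    import java.util.Map;

    // Before: the options map is retained as a field for the object's whole
    // lifetime even though it is only read while wiring up the helper.
    class WriterBefore {
      private Map<String, String> extraMetadataMap;
      final String helper;

      WriterBefore(Map<String, String> options) {
        this.extraMetadataMap = options;
        this.helper = "Helper(" + extraMetadataMap + ")";
      }
    }

    // After: a constructor-local variable; the reference is dropped once
    // construction ends and the narrower scope makes the data flow explicit.
    class WriterAfter {
      final String helper;

      WriterAfter(Map<String, String> options) {
        Map<String, String> extraMetadataMap = options;
        this.helper = "Helper(" + extraMetadataMap + ")";
      }
    }

    public class FieldToLocalSketch {
      public static void main(String[] args) {
        Map<String, String> opts = Collections.singletonMap("checkpoint", "42");
        System.out.println(new WriterBefore(opts).helper);
        System.out.println(new WriterAfter(opts).helper);
      }
    }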
