This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 629349c8cf9 [HUDI-6118] Some fixes to improve the MDT and record index 
code base. (#9106)
629349c8cf9 is described below

commit 629349c8cf9f7da68211c2d32d74e2c5d845b904
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Jul 21 19:13:05 2023 -0700

    [HUDI-6118] Some fixes to improve the MDT and record index code base. 
(#9106)
    
    * [HUDI-6118] Some fixes to improve the MDT and record index code base.
    
    1. Print MDT partition name instead of the enum toString in logs
    2. Use fsView.loadAllPartitions()
    3. When publishing size metrics for MDT, only consider partitions which 
have been initialized
    4. Fixed job status names
    5. Limited logs which were printing the entire list of partitions. This is 
very verbose for datasets with large number of partitions
    6. Added a config to reduce the max parallelism of record index 
initialization.
    7. Changed defaults for MDT write configs to reasonable values
    8. Added config for MDT logBlock size. Larger blocks are preferred to 
reduce lookup time.
    9. Fixed the size metrics for MDT. These metrics should be set instead of 
incremented.
    10. Some metrics for reading from RI
    11. Renamed withMaxInitParallelism as it is only for RI
    12. Fixed default values for cleaner commits retained with MDT
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 --
 .../metadata/HoodieBackedTableMetadataWriter.java  | 28 ++++++----
 .../hudi/metadata/HoodieMetadataWriteUtils.java    | 33 +++++++++---
 .../FlinkHoodieBackedTableMetadataWriter.java      |  2 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |  5 +-
 .../hudi/common/config/HoodieMetadataConfig.java   | 62 ++++++++++++----------
 .../hudi/common/table/HoodieTableConfig.java       |  2 +-
 .../table/view/AbstractTableFileSystemView.java    |  2 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  9 +++-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  4 +-
 .../hudi/metadata/HoodieMetadataMetrics.java       | 25 +++++----
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  2 +-
 .../hudi/metadata/MetadataPartitionType.java       |  9 ----
 13 files changed, 110 insertions(+), 77 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1581e21c070..88c852dd138 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2365,10 +2365,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBooleanOrDefault(HoodieMetadataConfig.ENABLE);
   }
 
-  public int getMetadataInsertParallelism() {
-    return getInt(HoodieMetadataConfig.INSERT_PARALLELISM_VALUE);
-  }
-
   public int getMetadataCompactDeltaCommitMax() {
     return getInt(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b63c6a5c649..7055ab993cd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -190,7 +190,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     }
 
     try {
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath());
+      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
       this.metadataMetaClient = metadata.getMetadataMetaClient();
     } catch (Exception e) {
       throw new HoodieException("Could not open MDT for reads", e);
@@ -359,15 +359,20 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     if (!filesPartitionAvailable) {
       partitionsToInit.remove(MetadataPartitionType.FILES);
       partitionsToInit.add(0, MetadataPartitionType.FILES);
+      // Initialize the metadata table for the first time
+      metadataMetaClient = initializeMetaClient();
     } else {
       // Check and then open the metadata table reader so FILES partition can 
be read during initialization of other partitions
       initMetadataReader();
+      // Load the metadata table metaclient if required
+      if (metadataMetaClient == null) {
+        metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+      }
     }
 
     // Already initialized partitions can be ignored
     partitionsToInit.removeIf(metadataPartition -> 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition)));
 
-    metadataMetaClient = initializeMetaClient();
 
     // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
     List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
@@ -382,7 +387,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
       String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
 
-      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+      LOG.info("Initializing MDT partition " + partitionType.name() + " at 
instant " + commitTimeForPartition);
 
       Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
       try {
@@ -413,7 +418,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
       // Generate the file groups
       final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
-      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType.name() + " should be > 0");
       initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
 
       // Perform the commit using bulkCommit
@@ -475,6 +480,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
+    fsView.loadAllPartitions();
     final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
     for (String partition : partitions) {
       partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
@@ -509,8 +515,9 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       return engineContext.emptyHoodieData();
     }
 
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: 
reading record keys from base files");
-    return engineContext.parallelize(partitionBaseFilePairs, 
partitionBaseFilePairs.size()).flatMap(p -> {
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: 
reading record keys from " + partitionBaseFilePairs.size() + " base files");
+    final int parallelism = Math.min(partitionBaseFilePairs.size(), 
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    return engineContext.parallelize(partitionBaseFilePairs, 
parallelism).flatMap(p -> {
       final String partition = p.getKey();
       final String filename = p.getValue();
       Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + filename);
@@ -558,7 +565,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     }
 
     // Records which save the file listing of each partition
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating 
records for MDT FILES partition");
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating 
records for metadata FILES partition");
     HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
       Map<String, Long> fileNameToSizeMap = 
partitionInfo.getFileNameToSizeMap();
       return HoodieMetadataPayload.createPartitionFilesRecord(
@@ -621,6 +628,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       // In each round we will list a section of directories
       int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
+      engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + 
numDirsToList + " partitions from filesystem");
       List<DirectoryInfo> processedDirectories = 
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
         FileSystem fs = path.get().getFileSystem(conf.get());
         String relativeDirPath = 
FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get());
@@ -707,12 +715,14 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     // during initial commit, then the fileGroup would still be recognized (as 
a FileSlice with no baseFiles but a
     // valid logFile). Since these log files being created have no content, it 
is safe to add them here before
     // the bulkInsert.
-    LOG.info(String.format("Creating %d file groups for partition %s with base 
fileId %s at instant time %s",
-        fileGroupCount, metadataPartition.getPartitionPath(), 
metadataPartition.getFileIdPrefix(), instantTime));
+    final String msg = String.format("Creating %d file groups for partition %s 
with base fileId %s at instant time %s",
+        fileGroupCount, metadataPartition.getPartitionPath(), 
metadataPartition.getFileIdPrefix(), instantTime);
+    LOG.info(msg);
     final List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount)
         .mapToObj(i -> 
HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i))
         .collect(Collectors.toList());
     ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount);
+    engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
     engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
       try {
         final HashMap<HeaderMetadataType, String> blockHeader = new 
HashMap<>();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 9d7ecc8be0f..ad87b5287ca 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -55,6 +56,19 @@ public class HoodieMetadataWriteUtils {
   // from the metadata payload schema.
   public static final String RECORD_KEY_FIELD_NAME = 
HoodieMetadataPayload.KEY_FIELD_NAME;
 
+  // MDT writes are always prepped. Hence, insert and upsert shuffle 
parallelism are not important to be configured. Same for delete
+  // parallelism as deletes are not used.
+  // The finalize, cleaner and rollback tasks will operate on each fileGroup 
so their parallelism should be as large as the total file groups.
+  // But it's not possible to accurately get the file group count here so 
keeping these values large enough. This parallelism would
 +  // anyway be limited by the executor count.
+  private static final int MDT_DEFAULT_PARALLELISM = 512;
+
+  // File groups in each partition are fixed at creation time and we do not 
want them to be split into multiple files
+  // ever. Hence, we use a very large basefile size in metadata table. The 
actual size of the HFiles created will
+  // eventually depend on the number of file groups selected for each 
partition (See estimateFileGroupCount function)
+  private static final long MDT_MAX_HFILE_SIZE_BYTES = 10 * 1024 * 1024 * 
1024L; // 10GB
+
+
   /**
    * Create a {@code HoodieWriteConfig} to use for the Metadata Table.  This 
is used by async
    * indexer only.
@@ -65,7 +79,8 @@ public class HoodieMetadataWriteUtils {
   public static HoodieWriteConfig createMetadataWriteConfig(
       HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy) {
     String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-    int parallelism = writeConfig.getMetadataInsertParallelism();
+
+    final long maxLogFileSizeBytes = 
writeConfig.getMetadataConfig().getMaxLogFileSize();
 
     // Create the write config for the metadata table by borrowing options 
from the main write config.
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
@@ -91,7 +106,7 @@ public class HoodieMetadataWriteUtils {
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN)
             .withAutoClean(false)
-            .withCleanerParallelism(parallelism)
+            .withCleanerParallelism(MDT_DEFAULT_PARALLELISM)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
             .retainCommits(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED)
@@ -111,16 +126,18 @@ public class HoodieMetadataWriteUtils {
             // deltacommits having corresponding completed commits. Therefore, 
we need to compact all fileslices of all
             // partitions together requiring UnBoundedCompactionStrategy.
             .withCompactionStrategy(new UnBoundedCompactionStrategy())
-            // Check if log compaction is enabled, this is needed for tables 
with lot of records.
+            // Check if log compaction is enabled, this is needed for tables 
with a lot of records.
             
.withLogCompactionEnabled(writeConfig.isLogCompactionEnabledOnMetadata())
             // Below config is only used if isLogCompactionEnabled is set.
             
.withLogCompactionBlocksThreshold(writeConfig.getMetadataLogCompactBlocksThreshold())
             .build())
-        .withParallelism(parallelism, parallelism)
-        .withDeleteParallelism(parallelism)
-        .withRollbackParallelism(parallelism)
-        .withFinalizeWriteParallelism(parallelism)
-        .withAllowMultiWriteOnSameInstant(true)
+        
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(MDT_MAX_HFILE_SIZE_BYTES)
+            .logFileMaxSize(maxLogFileSizeBytes)
+            // Keeping the log blocks as large as the log files themselves 
reduces the number of HFile blocks to be checked for
+            // presence of keys
+            .logFileDataBlockMaxSize(maxLogFileSizeBytes).build())
+        .withRollbackParallelism(MDT_DEFAULT_PARALLELISM)
+        .withFinalizeWriteParallelism(MDT_DEFAULT_PARALLELISM)
         
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
         .withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
         .withWriteStatusClass(FailOnFirstErrorWriteStatus.class)
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 6edeac05a74..e3602ce1a2a 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -160,7 +160,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
+    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, 
dataMetaClient.getTableConfig().getMetadataPartitions()));
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 2346831162b..320ebb15763 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -135,7 +135,6 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = 
HoodieJavaRDD.getJavaRDD(preppedRecords);
 
-    engineContext.setJobStatus(this.getClass().getName(), "Committing " + 
instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = (SparkRDDWriteClient) 
getWriteClient()) {
       // rollback partially failed writes if any.
       if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
@@ -169,8 +168,10 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
       writeClient.startCommitWithTime(instantTime);
       if (bulkInsertPartitioner.isPresent()) {
+        engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Bulk inserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
         writeClient.bulkInsertPreppedRecords(preppedRecordRDD, instantTime, 
bulkInsertPartitioner).collect();
       } else {
+        engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Upserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
         writeClient.upsertPreppedRecords(preppedRecordRDD, 
instantTime).collect();
       }
 
@@ -179,7 +180,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
+    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, 
dataMetaClient.getTableConfig().getMetadataPartitions()));
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 6a1f1845816..f2f6f11a678 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -49,7 +49,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
   // Meta fields are not populated by default for metadata table
   public static final boolean DEFAULT_METADATA_POPULATE_META_FIELDS = false;
   // Default number of commits to retain, without cleaning, on metadata table
-  public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 10;
+  public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 20;
 
   public static final String METADATA_PREFIX = "hoodie.metadata";
   public static final String OPTIMIZED_LOG_BLOCKS_SCAN = 
".optimized.log.blocks.scan.enable";
@@ -71,14 +71,6 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
       .sinceVersion("0.7.0")
       .withDocumentation("Enable publishing of metrics around metadata 
table.");
 
-  // Parallelism for inserts
-  public static final ConfigProperty<Integer> INSERT_PARALLELISM_VALUE = 
ConfigProperty
-      .key(METADATA_PREFIX + ".insert.parallelism")
-      .defaultValue(1)
-      .markAdvanced()
-      .sinceVersion("0.7.0")
-      .withDocumentation("Parallelism to use when inserting to the metadata 
table");
-
   // Async index
   public static final ConfigProperty<Boolean> ASYNC_INDEX_ENABLE = 
ConfigProperty
       .key(METADATA_PREFIX + ".index.async")
@@ -265,7 +257,7 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
 
   public static final ConfigProperty<Integer> 
RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP = ConfigProperty
       .key(METADATA_PREFIX + ".record.index.max.filegroup.count")
-      .defaultValue(1000)
+      .defaultValue(10000)
       .markAdvanced()
       .sinceVersion("0.14.0")
       .withDocumentation("Maximum number of file groups to use for Record 
Index.");
@@ -285,6 +277,12 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .withDocumentation("The current number of records are multiplied by this 
number when estimating the number of "
           + "file groups to create automatically. This helps account for 
growth in the number of records in the dataset.");
 
+  public static final ConfigProperty<Integer> RECORD_INDEX_MAX_PARALLELISM = 
ConfigProperty
+      .key(METADATA_PREFIX + ".max.init.parallelism")
+      .defaultValue(100000)
+      .sinceVersion("0.14.0")
+      .withDocumentation("Maximum parallelism to use when initializing Record 
Index.");
+
   public static final ConfigProperty<Long> MAX_READER_MEMORY_PROP = 
ConfigProperty
       .key(METADATA_PREFIX + ".max.reader.memory")
       .defaultValue(1024 * 1024 * 1024L)
@@ -307,6 +305,17 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("Path on local storage to use, when keys read from 
metadata are held in a spillable map.");
 
+  public static final ConfigProperty<Long> MAX_LOG_FILE_SIZE_BYTES_PROP = 
ConfigProperty
+      .key(METADATA_PREFIX + ".max.logfile.size")
+      .defaultValue(2 * 1024 * 1024 * 1024L)  // 2GB
+      .sinceVersion("0.14.0")
+      .withDocumentation("Maximum size in bytes of a single log file. Larger 
log files can contain larger log blocks "
+          + "thereby reducing the number of blocks to search for keys");
+
+  public long getMaxLogFileSize() {
+    return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
+  }
+
   private HoodieMetadataConfig() {
     super();
   }
@@ -423,9 +432,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getInt(MAX_READER_BUFFER_SIZE_PROP);
   }
 
-  /**
-   * Builder for {@link HoodieMetadataConfig}.
-   */
+  public int getRecordIndexMaxParallelism() {
+    return getInt(RECORD_INDEX_MAX_PARALLELISM);
+  }
+
   public static class Builder {
 
     private EngineType engineType = EngineType.SPARK;
@@ -498,11 +508,6 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
-    public Builder withInsertParallelism(int parallelism) {
-      metadataConfig.setValue(INSERT_PARALLELISM_VALUE, 
String.valueOf(parallelism));
-      return this;
-    }
-
     public Builder withAsyncIndex(boolean asyncIndex) {
       metadataConfig.setValue(ASYNC_INDEX_ENABLE, String.valueOf(asyncIndex));
       return this;
@@ -528,6 +533,11 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withRecordIndexMaxParallelism(int parallelism) {
+      metadataConfig.setValue(RECORD_INDEX_MAX_PARALLELISM, 
String.valueOf(parallelism));
+      return this;
+    }
+
     public Builder withAssumeDatePartitioning(boolean assumeDatePartitioning) {
       metadataConfig.setValue(ASSUME_DATE_PARTITIONING, 
String.valueOf(assumeDatePartitioning));
       return this;
@@ -599,6 +609,11 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withMaxLogFileSizeBytes(long sizeInBytes) {
+      metadataConfig.setValue(MAX_LOG_FILE_SIZE_BYTES_PROP, 
String.valueOf(sizeInBytes));
+      return this;
+    }
+
     public HoodieMetadataConfig build() {
       metadataConfig.setDefaultValue(ENABLE, 
getDefaultMetadataEnable(engineType));
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
@@ -640,17 +655,6 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
   @Deprecated
   public static final boolean DEFAULT_METADATA_METRICS_ENABLE = 
METRICS_ENABLE.defaultValue();
 
-  /**
-   * @deprecated Use {@link #INSERT_PARALLELISM_VALUE} and its methods.
-   */
-  @Deprecated
-  public static final String METADATA_INSERT_PARALLELISM_PROP = 
INSERT_PARALLELISM_VALUE.key();
-  /**
-   * @deprecated Use {@link #INSERT_PARALLELISM_VALUE} and its methods.
-   */
-  @Deprecated
-  public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 
INSERT_PARALLELISM_VALUE.defaultValue();
-
   /**
    * @deprecated Use {@link #COMPACT_NUM_DELTA_COMMITS} and its methods.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 5963e199ea9..0b25cab655b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -778,7 +778,7 @@ public class HoodieTableConfig extends HoodieConfig {
     setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     update(metaClient.getFs(), new Path(metaClient.getMetaPath()), getProps());
-    LOG.info(String.format("MDT %s partition %s has been %s", 
metaClient.getBasePathV2(), partitionType, enabled ? "enabled" : "disabled"));
+    LOG.info(String.format("MDT %s partition %s has been %s", 
metaClient.getBasePathV2(), partitionType.name(), enabled ? "enabled" : 
"disabled"));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 0ed7acc9709..0910971e6b7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -341,7 +341,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
         long beginTs = System.currentTimeMillis();
         // Not loaded yet
         try {
-          LOG.info("Building file system view for partitions " + partitionSet);
+          LOG.debug("Building file system view for partitions: " + 
partitionSet);
 
           // Pairs of relative partition path and absolute partition path
           List<Pair<String, Path>> absolutePartitionPathList = 
partitionSet.stream()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1381d7e5268..076eb4bf1dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -55,7 +55,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -287,6 +286,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
     
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
         "Record index is not initialized in MDT");
 
+    HoodieTimer timer = HoodieTimer.start();
     Map<String, HoodieRecord<HoodieMetadataPayload>> result = 
getRecordsByKeys(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
     Map<String, HoodieRecordGlobalLocation> recordKeyToLocation = new 
HashMap<>(result.size());
     result.forEach((key, record) -> {
@@ -294,6 +294,11 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
         recordKeyToLocation.put(key, 
record.getData().getRecordGlobalLocation());
       }
     });
+
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_TIME_STR, 
timer.endTimer()));
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_KEYS_COUNT_STR, 
recordKeys.size()));
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_KEYS_HITS_COUNT_STR, 
recordKeyToLocation.size()));
+
     return recordKeyToLocation;
   }
 
@@ -383,7 +388,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
         })
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
-    LOG.info("Listed files in partitions from metadata: partition list =" + 
Arrays.toString(partitionPaths.toArray()));
+    LOG.info("Listed files in " + partitionPaths.size() + " partitions from 
metadata");
 
     return partitionPathToFilesMap;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 21983f31f66..b0e9bcaaab2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -543,7 +544,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   public Map<String, String> stats() {
-    return metrics.map(m -> m.getStats(true, metadataMetaClient, 
this)).orElse(new HashMap<>());
+    Set<String> allMetadataPartitionPaths = 
Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+    return metrics.map(m -> m.getStats(true, metadataMetaClient, this, 
allMetadataPartitionPaths)).orElse(new HashMap<>());
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 31591010bf9..521b55efaed 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -46,6 +47,12 @@ public class HoodieMetadataMetrics implements Serializable {
   public static final String LOOKUP_FILES_STR = "lookup_files";
   public static final String LOOKUP_BLOOM_FILTERS_METADATA_STR = 
"lookup_meta_index_bloom_filters";
   public static final String LOOKUP_COLUMN_STATS_METADATA_STR = 
"lookup_meta_index_column_ranges";
+  // Time for lookup from record index
+  public static final String LOOKUP_RECORD_INDEX_TIME_STR = 
"lookup_record_index_time";
+  // Number of keys looked up in a call
+  public static final String LOOKUP_RECORD_INDEX_KEYS_COUNT_STR = 
"lookup_record_index_key_count";
+  // Number of keys found in record index
+  public static final String LOOKUP_RECORD_INDEX_KEYS_HITS_COUNT_STR = 
"lookup_record_index_key_hit_count";
   public static final String SCAN_STR = "scan";
   public static final String BASEFILE_READ_STR = "basefile_read";
   public static final String INITIALIZE_STR = "initialize";
@@ -71,21 +78,21 @@ public class HoodieMetadataMetrics implements Serializable {
     this.metricsRegistry = metricsRegistry;
   }
 
-  public Map<String, String> getStats(boolean detailed, HoodieTableMetaClient 
metaClient, HoodieTableMetadata metadata) {
+  public Map<String, String> getStats(boolean detailed, HoodieTableMetaClient 
metaClient, HoodieTableMetadata metadata, Set<String> metadataPartitions) {
     try {
-      metaClient.reloadActiveTimeline();
       HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-      return getStats(fsView, detailed, metadata);
+      return getStats(fsView, detailed, metadata, metadataPartitions);
     } catch (IOException ioe) {
       throw new HoodieIOException("Unable to get metadata stats.", ioe);
     }
   }
 
-  private Map<String, String> getStats(HoodieTableFileSystemView fsView, 
boolean detailed, HoodieTableMetadata tableMetadata) throws IOException {
+  private Map<String, String> getStats(HoodieTableFileSystemView fsView, 
boolean detailed, HoodieTableMetadata tableMetadata, Set<String> 
metadataPartitions)
+      throws IOException {
     Map<String, String> stats = new HashMap<>();
 
-    // Total size of the metadata and count of base/log files
-    for (String metadataPartition : MetadataPartitionType.allPaths()) {
+    // Total size of the metadata and count of base/log files for enabled 
partitions
+    for (String metadataPartition : metadataPartitions) {
       List<FileSlice> latestSlices = 
fsView.getLatestFileSlices(metadataPartition).collect(Collectors.toList());
 
       // Total size of the metadata and count of base/log files
@@ -131,10 +138,10 @@ public class HoodieMetadataMetrics implements 
Serializable {
     incrementMetric(durationKey, durationInMs);
   }
 
-  public void updateSizeMetrics(HoodieTableMetaClient metaClient, 
HoodieBackedTableMetadata metadata) {
-    Map<String, String> stats = getStats(false, metaClient, metadata);
+  public void updateSizeMetrics(HoodieTableMetaClient metaClient, 
HoodieBackedTableMetadata metadata, Set<String> metadataPartitions) {
+    Map<String, String> stats = getStats(false, metaClient, metadata, 
metadataPartitions);
     for (Map.Entry<String, String> e : stats.entrySet()) {
-      incrementMetric(e.getKey(), Long.parseLong(e.getValue()));
+      setMetric(e.getKey(), Long.parseLong(e.getValue()));
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 384c96a664e..e37d7c1daff 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1634,7 +1634,7 @@ public class HoodieTableMetadataUtil {
 
     LOG.info(String.format("Estimated file group count for MDT partition %s is 
%d "
             + "[recordCount=%d, avgRecordSize=%d, minFileGroupCount=%d, 
maxFileGroupCount=%d, growthFactor=%f, "
-            + "maxFileGroupSizeBytes=%d]", partitionType, fileGroupCount, 
recordCount, averageRecordSize, minFileGroupCount,
+            + "maxFileGroupSizeBytes=%d]", partitionType.name(), 
fileGroupCount, recordCount, averageRecordSize, minFileGroupCount,
         maxFileGroupCount, growthFactor, maxFileGroupSizeBytes));
     return fileGroupCount;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 3f6a5adf6f0..81a6b43c4f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -49,14 +48,6 @@ public enum MetadataPartitionType {
     return fileIdPrefix;
   }
 
-  public static List<String> allPaths() {
-    return Arrays.asList(
-        FILES.getPartitionPath(),
-        COLUMN_STATS.getPartitionPath(),
-        BLOOM_FILTERS.getPartitionPath()
-    );
-  }
-
   /**
    * Returns the list of metadata table partitions which require WriteStatus 
to track written records.
    * <p>


Reply via email to