danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1190914223


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -619,41 +680,34 @@ private boolean 
anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
     return false;
   }
 
-  private void 
updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> 
partitionTypes) {
-    Set<String> completedPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
-    
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
-    
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(),
 String.join(",", completedPartitions));
-    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-  }
-
-  private HoodieTableMetaClient initializeMetaClient(boolean 
populateMetaFields) throws IOException {
+  private HoodieTableMetaClient initializeMetaClient() throws IOException {
     return HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.MERGE_ON_READ)
         .setTableName(tableName)
         .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
         .setPayloadClassName(HoodieMetadataPayload.class.getName())
         .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
         .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
-        .setPopulateMetaFields(populateMetaFields)
-        
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
-        .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+        .setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+            
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());

Review Comment:
   Fix the indentation.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,25 +863,22 @@ private interface ConvertMetadataFunction {
   /**
    * Processes commit metadata from data table and commits to metadata table.
    *
-   * @param instantTime instant time of interest.
+   * @param instantTime             instant time of interest.
    * @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param <T> type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
-  private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (!dataWriteConfig.isMetadataTableEnabled()) {
-      return;
-    }
+  private void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {
+    ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled());
+
     Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
     Set<String> inflightIndexes = 
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
     // if indexing is inflight then do not trigger table service
     boolean doNotTriggerTableService = 
partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);

Review Comment:
   Do we still need this?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -118,46 +123,32 @@ protected void initRegistry() {
   }
 
   @Override
-  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext 
engineContext,
-                                                           Option<T> 
actionMetadata,
-                                                           Option<String> 
inflightInstantTimestamp) {
-    try {
-      metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
-        if (registry instanceof DistributedRegistry) {
-          HoodieSparkEngineContext sparkEngineContext = 
(HoodieSparkEngineContext) engineContext;
-          ((DistributedRegistry) 
registry).register(sparkEngineContext.getJavaSparkContext());
-        }
-      });
+  protected void commit(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
+    commitInternal(instantTime, partitionRecordsMap, Option.empty());
+  }
 
-      if (enabled) {
-        initializeIfNeeded(dataMetaClient, actionMetadata, 
inflightInstantTimestamp);
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
-      enabled = false;
-    }
+  protected void bulkCommit(
+          String instantTime, MetadataPartitionType partitionType, 
HoodieData<HoodieRecord> records,
+          int fileGroupCount) {
+    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = 
new HashMap<>();
+    partitionRecordsMap.put(partitionType, records);
+    SparkHoodieMetadataBulkInsertPartitioner partitioner = new 
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);

Review Comment:
   Can bulk_insert supporting writing HFile directly ? It seems it still write 
parquets, then the k-v query performance should not be good ?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -373,105 +356,92 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        return true;
       }
 
-      String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-      initTableMetadata(); // re-init certain flags in BaseTableMetadata
-      initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
partitionsToInit);
-      initialCommit(createInstantTime, partitionsToInit);
-      updateInitializedPartitionsInTableConfig(partitionsToInit);
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
+              
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);

Review Comment:
   `.getReverseOrderedInstants().findFirst()` -> `lastInstant` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to