manojpec commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r797392212



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime), e);
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant; if it is inflight, read from the requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.", e);
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.all();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.all().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), p));
+          } catch (IOException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+    indexPartitionInfos.forEach(indexPartitionInfo -> {
+      String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+      String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
+      LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime));
+      try {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.MERGE_ON_READ)
+            .setTableName(tableName)
+            .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+            .setPayloadClassName(HoodieMetadataPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+            .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+        initTableMetadata();
+        initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)), indexUptoInstantTime, 1);
+      } catch (IOException e) {
+        throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, indexUptoInstant: %s",
+            relativePartitionPath, indexUptoInstantTime), e);
+      }
+
+      // List all partitions in the basePath of the containing dataset
+      LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
+      engineContext.setJobStatus(this.getClass().getSimpleName(), "MetadataIndex: initializing metadata table by listing files and partitions");
+      List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+
+      // During bootstrap, the list of files to be committed can be huge, so creating a HoodieCommitMetadata out of such a
+      // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
+      // Hence, we have a special commit just for the bootstrap scenario.
+      bootstrapCommit(dirInfoList, indexUptoInstantTime, relativePartitionPath);

Review comment:
       We need to decide on an interface for bootstrapCommit() that can apply to all metadata partitions, not just file listings.
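
To make that concrete, here is a minimal sketch of what a partition-agnostic contract could look like. Everything below is hypothetical (neither interface exists in this PR); only `MetadataPartitionType` and `HoodieRecord` are real Hudi types.

```java
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.metadata.MetadataPartitionType;

// Hypothetical sketch only -- these interfaces do not exist in this PR.
// The idea: each metadata partition supplies its own bootstrap records,
// and the writer exposes one bootstrapCommit() entry point for all of them,
// instead of the current DirectoryInfo-specific signature.
interface PartitionBootstrapSource {
  // The metadata partition this source seeds (e.g. FILES).
  MetadataPartitionType partitionType();

  // Records that seed the partition up to (and including) the given instant.
  List<HoodieRecord> bootstrapRecords(String uptoInstantTime);
}

interface BootstrapCommitter {
  // One entry point per partition; the writer implementation would
  // parallelize the records and commit them under source.partitionType().
  void bootstrapCommit(PartitionBootstrapSource source, String createInstantTime);
}
```

With a shape like this, the FILES bootstrap in index() and any future partition bootstraps could share the same commit path.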

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime), e);
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant; if it is inflight, read from the requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.", e);
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.all();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.all().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), p));
+          } catch (IOException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+    indexPartitionInfos.forEach(indexPartitionInfo -> {
+      String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+      String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
+      LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime));
+      try {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.MERGE_ON_READ)
+            .setTableName(tableName)
+            .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+            .setPayloadClassName(HoodieMetadataPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+            .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+        initTableMetadata();
+        initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)), indexUptoInstantTime, 1);

Review comment:
       The file group count for each partition comes from config. Either the counts should be carried in the index plan, or we need to refer back to the config here.
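
For illustration, a sketch of the two options; `getFileGroupCount()` on the plan info and the config interface below are hypothetical names, not this PR's API:

```java
import java.util.Locale;

import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.metadata.MetadataPartitionType;

final class FileGroupCountResolver {

  // Stand-in for wherever the per-partition counts actually live in config.
  interface HypotheticalMetadataConfig {
    int fileGroupCountFor(MetadataPartitionType type);
  }

  static int resolve(HoodieIndexPartitionInfo partitionInfo, HypotheticalMetadataConfig config) {
    // Option A: the scheduler stamps the count into the index plan, so the
    // executor cannot drift from what was planned:
    //   return partitionInfo.getFileGroupCount();  // hypothetical plan field

    // Option B: re-read the write config at execution time (fragile if the
    // config changes between scheduling and execution):
    MetadataPartitionType type = MetadataPartitionType.valueOf(
        partitionInfo.getMetadataPartitionPath().toUpperCase(Locale.ROOT));
    return config.fileGroupCountFor(type);
  }
}
```

Carrying the count in the plan (Option A) seems safer, since the executor then never depends on the writer config staying unchanged between scheduling and execution.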



