codope commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r835213738



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();

Review comment:
       Now the source of truth will be the table config, which will have both the inflight and the completed metadata partitions.
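The comment suggests that `getMetadataPartitionsToUpdate()` should read the metadata partitions from the table config rather than from the last index plan on the timeline. A minimal sketch of that idea follows, assuming the table config exposes completed and inflight metadata partitions as two comma-separated properties. The class name and both property keys are hypothetical placeholders for illustration; they are not Hudi's actual `HoodieTableConfig` API, and `java.util.Properties` stands in for the real config object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class MetadataPartitionsFromTableConfig {

  // Hypothetical property keys, for illustration only (not taken from the PR or from Hudi).
  static final String COMPLETED_KEY = "example.metadata.partitions";
  static final String INFLIGHT_KEY = "example.metadata.partitions.inflight";

  /**
   * Returns the completed partitions followed by the inflight ones, preserving order
   * and dropping duplicates in case a partition appears in both lists.
   */
  static List<String> getMetadataPartitionsToUpdate(Properties tableConfig) {
    Set<String> partitions = new LinkedHashSet<>();
    partitions.addAll(parseCsv(tableConfig.getProperty(COMPLETED_KEY, "")));
    partitions.addAll(parseCsv(tableConfig.getProperty(INFLIGHT_KEY, "")));
    return new ArrayList<>(partitions);
  }

  // Splits a comma-separated value, trimming whitespace and skipping empty entries.
  private static List<String> parseCsv(String value) {
    return Arrays.stream(value.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Example values only: one completed partition and two partitions still being indexed.
    Properties tableConfig = new Properties();
    tableConfig.setProperty(COMPLETED_KEY, "files");
    tableConfig.setProperty(INFLIGHT_KEY, "column_stats,bloom_filters");
    System.out.println(getMetadataPartitionsToUpdate(tableConfig)); // prints [files, column_stats, bloom_filters]
  }
}
```

With a single config-backed source of truth, the `IOException` fallback to `FileSystem.exists()` and the inflight-instant TODO in the diff would no longer be needed in this path, since writers would update every partition listed as either completed or inflight.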




-- 
This is an automated message from the Apache Git Service.