codope commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r836766366



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams 
getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap 
= convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, 
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize 
file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+        commit(instantTime, partitionRecordsMap, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to 
be) written
+    Option<HoodieInstant> lastIndexingInstant = 
dataMetaClient.getActiveTimeline()
+        
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from 
requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return 
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to 
FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.allPaths();

Review comment:
       This logic has been updated; it no longer returns all partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to