manojpec commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r797380164



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), 
MetadataPartitionType.FILES.partitionPath(), instantTime, 
canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, 
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize 
file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();

Review comment:
       This might work for the FILES partition. But bloom filter and column 
stats partition initialization can produce a huge set of records and can blow 
up the driver memory. I hit this problem as well. We need to make use of the 
context to build this HoodieData<HoodieRecord> list and then commit the RDD 
via the commit call. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), 
MetadataPartitionType.FILES.partitionPath(), instantTime, 
canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, 
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize 
file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();

Review comment:
       Also, calling commit serially for each partition one after the other 
might not be ideal. They are all different partitions under the same table, and 
we should be able to commit across all of them in one shot via HoodieData. 
https://github.com/apache/hudi/pull/4352 takes care of this. We can discuss more.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), 
MetadataPartitionType.FILES.partitionPath(), instantTime, 
canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, 
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize 
file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, 
canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to 
be) written
+    Option<HoodieInstant> lastIndexingInstant = 
dataMetaClient.getActiveTimeline()
+        
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from 
requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return 
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to 
FileSystem.exists() check.");
+        return getExistingMetadataPartitions();

Review comment:
       But the FS check is not foolproof. The partition directory might exist well 
before the full initialization, right? Or do we make sure the partition dir is 
renamed/exists only after successful initialization?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -855,6 +855,17 @@ public boolean scheduleCompactionAtInstant(String 
instantTime, Option<Map<String
     return scheduleTableService(instantTime, extraMetadata, 
TableServiceType.COMPACT).isPresent();
   }
 
+  public Option<String> scheduleIndexing(List<String> partitions) {

Review comment:
       Should the arg be from the metadata table partition enum types, to ensure 
that only valid/supported ones can be indexed?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -460,6 +475,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext 
engineContext, Hoodi
         .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
 
     initTableMetadata();
+    // TODO: make it generic for all enabled partition types

Review comment:
       This is taken care of in https://github.com/apache/hudi/pull/4352

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), 
MetadataPartitionType.FILES.partitionPath(), instantTime, 
canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, 
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize 
file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, 
canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to 
be) written
+    Option<HoodieInstant> lastIndexingInstant = 
dataMetaClient.getActiveTimeline()
+        
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from 
requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return 
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to 
FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.all();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.all().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return 
metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
 p));

Review comment:
       See the comment above. We can return the partitions only after they are 
fully initialized. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to