nsivabalan commented on code in PR #9379:
URL: https://github.com/apache/hudi/pull/9379#discussion_r1286231052


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,68 @@ public void close() throws Exception {
   */
  protected abstract void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);

Review Comment:
   Maybe: `convertHoodieDataToEngineSpecificInput`?



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java:
##########
@@ -97,17 +97,22 @@ protected void initRegistry() {
 
   @Override
   protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
-    doCommit(instantTime, partitionRecordsMap, false);
+    commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
+  }
+
+  @Override
+  protected List<HoodieRecord> convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records) {
+    return records.collectAsList();
   }
 
   @Override
   protected void bulkCommit(String instantTime, MetadataPartitionType partitionType, HoodieData<HoodieRecord> records, int fileGroupCount) {
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = new HashMap<>();
-    partitionRecordsMap.put(partitionType, records);
-    doCommit(instantTime, partitionRecordsMap, true);
+    commitInternal(instantTime, Collections.singletonMap(partitionType, records), true, Option.empty());
   }
 
-  private void doCommit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing) {
+  @Override
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,

Review Comment:
   @danny0405: Can you take a look at all the changes here, especially the Flink side of things?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,68 @@ public void close() throws Exception {
   */
  protected abstract void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> bulkInsertPartitioner) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
+    HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
+    I preppedRecordInputs = convertRecordsToWriteClientInput(preppedRecords);
+
+    try (BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient()) {
+      // rollback partially failed writes if any.
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
+
+      if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+        // if this is a new commit being applied to metadata for the first time
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
+      } else {
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
+        }
+        metadataMetaClient.reloadActiveTimeline();
+      }
+
+      writeClient.startCommitWithTime(instantTime);
+      if (manuallyTransitionCommit) {

Review Comment:
   General suggestion: when you put out such large refactoring PRs, it would be easier for reviewers if you point out the lines that actually changed. Reviewers may not pay full attention, since the first 20 lines in this code snippet are copied over as-is.
   
   So, leave comments here on which code is changed and which code is just copied over. That would help reviewers focus on what exactly changed.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,69 @@ public void close() throws Exception {
   */
  protected abstract void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> bulkInsertPartitioner) {

Review Comment:
   What is the purpose of `isInitializing`?
   Doesn't the presence of the partitioner suffice?
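
A minimal sketch of what the reviewer is asking (all names here are illustrative stand-ins, not the actual Hudi API): if bulk-insert only ever happens during initialization, the boolean flag is redundant and can be derived from whether a partitioner was supplied.

```java
import java.util.Optional;

// Hypothetical sketch: derive the "initializing" state from the presence of a
// bulk-insert partitioner instead of passing a separate boolean flag.
// Object is a placeholder for the real BulkInsertPartitioner type.
class CommitFlagSketch {
  static boolean isInitializing(Optional<Object> bulkInsertPartitioner) {
    // Initialization path <=> a bulk-insert partitioner was supplied.
    return bulkInsertPartitioner.isPresent();
  }
}
```

Note that in the Flink hunk above, `bulkCommit` passes `Option.empty()` together with `true`, so the two signals would only coincide if the Flink path also supplied a partitioner.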



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,69 @@ public void close() throws Exception {
   */
  protected abstract void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> bulkInsertPartitioner) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
+    HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
+    I preppedRecordInputs = convertRecordsToWriteClientInput(preppedRecords);
+
+    try (BaseHoodieWriteClient<?, I, ?, O> writeClient = getWriteClient()) {
+      // rollback partially failed writes if any.
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
+
+      if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+        // if this is a new commit being applied to metadata for the first time
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
+      } else {
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
+        }
+        metadataMetaClient.reloadActiveTimeline();
+      }
+
+      writeClient.startCommitWithTime(instantTime);
+      if (manuallyTransitionCommit) {
+        metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+      }

Review Comment:
   From what I infer, Java and Flink need it, while Spark does not. Alternatively, we can introduce a method called `preWrite` just after starting the commit here.
   It would be a no-op in Spark, while Java and Flink can override it and transition the timeline.
   That way it is better abstracted out.
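
The suggested hook could look roughly like the following sketch (class and method names are illustrative placeholders, not the real Hudi classes): `commitInternal` calls a `preWrite` hook right after starting the commit; the base/Spark path leaves it a no-op, while the Java/Flink writers override it to transition the requested deltacommit to inflight, standing in for `transitionRequestedToInflight(DELTA_COMMIT_ACTION, instantTime)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method abstraction proposed in the review.
abstract class MetadataWriterSketch {
  protected final List<String> actions = new ArrayList<>();

  final List<String> commitInternal(String instantTime) {
    actions.add("startCommitWithTime:" + instantTime);
    preWrite(instantTime); // hook point proposed in the review
    actions.add("write:" + instantTime);
    return actions;
  }

  // No-op by default, so the Spark writer needs no change.
  protected void preWrite(String instantTime) {
  }
}

class SparkWriterSketch extends MetadataWriterSketch {
  // inherits the no-op preWrite
}

class FlinkWriterSketch extends MetadataWriterSketch {
  @Override
  protected void preWrite(String instantTime) {
    // stands in for transitioning the requested deltacommit to inflight
    actions.add("transitionRequestedToInflight:" + instantTime);
  }
}
```

This removes the `manuallyTransitionCommit` branch from the shared code path: each engine's behavior lives in its own subclass.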
   



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieMetadataBulkInsertPartitioner.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class JavaHoodieMetadataBulkInsertPartitioner<T>
+    implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
+  private String fileId = null;
+
+  @Override
+  public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records, int outputPartitions) {
+    if (records.isEmpty()) {
+      return records;
+    }
+    records.sort(Comparator.comparing(record -> record.getKey().getRecordKey()));
+    fileId = HoodieTableMetadataUtil.getFileGroupPrefix(records.get(0).getCurrentLocation().getFileId());

Review Comment:
   Let's ensure we have follow-up tickets here.
   Looks like this will work only for the FILES partition as of now.
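
To illustrate the limitation (with simplified, hypothetical stand-in types, not the real Hudi classes): the partitioner derives a single `fileId` from the first record, which only holds when every record in the batch is tagged to the same file group. A follow-up could bucket records by their tagged file group before sorting, as in this sketch:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch: SimpleRecord stands in for HoodieRecord, and
// fileGroupId stands in for getCurrentLocation().getFileId().
class FileGroupBucketSketch {
  static class SimpleRecord {
    final String recordKey;
    final String fileGroupId;

    SimpleRecord(String recordKey, String fileGroupId) {
      this.recordKey = recordKey;
      this.fileGroupId = fileGroupId;
    }
  }

  // Bucket by tagged file group first, instead of assuming all records share
  // the file group of records.get(0); each bucket can then be sorted and
  // written independently.
  static Map<String, List<SimpleRecord>> bucketByFileGroup(List<SimpleRecord> records) {
    return records.stream().collect(Collectors.groupingBy(
        r -> r.fileGroupId, TreeMap::new, Collectors.toList()));
  }
}
```

This is only a sketch of one possible follow-up direction; the actual fix would need to handle how buckets map to output partitions.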
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to