nsivabalan commented on code in PR #9379:
URL: https://github.com/apache/hudi/pull/9379#discussion_r1286377816


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1061,74 @@ public void close() throws Exception {
    */
   protected abstract void commit(String instantTime, 
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write 
client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I 
convertHoodieDataToEngineSpecificInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
+    HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
+    I preppedRecordInputs = 
convertHoodieDataToEngineSpecificInput(preppedRecords);
+
+    try (BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient()) {
+      // rollback partially failed writes if any.
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && 
writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
+
+      if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+        // if this is a new commit being applied to metadata for the first time
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
+      } else {
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will 
be created.
+        // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry 
-> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to 
MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + instantTime + " from MDT");
+        }
+        metadataMetaClient.reloadActiveTimeline();
+      }
+
+      writeClient.startCommitWithTime(instantTime);
+      preCommit(instantTime);
+      if (isInitializing) {
+        engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Bulk inserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
+        writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, 
bulkInsertPartitioner);
+      } else {
+        engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Upserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
+        writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
+      }
+
+      metadataMetaClient.reloadActiveTimeline();
+    }
+
+    // Update total size of the metadata and count of base/log files
+    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, 
dataMetaClient.getTableConfig().getMetadataPartitions()));
+  }
+
+  /**
+   * Allows the implementation to perform any pre-commit operations like 
transitioning a commit to inflight if required.
+   * @param instantTime time of commit
+   */
+  protected void preCommit(String instantTime) {

Review Comment:
   Let's name this `preWrite`: at this point the writes have not started yet. We
   already have a similar pattern in our write client, where `preCommit` is invoked
   after all writes are complete but before the commit is finalized. Things like
   conflict resolution are handled there.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to