danny0405 commented on code in PR #9379:
URL: https://github.com/apache/hudi/pull/9379#discussion_r1286529738


##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java:
##########
@@ -97,68 +96,72 @@ protected void initRegistry() {
 
   @Override
   protected void commit(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
-    doCommit(instantTime, partitionRecordsMap, false);
+    commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
+  }
+
+  @Override
+  protected List<HoodieRecord> 
convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) {
+    return records.collectAsList();
   }
 
   @Override
   protected void bulkCommit(String instantTime, MetadataPartitionType 
partitionType, HoodieData<HoodieRecord> records, int fileGroupCount) {
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = 
new HashMap<>();
-    partitionRecordsMap.put(partitionType, records);
-    doCommit(instantTime, partitionRecordsMap, true);
+    commitInternal(instantTime, Collections.singletonMap(partitionType, 
records), true, Option.empty());
   }
 
-  private void doCommit(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing) {
+  @Override
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
 
     //  Flink engine does not optimize initialCommit to MDT as bulk insert is 
not yet supported
 
-    try (HoodieFlinkWriteClient writeClient = (HoodieFlinkWriteClient) 
getWriteClient()) {
-      // rollback partially failed writes if any.
-      if (writeClient.rollbackFailedWrites()) {
-        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
-      }
-      
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().ifPresent(instant
 -> compactIfNecessary(writeClient, instant.getTimestamp()));
-
-      if 
(!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
-        // if this is a new commit being applied to metadata for the first time
-        LOG.info("New commit at " + instantTime + " being applied to MDT.");
-      } else {
-        // this code path refers to a re-attempted commit that:
-        //   1. got committed to metadata table, but failed in datatable.
-        //   2. failed while committing to metadata table
-        // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
-        // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
-        // are upserts to metadata table and so only a new delta commit will 
be created.
-        // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
-        // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
-        Option<HoodieInstant> alreadyCompletedInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry 
-> entry.getTimestamp().equals(instantTime))
-            .lastInstant();
-        LOG.info(String.format("%s completed commit at %s being applied to 
MDT.",
-            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime));
-
-        // Rollback the previous commit
-        if (!writeClient.rollback(instantTime)) {
-          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + instantTime + " from MDT");
-        }
-        metadataMetaClient.reloadActiveTimeline();
+    BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> 
writeClient = (BaseHoodieWriteClient<?, List<HoodieRecord>, ?, 
List<WriteStatus>>) getWriteClient();
+    // rollback partially failed writes if any.
+    if (writeClient.rollbackFailedWrites()) {
+      metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    }
+    
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().ifPresent(instant
 -> compactIfNecessary(writeClient, instant.getTimestamp()));
+
+    if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at " + instantTime + " being applied to MDT.");
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be 
created.
+      // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
+      Option<HoodieInstant> alreadyCompletedInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry 
-> entry.getTimestamp().equals(instantTime))
+          .lastInstant();
+      LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime));
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException("Failed to rollback deltacommit at " 
+ instantTime + " from MDT");
       }
+      metadataMetaClient.reloadActiveTimeline();
+    }
 
-      writeClient.startCommitWithTime(instantTime);
-      
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION,
 instantTime);
+    writeClient.startCommitWithTime(instantTime);
+    preWrite(instantTime);
 
-      List<WriteStatus> statuses = isInitializing
-          ? writeClient.bulkInsertPreppedRecords(preppedRecordList, 
instantTime, Option.empty())
-          : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
-      // flink does not support auto-commit yet, also the auto commit logic is 
not complete as BaseHoodieWriteClient now.
-      writeClient.commit(instantTime, statuses, Option.empty(), 
HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
+    List<WriteStatus> statuses = isInitializing
+        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, 
Option.empty())
+        : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
+    // flink does not support auto-commit yet, also the auto commit logic is 
not complete as BaseHoodieWriteClient now.
+    writeClient.commit(instantTime, statuses, Option.empty(), 
HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
 
-      // reload timeline
-      metadataMetaClient.reloadActiveTimeline();
-      
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().ifPresent(instant
 -> cleanIfNecessary(writeClient, instant.getTimestamp()));
-      writeClient.archive();
-    }
+    // reload timeline
+    metadataMetaClient.reloadActiveTimeline();
+    
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().ifPresent(instant
 -> cleanIfNecessary(writeClient, instant.getTimestamp()));
+    writeClient.archive();

Review Comment:
   Okay, it seems the write client is now shared by multiple MDT writer methods
   and has the same lifecycle as the writer itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to