danny0405 commented on code in PR #14095:
URL: https://github.com/apache/hudi/pull/14095#discussion_r2432794527
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java:
##########
@@ -126,60 +123,10 @@ protected void bulkCommit(String instantTime, String partitionPath, HoodieData<H
@Override
protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
- ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
- HoodieData<HoodieRecord> preppedRecords = tagRecordsWithLocation(partitionRecordsMap, isInitializing).getKey();
- List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
-
- // Flink engine does not optimize initialCommit to MDT as bulk insert is not yet supported
-
- BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient = (BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>>) getWriteClient();
- // rollback partially failed writes if any.
- if (writeClient.rollbackFailedWrites(metadataMetaClient)) {
- metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
- }
-
- compactIfNecessary(writeClient, Option.empty());
-
- if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
- // if this is a new commit being applied to metadata for the first time
- LOG.info("New commit at " + instantTime + " being applied to MDT.");
- } else {
- // this code path refers to a re-attempted commit that:
- // 1. got committed to metadata table, but failed in datatable.
- // 2. failed while committing to metadata table
- // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
- // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
- // are upserts to metadata table and so only a new delta commit will be created.
- // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
- // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
- Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.requestedTime().equals(instantTime))
- .lastInstant();
- LOG.info(String.format("%s completed commit at %s being applied to MDT.",
- alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
-
- // Rollback the previous commit
- if (!writeClient.rollback(instantTime)) {
- throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
- }
- metadataMetaClient.reloadActiveTimeline();
- }
-
- writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, HoodieActiveTimeline.DELTA_COMMIT_ACTION);
- preWrite(instantTime);
-
- List<WriteStatus> statuses = isInitializing
- ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner)
- : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
- // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now.
- writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
-
- // reload timeline
- metadataMetaClient.reloadActiveTimeline();
- cleanIfNecessary(writeClient, "");
- writeClient.archive();
-
- // Update total size of the metadata and count of base/log files
- metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, dataMetaClient.getTableConfig().getMetadataPartitions()));
+ performTableServices(Option.ofNullable(instantTime), false);
+ // the timeline reload should be unnecessary
+ // metadataMetaClient.reloadActiveTimeline();
Review Comment:
cc @nsivabalan to confirm this: it looks like the logic in
`super.commitInternal` still works whether or not the MDT timeline is
reloaded.
Another thing I want to confirm: the last step inside `super.commitInternal`
also reloads the MDT timeline, which seems unnecessary as well because
most of the callers close the MDT writer immediately after the `#update`.
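
To make the second point concrete, here is a minimal sketch of the caller pattern in question. It rests on assumptions: `FakeMetaClient`, `FakeMdtWriter`, and `updateThenClose` are hypothetical stand-ins written for this illustration, not Hudi classes, and the real `commitInternal` does far more than append to a list. The sketch only shows that when a writer is closed right after `update`, a trailing reload refreshes an in-memory view that nobody reads.

```java
import java.util.ArrayList;
import java.util.List;

public class TimelineReloadSketch {

  // Hypothetical stand-in for a meta client: "storage" plays the durable timeline,
  // "cachedTimeline" plays the in-memory view that a reload refreshes.
  static class FakeMetaClient {
    final List<String> storage = new ArrayList<>();
    List<String> cachedTimeline = new ArrayList<>();
    int reloadCount = 0;

    void reloadActiveTimeline() {
      reloadCount++;
      cachedTimeline = new ArrayList<>(storage);
    }
  }

  // Hypothetical stand-in for the MDT writer; not the real Hudi class.
  static class FakeMdtWriter implements AutoCloseable {
    final FakeMetaClient metaClient = new FakeMetaClient();
    private final boolean reloadAfterCommit;

    FakeMdtWriter(boolean reloadAfterCommit) {
      this.reloadAfterCommit = reloadAfterCommit;
    }

    void update(String instantTime) {
      metaClient.storage.add(instantTime);  // the commit itself is durable
      if (reloadAfterCommit) {
        metaClient.reloadActiveTimeline();  // only refreshes the in-memory view
      }
    }

    @Override
    public void close() {
      // the cached timeline is discarded together with the writer
    }
  }

  // The caller pattern described in the comment: update once, then close right away.
  static FakeMetaClient updateThenClose(boolean reloadAfterCommit, String instantTime) {
    try (FakeMdtWriter writer = new FakeMdtWriter(reloadAfterCommit)) {
      writer.update(instantTime);
      return writer.metaClient;
    }
  }

  public static void main(String[] args) {
    FakeMetaClient withReload = updateThenClose(true, "001");
    FakeMetaClient withoutReload = updateThenClose(false, "001");
    // Durable state is the same either way; only the number of reloads differs.
    System.out.println(withReload.storage.equals(withoutReload.storage));
    System.out.println(withReload.reloadCount + " vs " + withoutReload.reloadCount);
  }
}
```

The sketch does not cover a caller that keeps the writer open and reads the cached timeline after `update`; such a caller would still need the reload, which is why this needs confirming.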