danny0405 commented on code in PR #8900:
URL: https://github.com/apache/hudi/pull/8900#discussion_r1223894630
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void
runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
* deltacommit.
*/
protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String
latestDeltacommitTime) {
+
+ // Check if there are any pending compaction or log compaction instants in
the timeline.
+ // If pending compact/logcompaction operations are found abort scheduling
new compaction/logcompaction operations.
+ Option<HoodieInstant> pendingLogCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() ||
pendingCompactionInstant.isPresent()) {
+ LOG.info(String.format("Not scheduling compaction or logcompaction,
since a pending compaction instant %s or logcompaction %s instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant));
+ return;
+ }
+
// Trigger compaction with suffixes based on the same instant time. This
ensures that any future
// delta commits synced over will not have an instant time lesser than the
last completed instant on the
// metadata table.
final String compactionInstantTime =
HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);
+
// we need to avoid checking compaction w/ same instant again.
// let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest
delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction
if we already have compaction w/ same instant time.
- if
(!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
- && writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
+ LOG.info(String.format("Compaction with same %s time is already present
in the timeline.", compactionInstantTime));
+ return;
+ } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp " +
compactionInstantTime);
writeClient.compact(compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with suffixes based on the same
instant time. This ensures that any future
+ // delta commits synced over will not have an instant time lesser than
the last completed instant on the
+ // metadata table.
+ final String logCompactionInstantTime =
HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
+ if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
{
+ LOG.info(String.format("Log compaction with same %s time is already
present in the timeline.", logCompactionInstantTime));
+ return;
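Review Comment:
This early `return` is unnecessary and can be removed. Structure the check as an `if`/`else` so that log compaction is simply not scheduled when a completed instant with the same log-compaction timestamp already exists.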
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]