lokeshj1703 commented on code in PR #12948:
URL: https://github.com/apache/hudi/pull/12948#discussion_r1995088754


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1726,4 +1659,302 @@ public boolean isInitialized() {
   }
 
   protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();
+
+  protected class MetadataWriteHandler implements Serializable {
+    HoodieTableVersion tableVersion;
+
+    public MetadataWriteHandler(HoodieTableVersion tableVersion) {
+      this.tableVersion = tableVersion;
+    }
+
+    public boolean shouldInitializeFromFilesystem(Set<String> 
pendingDataInstants, Option<String> inflightInstantTimestamp) {
+      if (tableVersion.lesserThan(HoodieTableVersion.EIGHT) && 
pendingDataInstants.stream()
+          .anyMatch(i -> !inflightInstantTimestamp.isPresent() || 
!i.equals(inflightInstantTimestamp.get()))) {
+        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+        LOG.warn("Cannot initialize metadata table as operation(s) are in 
progress on the dataset: {}",
+            Arrays.toString(pendingDataInstants.toArray()));
+        return false;
+      } else {
+        return true;
+      }
+    }
+
+    public String getTimelineHistoryPath() {
+      return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+          ? TIMELINE_HISTORY_PATH.defaultValue() : 
ARCHIVELOG_FOLDER.defaultValue();
+    }
+
+    /**
+     * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
+     */
+    protected boolean validateCompactionScheduling(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+      if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+        return 
validateCompactionSchedulingVersionSix(inFlightInstantTimestamp, 
latestDeltaCommitTimeInMetadataTable);
+      } else {
+        // Under the log compaction scope, the sequence of the log-compaction 
and compaction needs to be ensured because metadata items such as RLI
+        // only has proc-time ordering semantics. For "ensured", it means the 
completion sequence of the log-compaction/compaction is the same as the start 
sequence.
+        if (metadataWriteConfig.isLogCompactionEnabled()) {
+          Option<HoodieInstant> pendingLogCompactionInstant =
+              
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+          Option<HoodieInstant> pendingCompactionInstant =
+              
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+          if (pendingLogCompactionInstant.isPresent() || 
pendingCompactionInstant.isPresent()) {
+            LOG.warn("Not scheduling compaction or logCompaction, since a 
pending compaction instant {} or logCompaction {} instant is present",
+                pendingCompactionInstant, pendingLogCompactionInstant);
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+
+    /**
+     * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
+     */
+    protected boolean validateCompactionSchedulingVersionSix(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+      // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
+      // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
+      // a. There could be a chance that latest delta commit in MDT is 
committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should 
be employed
+      // b. There could be DT inflights after latest delta commit in MDT and 
we are ok with it. bcoz, the contract is, the latest compaction instant time in 
MDT represents
+      // any instants before that is already synced with metadata table.
+      // c. Do consider out of order commits. For eg, c4 from DT could 
complete before c3. and we can't trigger compaction in MDT with c4 as base 
instant time, until every
+      // instant before c4 is synced with metadata table.
+      List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+          
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+
+      if (!pendingInstants.isEmpty()) {
+        checkNumDeltaCommits(metadataMetaClient, 
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+        LOG.info(String.format(
+            "Cannot compact metadata table as there are %d inflight instants 
in data table before latest deltacommit in metadata table: %s. Inflight 
instants in data table: %s",
+            pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, 
Arrays.toString(pendingInstants.toArray())));
+        return false;
+      }
+
+      // Check if there are any pending compaction or log compaction instants 
in the timeline.
+      // If pending compact/logCompaction operations are found abort 
scheduling new compaction/logCompaction operations.
+      Option<HoodieInstant> pendingLogCompactionInstant =
+          
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+      Option<HoodieInstant> pendingCompactionInstant =
+          
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      if (pendingLogCompactionInstant.isPresent() || 
pendingCompactionInstant.isPresent()) {
+        LOG.warn(String.format("Not scheduling compaction or logCompaction, 
since a pending compaction instant %s or logCompaction %s instant is present",
+            pendingCompactionInstant, pendingLogCompactionInstant));
+        return false;
+      }
+      return true;
+    }
+
+    protected void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int 
maxNumDeltaCommitsWhenPending) {
+      final HoodieActiveTimeline activeTimeline = 
metaClient.reloadActiveTimeline();
+      Option<HoodieInstant> lastCompaction = 
activeTimeline.filterCompletedInstants()
+          .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+      int numDeltaCommits = lastCompaction.isPresent()
+          ? 
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants()
+          : activeTimeline.getDeltaCommitTimeline().countInstants();
+      if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+        throw new HoodieMetadataException(String.format("Metadata table's 
deltacommits exceeded %d: "
+                + "this is likely caused by a pending instant in the data 
table. Resolve the pending instant "
+                + "or adjust `%s`, then restart the pipeline.",
+            maxNumDeltaCommitsWhenPending, 
HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+      }
+    }
+
+    /**
+     * Perform a compaction on the Metadata Table.
+     * <p>
+     * Cases to be handled:
+     * 1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
+     * a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
+     * <p>
+     * 2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+     * deltacommit.
+     */
+    protected void compactIfNecessary(BaseHoodieWriteClient writeClient, 
String latestDeltacommitTime) {

Review Comment:
   The Javadocs for the function `compactIfNecessary` are the same in 0.15.0 and 1.0



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1726,4 +1659,302 @@ public boolean isInitialized() {
   }
 
   protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();
+
+  protected class MetadataWriteHandler implements Serializable {
+    HoodieTableVersion tableVersion;
+
+    public MetadataWriteHandler(HoodieTableVersion tableVersion) {
+      this.tableVersion = tableVersion;
+    }
+
+    public boolean shouldInitializeFromFilesystem(Set<String> 
pendingDataInstants, Option<String> inflightInstantTimestamp) {
+      if (tableVersion.lesserThan(HoodieTableVersion.EIGHT) && 
pendingDataInstants.stream()
+          .anyMatch(i -> !inflightInstantTimestamp.isPresent() || 
!i.equals(inflightInstantTimestamp.get()))) {
+        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+        LOG.warn("Cannot initialize metadata table as operation(s) are in 
progress on the dataset: {}",
+            Arrays.toString(pendingDataInstants.toArray()));
+        return false;
+      } else {
+        return true;
+      }
+    }
+
+    public String getTimelineHistoryPath() {
+      return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+          ? TIMELINE_HISTORY_PATH.defaultValue() : 
ARCHIVELOG_FOLDER.defaultValue();
+    }
+
+    /**
+     * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
+     */
+    protected boolean validateCompactionScheduling(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+      if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+        return 
validateCompactionSchedulingVersionSix(inFlightInstantTimestamp, 
latestDeltaCommitTimeInMetadataTable);
+      } else {
+        // Under the log compaction scope, the sequence of the log-compaction 
and compaction needs to be ensured because metadata items such as RLI
+        // only has proc-time ordering semantics. For "ensured", it means the 
completion sequence of the log-compaction/compaction is the same as the start 
sequence.
+        if (metadataWriteConfig.isLogCompactionEnabled()) {
+          Option<HoodieInstant> pendingLogCompactionInstant =
+              
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+          Option<HoodieInstant> pendingCompactionInstant =
+              
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+          if (pendingLogCompactionInstant.isPresent() || 
pendingCompactionInstant.isPresent()) {
+            LOG.warn("Not scheduling compaction or logCompaction, since a 
pending compaction instant {} or logCompaction {} instant is present",
+                pendingCompactionInstant, pendingLogCompactionInstant);
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+
+    /**
+     * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
+     */
+    protected boolean validateCompactionSchedulingVersionSix(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+      // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
+      // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
+      // a. There could be a chance that latest delta commit in MDT is 
committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should 
be employed
+      // b. There could be DT inflights after latest delta commit in MDT and 
we are ok with it. bcoz, the contract is, the latest compaction instant time in 
MDT represents
+      // any instants before that is already synced with metadata table.
+      // c. Do consider out of order commits. For eg, c4 from DT could 
complete before c3. and we can't trigger compaction in MDT with c4 as base 
instant time, until every
+      // instant before c4 is synced with metadata table.
+      List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+          
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+
+      if (!pendingInstants.isEmpty()) {
+        checkNumDeltaCommits(metadataMetaClient, 
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+        LOG.info(String.format(
+            "Cannot compact metadata table as there are %d inflight instants 
in data table before latest deltacommit in metadata table: %s. Inflight 
instants in data table: %s",
+            pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, 
Arrays.toString(pendingInstants.toArray())));
+        return false;
+      }
+
+      // Check if there are any pending compaction or log compaction instants 
in the timeline.
+      // If pending compact/logCompaction operations are found abort 
scheduling new compaction/logCompaction operations.
+      Option<HoodieInstant> pendingLogCompactionInstant =
+          
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+      Option<HoodieInstant> pendingCompactionInstant =
+          
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      if (pendingLogCompactionInstant.isPresent() || 
pendingCompactionInstant.isPresent()) {
+        LOG.warn(String.format("Not scheduling compaction or logCompaction, 
since a pending compaction instant %s or logCompaction %s instant is present",
+            pendingCompactionInstant, pendingLogCompactionInstant));
+        return false;
+      }
+      return true;
+    }
+
+    protected void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int 
maxNumDeltaCommitsWhenPending) {
+      final HoodieActiveTimeline activeTimeline = 
metaClient.reloadActiveTimeline();
+      Option<HoodieInstant> lastCompaction = 
activeTimeline.filterCompletedInstants()
+          .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+      int numDeltaCommits = lastCompaction.isPresent()
+          ? 
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants()
+          : activeTimeline.getDeltaCommitTimeline().countInstants();
+      if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+        throw new HoodieMetadataException(String.format("Metadata table's 
deltacommits exceeded %d: "
+                + "this is likely caused by a pending instant in the data 
table. Resolve the pending instant "
+                + "or adjust `%s`, then restart the pipeline.",
+            maxNumDeltaCommitsWhenPending, 
HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+      }
+    }
+
+    /**
+     * Perform a compaction on the Metadata Table.
+     * <p>
+     * Cases to be handled:
+     * 1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
+     * a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
+     * <p>
+     * 2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+     * deltacommit.
+     */
+    protected void compactIfNecessary(BaseHoodieWriteClient writeClient, 
String latestDeltacommitTime) {
+      if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+        compactIfNecessaryVersionSix(writeClient, latestDeltacommitTime);
+      } else {
+        // IMPORTANT: Trigger compaction with max instant time that is smaller 
than(or equals) the earliest pending instant from DT.
+        // The compaction planner will manage to filter out the log files that 
finished with greater completion time.
+        // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for 
more details.
+        HoodieTimeline metadataCompletedTimeline = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants();
+        final String compactionInstantTime = 
dataMetaClient.reloadActiveTimeline()
+            // The filtering strategy is kept in line with the rollback 
premise, if an instant is pending on DT but completed on MDT,
+            // generates a compaction time smaller than it so that the instant 
could then been rolled back.
+            .filterInflightsAndRequested().filter(instant -> 
metadataCompletedTimeline.containsInstant(instant.requestedTime())).firstInstant()
+            // minus the pending instant time by 1 millisecond to avoid 
conflicts on the MDT.
+            .map(instant -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(instant.requestedTime(), 1L))
+            .orElse(writeClient.createNewInstantTime(false));
+
+        // we need to avoid checking compaction w/ same instant again.
+        // let's say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
+        // and again w/ C6, we will re-attempt compaction at which point 
latest delta commit is C4 in MDT.
+        // and so we try compaction w/ instant C4001. So, we can avoid 
compaction if we already have compaction w/ same instant time.
+        if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
+          LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+        } else if 
(writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+          LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
+          writeClient.compact(compactionInstantTime);
+        } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+          // Schedule and execute log compaction with new instant time.
+          final String logCompactionInstantTime = 
metadataMetaClient.createNewInstantTime(false);
+          if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
 {
+            LOG.info("Log compaction with same {} time is already present in 
the timeline.", logCompactionInstantTime);
+          } else if 
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, 
Option.empty())) {
+            LOG.info("Log compaction is scheduled for timestamp {}", 
logCompactionInstantTime);
+            writeClient.logCompact(logCompactionInstantTime);
+          }
+        }
+      }
+    }
+
+    /**
+     * Perform a compaction on the Metadata Table.
+     * <p>
+     * Cases to be handled:
+     * 1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
+     * a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
+     * <p>
+     * 2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+     * deltacommit.
+     */
+    protected void compactIfNecessaryVersionSix(BaseHoodieWriteClient 
writeClient, String latestDeltacommitTime) {

Review Comment:
   The Javadocs for the function `compactIfNecessary` are the same in 0.15.0 and 1.0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to