suryaprasanna commented on code in PR #8900:
URL: https://github.com/apache/hudi/pull/8900#discussion_r1223923099


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
    * deltacommit.
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
+
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compaction/log compaction operations are found, abort scheduling new compaction/log compaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();

Review Comment:
   We can execute compaction and log compaction on the metadata table 
asynchronously. In that case, ingestion writers only create the plan and exit, 
and the async compaction and log compaction pipelines execute it, so the 
plan's execution may be delayed.
   Will be porting the async changes in another PR.
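   To make the sync-vs-async split above concrete, here is a toy sketch; the 
class and method names are hypothetical illustrations, not Hudi APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the sync-vs-async split described above; names are
// hypothetical, not Hudi APIs.
class CompactionScheduler {
  final List<String> plannedInstants = new ArrayList<>();
  final List<String> executedInstants = new ArrayList<>();

  // In sync mode the writer creates the plan and executes it immediately.
  // In async mode it only creates the plan and exits; a separate pipeline
  // executes it later, so execution of the plan is delayed.
  boolean planAndMaybeExecute(String instantTime, boolean asyncEnabled) {
    plannedInstants.add(instantTime);
    if (!asyncEnabled) {
      executedInstants.add(instantTime);
      return true;  // executed inline by the ingestion writer
    }
    return false;   // left for the async compaction/log compaction pipeline
  }
}
```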



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -97,6 +98,11 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
    */
   BaseHoodieWriteClient getWriteClient();
 
+  /**
+   * Returns the meta client for the metadata table.
+   */
+  HoodieTableMetaClient getMetadataMetaClient();

Review Comment:
   This is useful for async table services on the metadata table to check for 
inflight compaction and log compaction plans.
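   A rough illustration of that usage with stand-in classes (these are not 
Hudi classes; an accessor like `getMetadataMetaClient()` would give the async 
service read access to the metadata table's timeline):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins, not Hudi classes.
class TimelineView {
  private final List<String> inflightPlans = new ArrayList<>();

  void addInflightPlan(String instant) {
    inflightPlans.add(instant);
  }

  // Loosely mirrors filterPendingCompactionTimeline().firstInstant():
  // returns the first inflight plan, or null if none exist.
  String firstPendingPlan() {
    return inflightPlans.isEmpty() ? null : inflightPlans.get(0);
  }
}

class AsyncServiceCheck {
  // An async compaction pipeline would act only when a plan is pending.
  static boolean shouldExecute(TimelineView metadataTimeline) {
    return metadataTimeline.firstPendingPlan() != null;
  }
}
```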



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java:
##########
@@ -47,6 +50,7 @@ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       String fileId,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
+    LOG.info("Get updateHandle for fileId " + fileId + " and partitionPath " + partitionPath + " at commit " + instantTime);

Review Comment:
   Done.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1325,6 +1332,39 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
     return inflightAndCompletedPartitions;
   }
 
+  public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient,
+                                                      HoodieTableMetaClient metadataMetaClient) {
+    // Only those log files which have a corresponding completed instant on the dataset should be read.
+    // This is because the metadata table is updated before the dataset instants are committed.
+    HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
+    Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+    // We should also add completed indexing delta commits in the metadata table, as they do not
+    // have a corresponding completed instant in the data table.
+    validInstantTimestamps.addAll(
+        metadataMetaClient.getActiveTimeline()
+            .filter(instant -> instant.isCompleted()
+                && (isIndexingCommit(instant.getTimestamp()) || isLogCompactionInstant(instant)))
+            .getInstantsAsStream()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList()));
+
+    // For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
+    // The rollback instant should be more recent than the start of the timeline for it to have rolled back any
+    // instant which we have a log block for.
+    final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+    datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
+        .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime))
+        .forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)));
+
+    // SOLO_COMMIT_TIMESTAMP is used during bootstrap, so it is a valid timestamp.
+    validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);

Review Comment:
   Yeah, updated it.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java:
##########
@@ -47,6 +50,7 @@ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       String fileId,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
+    LOG.info("Get updateHandle for fileId " + fileId + " and partitionPath " + partitionPath + " at commit " + instantTime);

Review Comment:
   There could be cases where the record index returns file groups that are 
not actually present in the file system; in those cases errors such as "No 
value present" will be thrown. This log will make debugging such cases easier.
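   A minimal, hypothetical sketch of the failure mode (not Hudi code): the 
stale-index lookup fails with no context unless the fileId/partition/instant 
are logged first, which is what the added log line provides.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in, not Hudi code: the record index can point at a
// file group that no longer exists on storage.
class FileGroupLookup {
  static String resolveBaseFile(Map<String, String> baseFilesByFileId,
                                String fileId, String partitionPath, String instantTime) {
    // Mirrors the log added in HoodieMergeHandleFactory: without it, the
    // failure below carries no hint of which index entry triggered it.
    System.out.println("Get updateHandle for fileId " + fileId
        + " and partitionPath " + partitionPath + " at commit " + instantTime);
    return Optional.ofNullable(baseFilesByFileId.get(fileId))
        .orElseThrow(() -> new IllegalStateException(
            "No base file found for fileId " + fileId + " in partition " + partitionPath));
  }
}
```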



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
    * deltacommit.
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
+
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compaction/log compaction operations are found, abort scheduling new compaction/log compaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+    if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
+      LOG.info(String.format("Not scheduling compaction or log compaction, since a pending compaction instant %s or log compaction instant %s is present",
+          pendingCompactionInstant, pendingLogCompactionInstant));
+      return;
+    }
+
     // Trigger compaction with suffixes based on the same instant time. This ensures that any future
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
     final String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);
+
     // we need to avoid checking compaction w/ same instant again.
     // let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time.
-    if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
-        && writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+    if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
+      LOG.info(String.format("Compaction with the same instant time %s is already present in the timeline.", compactionInstantTime));
+      return;
+    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime);
       writeClient.compact(compactionInstantTime);
+    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+      // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future
+      // delta commits synced over will not have an instant time lesser than the last completed instant on the
+      // metadata table.
+      final String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
+      if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
+        LOG.info(String.format("Log compaction with the same instant time %s is already present in the timeline.", logCompactionInstantTime));
+        return;

Review Comment:
   Yeah, removed the unnecessary returns in this class.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java:
##########
@@ -233,48 +233,49 @@ void testSyncMetadataTable() throws Exception {
    assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // test metadata table compaction
-    // write another 4 commits
-    for (int i = 1; i < 5; i++) {
+    // write another 9 commits to trigger compaction twice, since the default number of file versions to retain during clean is 2.

Review Comment:
   Clean's retainFileVersions value is changed to 2. It is hardcoded in 
HoodieMetadataWriteUtils, and both Spark and Flink get the writeConfig from 
there, so it cannot be overridden.
   The reason for the change is to support restore of the main table timeline 
for at least 10 commits. A restore on the main table also triggers a restore 
on the metadata table. If retainFileVersions were less than 2, the next 
cleaner run after compaction would remove the previous file slice, thereby 
blocking restore on the metadata table or losing data.
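   A toy model of that cleaning behavior (not Hudi code; names are 
illustrative): with retainFileVersions = 1 the slice produced before 
compaction is dropped immediately, leaving nothing for a restore to fall back 
on, while retaining 2 versions keeps it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Hudi code: version-based cleaning on a single metadata
// table file group. Slices are ordered oldest first.
class VersionRetention {
  // Returns the file slices that survive a clean pass.
  static List<String> clean(List<String> slicesOldestFirst, int retainFileVersions) {
    int drop = Math.max(0, slicesOldestFirst.size() - retainFileVersions);
    return new ArrayList<>(slicesOldestFirst.subList(drop, slicesOldestFirst.size()));
  }
}
```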



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
    * deltacommit.
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
+
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compaction/log compaction operations are found, abort scheduling new compaction/log compaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+    if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {

Review Comment:
   Good point. Refactored code.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -985,9 +985,10 @@ public void performTableServices(Option<String> inFlightInstantTimestamp) {
 
       // Do timeline validation before scheduling compaction/logcompaction operations.
       if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
+        LOG.info("No inflight commits present in either of the timelines. "
+            + "Proceeding to check compaction.");

Review Comment:
   Yeah, good point. I added it to check whether 
validateTimelineBeforeSchedulingCompaction succeeded, but with the existing 
code it can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
