This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 195ae3a9a23 [HUDI-6334] Integrate logcompaction table service to metadata table and provide various bugfixes to metadata table (#8900)
195ae3a9a23 is described below

commit 195ae3a9a23eb7c241b89d2a51ef902715d4b20b
Author: Surya Prasanna <[email protected]>
AuthorDate: Fri Jun 9 07:23:27 2023 -0700

    [HUDI-6334] Integrate logcompaction table service to metadata table and provide various bugfixes to metadata table (#8900)
    
    * Integrate logcompaction table service on metadata table and provide various bugfixes to metadata table
    
    The following changes are also included as part of this commit:
    
    1. Change cleaner policy to KEEP_LATEST_FILE_VERSIONS for the metadata table
    2. Add log statements
    3. Fix rollback of inflight deltacommits on the metadata table
    4. Add log statements to the LogCompaction planner
    5. Integrate the logcompaction table service on the metadata table
    6. Include checks to block logcompaction in the presence of a compaction with a greater timestamp
    7. Use the instant range option for major and minor compaction on the metadata table
    
    Co-authored-by: Sagar Sumit <[email protected]>
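
    For anyone trying this out: the sketch below shows one way to wire the new
    metadata-table log compaction settings into a writer config. It is a minimal,
    hedged example based only on the builder methods added in this commit
    (HoodieMetadataConfig.Builder#withLogCompactionEnabled and
    #withLogCompactBlocksThreshold); the surrounding HoodieWriteConfig/enable(...)
    wiring is standard Hudi usage, and the base path is hypothetical.
    
        import org.apache.hudi.common.config.HoodieMetadataConfig;
        import org.apache.hudi.config.HoodieWriteConfig;
    
        // Sketch: enable log compaction for the metadata table.
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")               // hypothetical base path
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .enable(true)
                .withLogCompactionEnabled(true)        // hoodie.metadata.log.compaction.enable
                .withLogCompactBlocksThreshold(5)      // hoodie.metadata.log.compaction.blocks.threshold (default 5)
                .build())
            .build();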
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  7 ++
 .../apache/hudi/config/HoodieCompactionConfig.java | 15 +++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 12 +++
 .../apache/hudi/io/HoodieMergeHandleFactory.java   |  5 ++
 .../metadata/HoodieBackedTableMetadataWriter.java  | 33 +++++++-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |  4 +
 .../hudi/table/action/compact/CompactHelpers.java  | 21 +++++
 .../hudi/table/action/compact/HoodieCompactor.java | 18 +---
 .../compact/RunCompactionActionExecutor.java       |  1 +
 .../BaseHoodieCompactionPlanGenerator.java         | 12 ++-
 .../HoodieLogCompactionPlanGenerator.java          | 16 ++--
 .../FlinkHoodieBackedTableMetadataWriter.java      |  2 +-
 .../hudi/client/SparkRDDTableServiceClient.java    | 31 ++++++-
 .../SparkHoodieBackedTableMetadataWriter.java      |  2 +-
 ...DataValidationCheckForLogCompactionActions.java |  2 +-
 .../functional/TestHoodieBackedMetadata.java       |  1 +
 .../TestHoodieClientOnMergeOnReadStorage.java      | 26 +++---
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  4 +-
 .../hudi/common/config/HoodieMetadataConfig.java   | 22 +++++
 .../table/log/HoodieMergedLogRecordScanner.java    |  1 -
 .../hudi/exception/HoodieLogCompactException.java  | 30 +++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 83 +-----------------
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 99 ++++++++++++++++++++++
 23 files changed, 313 insertions(+), 134 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index ab45d200b37..3807ee2191b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -583,6 +583,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     tableServiceClient.runAnyPendingCompactions(createTable(config, hadoopConf));
   }
 
+  /**
+   * Run any pending log compactions.
+   */
+  public void runAnyPendingLogCompactions() {
+    tableServiceClient.runAnyPendingLogCompactions(createTable(config, hadoopConf));
+  }
+
   /**
    * Create a savepoint based on the latest commit action on the timeline.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index c8814406ce5..d615be0926d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -61,6 +61,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "but users are expected to trigger async job for execution. If `hoodie.compact.inline` is set to true, regular writers will do both scheduling and "
           + "execution inline for compaction");
 
+  public static final ConfigProperty<String> ENABLE_LOG_COMPACTION = ConfigProperty
+      .key("hoodie.log.compaction.enable")
+      .defaultValue("false")
+      .sinceVersion("0.14")
+      .withDocumentation("By enabling log compaction through this config, log compaction will also get enabled for the metadata table.");
+
   public static final ConfigProperty<String> INLINE_LOG_COMPACT = ConfigProperty
       .key("hoodie.log.compaction.inline")
       .defaultValue("false")
@@ -432,8 +438,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThreshold) {
-      compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, logCompactionBlocksThreshold);
+    public Builder withLogCompactionEnabled(boolean enableLogCompaction) {
+      compactionConfig.setValue(ENABLE_LOG_COMPACTION, Boolean.toString(enableLogCompaction));
+      return this;
+    }
+
+    public Builder withLogCompactionBlocksThreshold(int logCompactionBlocksThreshold) {
+      compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, String.valueOf(logCompactionBlocksThreshold));
       return this;
     }
 
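A note on the builder change above: withLogCompactionBlocksThreshold now takes
an int rather than a String, and a withLogCompactionEnabled(boolean) toggle was
added. A minimal sketch using only the methods visible in this hunk:

    // Data-table-side log compaction settings (int threshold replaces the old String form).
    HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
        .withLogCompactionEnabled(true)       // hoodie.log.compaction.enable
        .withLogCompactionBlocksThreshold(2)  // previously withLogCompactionBlocksThreshold("2")
        .build();
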
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9a7ee2fbaa7..fd19607f1fd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1414,6 +1414,10 @@ public class HoodieWriteConfig extends HoodieConfig {
    * compaction properties.
    */
 
+  public boolean isLogCompactionEnabled() {
+    return getBoolean(HoodieCompactionConfig.ENABLE_LOG_COMPACTION);
+  }
+
   public int getLogCompactionBlocksThreshold() {
     return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
   }
@@ -2335,6 +2339,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBooleanOrDefault(HoodieMetadataConfig.ASYNC_INDEX_ENABLE);
   }
 
+  public int getMetadataLogCompactBlocksThreshold() {
+    return getInt(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD);
+  }
+
+  public boolean isLogCompactionEnabledOnMetadata() {
+    return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
+  }
+
   /**
    * Hoodie Client Lock Configs.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index 8aadd637f0e..5c36eb3e8c1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.Map;
@@ -34,6 +36,7 @@ import java.util.Map;
  * Factory class for hoodie merge handle.
  */
 public class HoodieMergeHandleFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandleFactory.class);
   /**
    * Creates a merge handle for normal write path.
    */
@@ -47,6 +50,7 @@ public class HoodieMergeHandleFactory {
       String fileId,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
+    LOG.info("Create update handle for fileId {} and partition path {} at commit {}", fileId, partitionPath, instantTime);
     if (table.requireSortedRecords()) {
       if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
         return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
@@ -79,6 +83,7 @@ public class HoodieMergeHandleFactory {
       HoodieBaseFile dataFileToBeMerged,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
+    LOG.info("Get updateHandle for fileId {} and partitionPath {} at commit {}", fileId, partitionPath, instantTime);
     if (table.requireSortedRecords()) {
       return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
           dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 3614fcd3de1..40f5ff4c0a6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -987,7 +987,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
         compactIfNecessary(writeClient, latestDeltacommitTime);
       }
-
       writeClient.archive();
       LOG.info("All the table services operations on MDT completed successfully");
     } catch (Exception e) {
@@ -1008,6 +1007,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient) {
     // finish off any pending log compaction or compactions operations if any from previous attempt.
     writeClient.runAnyPendingCompactions();
+    writeClient.runAnyPendingLogCompactions();
   }
 
   /**
@@ -1025,13 +1025,27 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
     final String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);
+
     // we need to avoid checking compaction w/ same instant again.
     // let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time.
-    if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
-        && writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+    if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
+      LOG.info(String.format("Compaction with same %s time is already present in the timeline.", compactionInstantTime));
+    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime);
       writeClient.compact(compactionInstantTime);
+    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+      // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future
+      // delta commits synced over will not have an instant time lesser than the last completed instant on the
+      // metadata table.
+      final String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
+      if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
+        LOG.info(String.format("Log compaction with same %s time is already present in the timeline.", logCompactionInstantTime));
+      } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) {
+        LOG.info("Log compaction is scheduled for timestamp " + logCompactionInstantTime);
+        writeClient.logCompact(logCompactionInstantTime);
+      }
     }
   }
 
@@ -1063,7 +1077,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
     // Whenever you want to change this logic, please ensure all below scenarios are considered.
     // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
-    // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents
+    // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, the latest compaction instant time in MDT represents
     // any instants before that is already synced with metadata table.
     // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
    // instant before c4 is synced with metadata table.
@@ -1078,6 +1092,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       return false;
     }
 
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compact/logcompaction operations are found abort scheduling new compaction/logcompaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+    if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
+      LOG.warn(String.format("Not scheduling compaction or logcompaction, since a pending compaction instant %s or logcompaction %s instant is present",
+          pendingCompactionInstant, pendingLogCompactionInstant));
+      return false;
+    }
     return true;
   }
 
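To make the precedence in compactIfNecessary() above easier to follow: major
compaction is always attempted first, and log compaction is only attempted when
compaction could not be scheduled and log compaction is enabled for the MDT. A
condensed reading aid (names come from the hunk above; completedInstantsContain
is shorthand for the containsInstant checks on the filtered timeline):

    String compactionTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);
    if (completedInstantsContain(compactionTime)) {
      // Crash-retry case: a compaction with this suffixed timestamp already completed.
    } else if (writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty())) {
      writeClient.compact(compactionTime);           // major compaction wins
    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
      String logCompactionTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
      if (!completedInstantsContain(logCompactionTime)
          && writeClient.scheduleLogCompactionAtInstant(logCompactionTime, Option.empty())) {
        writeClient.logCompact(logCompactionTime);   // minor (log) compaction as fallback
      }
    }
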
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 10d42444049..f431283ac7a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -111,6 +111,10 @@ public class HoodieMetadataWriteUtils {
             // deltacommits having corresponding completed commits. Therefore, we need to compact all fileslices of all
             // partitions together requiring UnBoundedCompactionStrategy.
             .withCompactionStrategy(new UnBoundedCompactionStrategy())
+            // Check if log compaction is enabled, this is needed for tables with lot of records.
+            .withLogCompactionEnabled(writeConfig.isLogCompactionEnabledOnMetadata())
+            // Below config is only used if isLogCompactionEnabled is set.
+            .withLogCompactionBlocksThreshold(writeConfig.getMetadataLogCompactBlocksThreshold())
             .build())
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index 800e6a4acea..c6fa1f4f2b2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -24,16 +24,21 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Base class helps to perform compact.
@@ -96,4 +101,20 @@ public class CompactHelpers<T, I, K, O> {
           "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + logCompactionCommitTime, e);
     }
   }
+
+  public Option<InstantRange> getInstantRange(HoodieTableMetaClient metaClient) {
+    return HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())
+        ? Option.of(getMetadataLogReaderInstantRange(metaClient)) : Option.empty();
+  }
+
+  private InstantRange getMetadataLogReaderInstantRange(HoodieTableMetaClient metadataMetaClient) {
+    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+        .setConf(metadataMetaClient.getHadoopConf())
+        .setBasePath(HoodieTableMetadata.getDatasetBasePath(metadataMetaClient.getBasePathV2().toString()))
+        .build();
+    Set<String> validInstants = HoodieTableMetadataUtil.getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
+    return InstantRange.builder()
+        .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
+        .explicitInstants(validInstants).build();
+  }
 }
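The EXPLICIT_MATCH range built above is effectively an allow-list: during MDT
compaction, only log blocks whose instant time appears in the valid-instants set
are read, mirroring the MDT snapshot reader. A small illustration with made-up
instant times (the isInRange membership check is an assumption about the
InstantRange API, not something this patch adds):

    Set<String> validInstants = new HashSet<>(Arrays.asList("001", "002", "004"));
    InstantRange range = InstantRange.builder()
        .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
        .explicitInstants(validInstants)
        .build();
    // Blocks from uncommitted or rolled-back instants are filtered out:
    range.isInRange("003"); // false -> block skipped
    range.isInRange("004"); // true  -> block read
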
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 0566b39b478..906ea6473a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -42,8 +42,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.IOUtils;
-import org.apache.hudi.metadata.HoodieBackedTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -60,7 +58,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.StreamSupport;
 
 import static java.util.stream.Collectors.toList;
@@ -131,8 +128,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
     TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
     // if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader.
-    Option<InstantRange> instantRange = HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
-        ? Option.of(getMetadataLogReaderInstantRange(metaClient)) : Option.empty();
+    Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
     return context.parallelize(operations).map(operation -> compact(
         compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
         .flatMap(List::iterator);
@@ -197,6 +193,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
         .withLogFilePaths(logFiles)
         .withReaderSchema(readerSchema)
         .withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, maxInstantTime))
+        .withInstantRange(instantRange)
         .withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
         .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
         .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
@@ -256,17 +253,6 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     }).collect(toList());
   }
 
-  private InstantRange getMetadataLogReaderInstantRange(HoodieTableMetaClient metadataMetaClient) {
-    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
-        .setConf(metadataMetaClient.getHadoopConf())
-        .setBasePath(HoodieTableMetadata.getDatasetBasePath(metadataMetaClient.getBasePath()))
-        .build();
-    Set<String> validInstants = HoodieBackedTableMetadata.getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
-    return InstantRange.builder()
-        .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
-        .explicitInstants(validInstants).build();
-  }
-
   public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
     String maxInstantTime = metaClient
         .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 3f86df7e3ad..5bd1894f26d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -108,6 +108,7 @@ public class RunCompactionActionExecutor<T> extends
         metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
         metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
       }
+      // Setting operationType, which is compact.
       metadata.setOperationType(operationType);
       compactionMetadata.setWriteStatuses(statuses);
       compactionMetadata.setCommitted(false);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 55740269a83..e8d109f7b76 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -40,6 +41,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +84,6 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
 
     // filter the partition paths if needed to reduce list status
     partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);
-
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no compaction plan
       return null;
@@ -91,6 +92,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + writeConfig.getTableName());
 
     SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
+    // Exclude file groups under compaction.
     Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
         .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
         .collect(Collectors.toSet());
@@ -98,6 +100,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
     // Exclude files in pending clustering from compaction.
     fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
 
+    // Exclude files in pending logcompaction.
     if (filterLogCompactionOperations()) {
       fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
           .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
@@ -108,10 +111,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
         .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
             HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterCompletedInstants().lastInstant().get().getTimestamp();
+    LOG.info("Last completed instant time " + lastCompletedInstantTime);
+    Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
 
     List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
         .getLatestFileSlices(partitionPath)
-        .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering))
+        .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange))
         .map(s -> {
           List<HoodieLogFile> logFiles =
               s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
@@ -158,7 +163,8 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
     return partitionPaths;
   }
 
-  protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds) {
+  protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime,
+                                    Set<HoodieFileGroupId> pendingFileGroupIds, Option<InstantRange> instantRange) {
     return fileSlice.getLogFiles().count() > 0 && !pendingFileGroupIds.contains(fileSlice.getFileGroupId());
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 920187e218f..2b704726580 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -28,7 +28,9 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.LogCompactionExecutionHelper;
@@ -63,9 +65,10 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
   }
 
   @Override
-  protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds) {
-    return isFileSliceEligibleForLogCompaction(fileSlice, lastCompletedInstantTime)
-        && super.filterFileSlice(fileSlice, lastCompletedInstantTime, pendingFileGroupIds);
+  protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime,
+                                    Set<HoodieFileGroupId> pendingFileGroupIds, Option<InstantRange> instantRange) {
+    return isFileSliceEligibleForLogCompaction(fileSlice, lastCompletedInstantTime, instantRange)
+        && super.filterFileSlice(fileSlice, lastCompletedInstantTime, pendingFileGroupIds, instantRange);
   }
 
   @Override
@@ -78,7 +81,8 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
    * @param fileSlice File Slice under consideration.
    * @return Boolean value that determines whether log compaction will be scheduled or not.
    */
-  private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String maxInstantTime) {
+  private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String maxInstantTime,
+                                                      Option<InstantRange> instantRange) {
     LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition "
         + fileSlice.getPartitionPath() + " eligible for log compaction.");
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
@@ -90,13 +94,15 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
             .map(file -> file.getPath().toString())
             .collect(Collectors.toList()))
         .withLatestInstantTime(maxInstantTime)
+        .withInstantRange(instantRange)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withOptimizedLogBlocksScan(true)
         .withRecordMerger(writeConfig.getRecordMerger())
         .build();
     scanner.scan(true);
     int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
-    LOG.info("Total blocks seen are " + totalBlocks);
+    LOG.info("Total blocks seen are " + totalBlocks + ", log blocks threshold is "
+        + writeConfig.getLogCompactionBlocksThreshold());
 
     // If total blocks in the file slice is > blocks threshold value(default value is 5).
     // Log compaction can be scheduled.
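
The log line above also spells out the whole scheduling criterion: a file slice
qualifies for log compaction once its current log blocks exceed the configured
threshold (default 5). Condensed from isFileSliceEligibleForLogCompaction(),
with the scanner setup elided:

    int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
    int threshold = writeConfig.getLogCompactionBlocksThreshold(); // default 5
    boolean eligible = totalBlocks > threshold; // e.g. 7 blocks vs threshold 5 -> schedule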
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 9378274a12f..d779eea4cc6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -186,4 +186,4 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     }
     return writeClient;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index a52af34429f..6079d339317 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -31,12 +31,14 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -53,8 +55,13 @@ import org.slf4j.LoggerFactory;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<JavaRDD<WriteStatus>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);
@@ -86,13 +93,29 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+
+    // Check if a commit or compaction instant with a greater timestamp is on the timeline.
+    // If an instant is found then abort log compaction, since it is no longer needed.
+    Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION);
+    Option<HoodieInstant> compactionInstantWithGreaterTimestamp =
+        Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream()
+            .filter(hoodieInstant -> actions.contains(hoodieInstant.getAction()))
+            .filter(hoodieInstant -> HoodieTimeline.compareTimestamps(hoodieInstant.getTimestamp(),
+                GREATER_THAN, logCompactionInstantTime))
+            .findFirst());
+    if (compactionInstantWithGreaterTimestamp.isPresent()) {
+      throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with greater "
+          + "timestamp exists. Instant details %s", compactionInstantWithGreaterTimestamp.get()));
+    }
+
     HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
     if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
       LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
       table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Inflight logcompaction file exists");
+      throw new HoodieException("Execution is aborted since it found an inflight logcompaction, "
+          + "log compaction plans are mutable plans, so reschedule another logcompaction.");
     }
     logCompactionTimer = metrics.getLogCompactionCtx();
     WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
@@ -131,7 +154,7 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
     if (compactionTimer != null) {
       long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
       HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
+          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, COMPACTION_ACTION)
       );
     }
     LOG.info("Compacted successfully on commit " + compactionCommitTime);
@@ -157,8 +180,8 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+    if (logCompactionTimer != null) {
+      long durationInMs = metrics.getDurationInMs(logCompactionTimer.stop());
       HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
       );
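Because logCompact() can now throw the new HoodieLogCompactException when a
commit or compaction with a greater timestamp has already landed, callers that
drive log compaction out-of-band may want to treat it as a benign abort. A
hedged sketch (the catch-and-skip policy here is a suggestion, not part of this
patch):

    try {
      writeClient.logCompact(logCompactionInstantTime);
    } catch (HoodieLogCompactException e) {
      // The plan is obsolete: a later commit/compaction supersedes it.
      // Skipping is safe; a fresh log compaction can be scheduled afterwards.
      LOG.warn("Skipping obsolete log compaction " + logCompactionInstantTime, e);
    }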
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 3b99535528f..84e0671f8c7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -150,7 +150,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
         metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
       }
 
-      if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
+      if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
         writeClient.startCommitWithTime(instantTime);
       } else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
index ca55e10703f..a04182e3379 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
@@ -377,7 +377,7 @@ public class TestDataValidationCheckForLogCompactionActions extends HoodieClient
     // Create logcompaction client.
     HoodieWriteConfig logCompactionConfig = HoodieWriteConfig.newBuilder().withProps(config2.getProps())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withLogCompactionBlocksThreshold("2").build())
+            .withLogCompactionBlocksThreshold(2).build())
         .build();
     SparkRDDWriteClient logCompactionClient = new SparkRDDWriteClient(context, logCompactionConfig);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index a5d30b453a5..21fc571bb44 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2778,6 +2778,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
     });
 
+    // TODO: include validation for record_index partition here.
     LOG.info("Validation time=" + timer.endTimer());
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 9c1acc19453..4073e63bd12 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -63,7 +63,6 @@ import java.util.stream.Stream;
 import static org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
@@ -143,7 +142,8 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   @Test
   public void testLogCompactionOnMORTable() throws Exception {
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
-        .withLogCompactionBlocksThreshold("1")
+        .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withLogCompactionBlocksThreshold(1)
         .build();
     HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
         HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build();
@@ -200,8 +200,9 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   @Test
   public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
-        .withLogCompactionBlocksThreshold("1")
         .withEnableOptimizedLogBlocksScan("true")
+        .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withLogCompactionBlocksThreshold(1)
         .build();
     HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
         HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build();
@@ -244,7 +245,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   public void testSchedulingLogCompactionAfterSchedulingCompaction() throws Exception {
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
         .withMaxNumDeltaCommitsBeforeCompaction(1)
-        .withLogCompactionBlocksThreshold("1")
+        .withLogCompactionBlocksThreshold(1)
         .build();
     HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
         HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build();
@@ -279,7 +280,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   public void testSchedulingCompactionAfterSchedulingLogCompaction() throws Exception {
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
         .withMaxNumDeltaCommitsBeforeCompaction(1)
-        .withLogCompactionBlocksThreshold("1")
+        .withLogCompactionBlocksThreshold(1)
         .build();
     HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
         HoodieIndex.IndexType.INMEMORY).withAutoCommit(true)
@@ -294,11 +295,10 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             .build())
         .build();
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-
     // First insert
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     insertBatch(config, client, newCommitTime, "000", 100,
-        SparkRDDWriteClient::insert, false, false, 100, 100,
+        SparkRDDWriteClient::insert, false, false, 10, 100,
         1, Option.empty());
 
     String prevCommitTime = newCommitTime;
@@ -306,17 +306,15 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     updateBatch(config, client, newCommitTime, prevCommitTime,
         Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
-        false, false, 50, 100, 2, config.populateMetaFields());
+        false, false, 50, 10, 2, config.populateMetaFields());
 
     // Schedule log compaction
     Option<String> logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty());
     assertTrue(logCompactionTimeStamp.isPresent());
 
-    // Try scheduling compaction, it won't succeed
+    // Even if pending logcompaction plans are in the timeline, compaction plan can be created.
     Option<String> compactionTimeStamp = client.scheduleCompaction(Option.empty());
     assertTrue(compactionTimeStamp.isPresent());
-    client.compact(compactionTimeStamp.get());
-    assertThrows(Exception.class, () -> client.logCompact(logCompactionTimeStamp.get()));
   }
 
   @Test
@@ -376,7 +374,8 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   @Test
   public void testRollbackOnLogCompaction() throws Exception {
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
-        .withLogCompactionBlocksThreshold("1")
+        .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withLogCompactionBlocksThreshold(1)
         .build();
     HoodieWriteConfig lcConfig = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY)
         .withAutoCommit(false).withCompactionConfig(compactionConfig).build();
@@ -475,7 +474,8 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   @Test
   public void testArchivalOnLogCompaction() throws Exception {
     HoodieCompactionConfig logCompactionConfig = HoodieCompactionConfig.newBuilder()
-        .withLogCompactionBlocksThreshold("2")
+        .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withLogCompactionBlocksThreshold(1)
         .build();
     HoodieWriteConfig lcWriteConfig = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
         HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(logCompactionConfig).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 184c7880c9e..43dea6d3b83 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -313,8 +313,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   public void testLogBlocksCountsAfterLogCompaction(boolean populateMetaFields) throws Exception {
 
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
-        .withInlineCompaction(false)
-        .withLogCompactionBlocksThreshold("1")
+        .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withLogCompactionBlocksThreshold(1)
         .build();
     // insert 100 records
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 6f9615578fa..4f1f4afaf74 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -94,6 +94,18 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Controls how often the metadata table is compacted.");
 
+  public static final ConfigProperty<String> ENABLE_LOG_COMPACTION_ON_METADATA_TABLE = ConfigProperty
+      .key(METADATA_PREFIX + ".log.compaction.enable")
+      .defaultValue("false")
+      .sinceVersion("0.14")
+      .withDocumentation("This config enables logcompaction for the metadata table.");
+
+  // Log blocks threshold, after a file slice crosses this threshold log compact operation is scheduled.
+  public static final ConfigProperty<Integer> LOG_COMPACT_BLOCKS_THRESHOLD = ConfigProperty
+      .key(METADATA_PREFIX + ".log.compaction.blocks.threshold")
+      .defaultValue(5)
+      .withDocumentation("Controls the criteria to log compact file groups in the metadata table.");
+
   // Regex to filter out matching directories during bootstrap
   public static final ConfigProperty<String> DIR_FILTER_REGEX = ConfigProperty
       .key(METADATA_PREFIX + ".dir.filter.regex")
@@ -408,6 +420,16 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withLogCompactionEnabled(boolean enableLogCompaction) {
+      metadataConfig.setValue(ENABLE_LOG_COMPACTION_ON_METADATA_TABLE, Boolean.toString(enableLogCompaction));
+      return this;
+    }
+
+    public Builder withLogCompactBlocksThreshold(int logCompactBlocksThreshold) {
+      metadataConfig.setValue(LOG_COMPACT_BLOCKS_THRESHOLD, Integer.toString(logCompactBlocksThreshold));
+      return this;
+    }
+
     public Builder withFileListingParallelism(int parallelism) {
       metadataConfig.setValue(FILE_LISTING_PARALLELISM_VALUE, String.valueOf(parallelism));
       return this;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index bf220ca7847..ef4eec1ba07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -332,7 +332,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     private String keyFieldOverride;
     // By default, we're doing a full-scan
     private boolean forceFullScan = true;
-    // Use scanV2 method.
     private boolean enableOptimizedLogBlocksScan = false;
     private HoodieRecordMerger recordMerger;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieLogCompactException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieLogCompactException.java
new file mode 100644
index 00000000000..888327bad55
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieLogCompactException.java
@@ -0,0 +1,30 @@
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieLogCompactException extends HoodieException {
+
+  public HoodieLogCompactException(String msg) {
+    super(msg);
+  }
+
+  public HoodieLogCompactException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2639cd350d4..7ca9d6573e8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -20,9 +20,6 @@ package org.apache.hudi.metadata;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
-import org.apache.hudi.avro.model.HoodieRestoreMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
@@ -37,10 +34,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
@@ -50,7 +44,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
@@ -67,7 +60,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -83,7 +75,6 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BL
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 /**
  * Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -469,37 +460,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return Pair.of(baseFileReader, baseFileOpenMs);
   }
 
-  public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient, HoodieTableMetaClient metadataMetaClient) {
-    // Only those log files which have a corresponding completed instant on the dataset should be read
-    // This is because the metadata table is updated before the dataset instants are committed.
-    HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
-    Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-    // We should also add completed indexing delta commits in the metadata table, as they do not
-    // have corresponding completed instant in the data table
-    validInstantTimestamps.addAll(
-        metadataMetaClient.getActiveTimeline()
-            .filter(instant -> instant.isCompleted() && isIndexingCommit(instant.getTimestamp()))
-            .getInstants().stream()
-            .map(HoodieInstant::getTimestamp)
-            .collect(Collectors.toList()));
-
-    // For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
-    // The rollback instant should be more recent than the start of the timeline for it to have rolled back any
-    // instant which we have a log block for.
-    final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
-    datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
-        .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime))
-        .forEach(instant -> {
-          validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline));
-        });
-
-    // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp
-    validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
-    return validInstantTimestamps;
-  }
-
   public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
                                                                        String partitionName,
                                                                        Option<Boolean> allowFullScanOverride) {
@@ -511,7 +471,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
     // Only those log files which have a corresponding completed instant on the dataset should be read
     // This is because the metadata table is updated before the dataset instants are committed.
-    Set<String> validInstantTimestamps = getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
+    Set<String> validInstantTimestamps = HoodieTableMetadataUtil
+        .getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
 
     Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
     String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
@@ -559,46 +520,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     }
   }
 
-  /**
-   * Returns a list of commits which were rolled back as part of a Rollback or Restore operation.
-   *
-   * @param instant  The Rollback operation to read
-   * @param timeline instant of timeline from dataset.
-   */
-  private static List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
-    try {
-      List<String> commitsToRollback = null;
-      if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
-        try {
-          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-              timeline.getInstantDetails(instant).get());
-          commitsToRollback = rollbackMetadata.getCommitsRollback();
-        } catch (IOException e) {
-          // if file is empty, fetch the commits to rollback from rollback.requested file
-          HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(
-              timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
-                  instant.getTimestamp())).get(), HoodieRollbackPlan.class);
-          commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime());
-          LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString());
-        }
-        return commitsToRollback;
-      }
-
-      List<String> rollbackedCommits = new LinkedList<>();
-      if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) {
-        // Restore is made up of several rollbacks
-        HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
-            timeline.getInstantDetails(instant).get());
-        restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-          rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback()));
-        });
-      }
-      return rollbackedCommits;
-    } catch (IOException e) {
-      throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
-    }
-  }
-
   @Override
   public void close() {
     closePartitionReaders();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 0308c79c9d3..45ef7b5ecc9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -30,6 +30,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -49,6 +50,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -96,6 +98,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
 import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
 import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 
 /**
  * A utility to convert timeline information to metadata table records.
@@ -111,6 +114,7 @@ public class HoodieTableMetadataUtil {
   // Suffix to use for compaction
   private static final String COMPACTION_TIMESTAMP_SUFFIX = "001";
 
+
   // Suffix to use for clean
   private static final String CLEAN_TIMESTAMP_SUFFIX = "002";
 
@@ -119,6 +123,9 @@ public class HoodieTableMetadataUtil {
   // to avoid collision.
   public static final String METADATA_INDEXER_TIME_SUFFIX = "004";
 
+  // Suffix to use for log compaction
+  private static final String LOG_COMPACTION_TIMESTAMP_SUFFIX = "005";
+
   // This suffix and all after that are used for initialization of the various partitions. The unused suffixes lower than this value
   // are reserved for future operations on the MDT.
   public static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // corresponds to "010";
@@ -1325,6 +1332,39 @@ public class HoodieTableMetadataUtil {
     return inflightAndCompletedPartitions;
   }
 
+  public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient,
+                                                      HoodieTableMetaClient metadataMetaClient) {
+    // Only those log files which have a corresponding completed instant on the dataset should be read
+    // This is because the metadata table is updated before the dataset instants are committed.
+    HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
+    Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+    // We should also add completed indexing and log compaction delta commits in the metadata table,
+    // as they do not have a corresponding completed instant in the data table
+    validInstantTimestamps.addAll(
+        metadataMetaClient.getActiveTimeline()
+            .filter(instant -> instant.isCompleted()
+                && (isIndexingCommit(instant.getTimestamp()) || isLogCompactionInstant(instant)))
+            .getInstantsAsStream()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList()));
+
+    // For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
+    // The rollback instant should be more recent than the start of the timeline for it to have rolled back any
+    // instant which we have a log block for.
+    final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+    datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
+        .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime))
+        .forEach(instant -> {
+          validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline));
+        });
+
+    // SOLO_COMMIT_TIMESTAMP (with the partition initialization suffix) is used during bootstrap, so it is a valid timestamp
+    validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, PARTITION_INITIALIZATION_TIME_SUFFIX));
+    return validInstantTimestamps;
+  }
+
   /**
    * Checks if a delta commit in metadata table is written by async indexer.
    * <p>
@@ -1339,6 +1379,58 @@ public class HoodieTableMetadataUtil {
             && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
   }
 
+  /**
+   * This method returns true if the instant provided belongs to a log compaction instant.
+   * For the metadata table, log compaction instants are created with the suffix "005" defined in LOG_COMPACTION_TIMESTAMP_SUFFIX.
+   * @param instant Hoodie completed instant.
+   * @return true for log compaction instants, false otherwise.
+   */
+  public static boolean isLogCompactionInstant(HoodieInstant instant) {
+    return instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)
+        && instant.getTimestamp().length() == MILLIS_INSTANT_ID_LENGTH + LOG_COMPACTION_TIMESTAMP_SUFFIX.length()
+        && instant.getTimestamp().endsWith(LOG_COMPACTION_TIMESTAMP_SUFFIX);
+  }
+
+  /**
+   * Returns a list of commits which were rolled back as part of a Rollback or Restore operation.
+   *
+   * @param instant  The Rollback operation to read
+   * @param timeline The active timeline of the dataset.
+   */
+  private static List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
+    try {
+      List<String> commitsToRollback;
+      if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
+        try {
+          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+              timeline.getInstantDetails(instant).get());
+          commitsToRollback = rollbackMetadata.getCommitsRollback();
+        } catch (IOException e) {
+          // if file is empty, fetch the commits to rollback from rollback.requested file
+          HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(
+              timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
+                  instant.getTimestamp())).get(), HoodieRollbackPlan.class);
+          commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime());
+          LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString());
+        }
+        return commitsToRollback;
+      }
+
+      List<String> rollbackedCommits = new LinkedList<>();
+      if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) {
+        // Restore is made up of several rollbacks
+        HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+            timeline.getInstantDetails(instant).get());
+        restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+          rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback()));
+        });
+      }
+      return rollbackedCommits;
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
+    }
+  }
+
   /**
    * Delete the metadata table for the dataset and backup if required.
    *
@@ -1521,4 +1613,11 @@ public class HoodieTableMetadataUtil {
   public static String createIndexInitTimestamp(String timestamp, int offset) {
     return String.format("%s%03d", timestamp, PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
   }
+
+  /**
+   * Create the timestamp for a log compaction operation on the metadata table.
+   */
+  public static String createLogCompactionTimestamp(String timestamp) {
+    return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX;
+  }
 }
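
For readers skimming the diff, the new methods hinge on the metadata table's timestamp suffix scheme: each MDT table-service instant reuses a data-table instant time plus a reserved three-digit suffix ("001" compaction, "002" clean, "004" async indexer, "005" log compaction, "010" and above partition initialization). Below is a minimal, self-contained sketch of that scheme. The class and method names are illustrative only, not part of the patch; it assumes Hudi's millis-granularity instant format yyyyMMddHHmmssSSS (17 characters) for MILLIS_INSTANT_ID_LENGTH.

    public class MdtTimestampSuffixDemo {
      // Assumed length of Hudi's millis-granularity instant time, yyyyMMddHHmmssSSS.
      static final int MILLIS_INSTANT_ID_LENGTH = 17;
      // Mirrors LOG_COMPACTION_TIMESTAMP_SUFFIX from the patch.
      static final String LOG_COMPACTION_TIMESTAMP_SUFFIX = "005";

      // Same shape as createLogCompactionTimestamp above: append the reserved
      // suffix so the MDT instant cannot collide with a data-table commit time.
      static String createLogCompactionTimestamp(String timestamp) {
        return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX;
      }

      // Timestamp-only version of the isLogCompactionInstant check (the real
      // method also requires the instant's action to be a deltacommit).
      static boolean hasLogCompactionSuffix(String instantTime) {
        return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + LOG_COMPACTION_TIMESTAMP_SUFFIX.length()
            && instantTime.endsWith(LOG_COMPACTION_TIMESTAMP_SUFFIX);
      }

      public static void main(String[] args) {
        String dataInstant = "20230609072327123";                      // 17 chars
        String mdtInstant = createLogCompactionTimestamp(dataInstant); // "20230609072327123005"
        System.out.println(hasLogCompactionSuffix(mdtInstant));        // true
        System.out.println(hasLogCompactionSuffix(dataInstant));       // false
      }
    }

This is why getValidInstantTimestamps can admit MDT-only deltacommits (indexing, log compaction) by suffix alone: a plain data-table commit time never has the extra three characters, so the length check plus suffix check is unambiguous.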
