nsivabalan commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264287885


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -170,7 +170,7 @@ public interface HoodieTimeline extends Serializable {
    *
    * @return
    */
-  HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+  HoodieTimeline filterCompletedAndRewriteInstants();

Review Comment:
   we should name it as "filterCompletedRewriteInstants"
   
   rewrite refers to commit, delta commits and replace commits. 
   completed refers to state. 
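
To make that naming distinction concrete, here is a small runnable sketch (illustrative toy types and names, not the actual Hudi API): "rewrite" describes the *action* of an instant (commit / deltacommit / replacecommit), while "completed" describes its *state*, and the proposed `filterCompletedRewriteInstants` keeps only instants satisfying both.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterCompletedRewriteDemo {
  // "Rewrite" actions: the ones that produce new base/log file versions.
  static final Set<String> REWRITE_ACTIONS = Set.of("commit", "deltacommit", "replacecommit");

  // Toy stand-in for HoodieInstant: an action plus a state.
  record Instant(String action, String state) {}

  // Analogous to the suggested filterCompletedRewriteInstants(): keep only
  // instants whose action is a rewrite action AND whose state is COMPLETED.
  static List<Instant> filterCompletedRewriteInstants(List<Instant> instants) {
    return instants.stream()
        .filter(i -> REWRITE_ACTIONS.contains(i.action()) && "COMPLETED".equals(i.state()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("commit", "COMPLETED"),
        new Instant("replacecommit", "INFLIGHT"), // rewrite action, but not completed
        new Instant("clean", "COMPLETED"));       // completed, but not a rewrite action
    System.out.println(filterCompletedRewriteInstants(timeline).size()); // prints 1
  }
}
```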
   



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -53,6 +53,7 @@ public static Comparator<String> getReverseCommitTimeComparator() {
 
   /**
    * Timeline, based on which all getter work.
+   * This should be a write timeline that contains either completed instants or pending compaction instants.

Review Comment:
   can we rename the variable then.
   completedWriteAndCompactionTimeline



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -146,17 +189,22 @@ public static class TimelineDiffResult {
 
     private final List<HoodieInstant> newlySeenInstants;
     private final List<HoodieInstant> finishedCompactionInstants;
-    private final List<HoodieInstant> finishedOrRemovedLogCompactionInstants;
+    // Completed instants will have true as the value where as instants removed due to rollback will have false as value.
+    private final List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants;
+    // Completed instants will have true as the value where as instants removed due to rollback will have false as value.
+    private final List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants;
     private final boolean canSyncIncrementally;
 
     public static final TimelineDiffResult UNSAFE_SYNC_RESULT =
-        new TimelineDiffResult(null, null, null, false);
+        new TimelineDiffResult(null, null, null, null, false);
 
     public TimelineDiffResult(List<HoodieInstant> newlySeenInstants, List<HoodieInstant> finishedCompactionInstants,
-                              List<HoodieInstant> finishedOrRemovedLogCompactionInstants, boolean canSyncIncrementally) {
+                              List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants,
+                              List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants, boolean canSyncIncrementally) {
       this.newlySeenInstants = newlySeenInstants;
       this.finishedCompactionInstants = finishedCompactionInstants;
       this.finishedOrRemovedLogCompactionInstants = finishedOrRemovedLogCompactionInstants;
+      this.finishedOrRemovedReplaceCommitInstants = finishedOrRemovedReplaceCommitInstants;

Review Comment:
   if your answer to my previous comment is "only clustering", should we rename all these variables accordingly? it might confuse people down the line. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // Check for replace commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // New instants will contains instants that are newly completed commits or newly created pending rewrite commits
+      // (i.e. compaction, logcompaciton, replacecommit)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,

Review Comment:
   btw, we are not interested in any replace commit right, only the clustering ones? can you point me to the code where we filter only for clustering replace commits? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,14 +134,20 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
+    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline
+        .getWriteTimeline()
+        .filterCompletedAndCompactionInstants();
   }
 
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
    */
   public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
-    HoodieTimer timer = HoodieTimer.start();
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    // Timeline determines whether a file slice is committed or not. Timeline object is attached to each HoodieFileGroup.
+    // Using this timeline object HoodieFileGroup determines whether a fileSlice is committed or not.
+    // visibleCommitsAndCompactionTimeline contains entire write timeline with pending commits, so filtering it

Review Comment:
   this comment does not make sense. 
   in L137, we already filter for the completed and compaction timeline. So why add this comment here? and in fact we don't do any filtering in this code snippet (L146 to 150). So it's confusing.
   or did you mean to add this comment in L137?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)

Review Comment:
   can we simplify the map as below 
   ```
   .map(instantPair -> Pair.of(instantPair.getKey(), instantPair.getValue() != null))
   ```
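
As an editor's note, the suggested one-liner is behavior-equivalent to the ternary in the diff. A quick self-contained check (using `java.util` `Map.Entry` as an illustrative stand-in for Hudi's `Pair`):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class MapSimplificationDemo {
  // Original form from the diff:
  // (value == null) ? Pair.of(key, false) : Pair.of(key, true)
  static Map.Entry<String, Boolean> ternaryForm(String key, String value) {
    return (value == null) ? new SimpleEntry<>(key, false) : new SimpleEntry<>(key, true);
  }

  // Suggested simplification: Pair.of(key, value != null)
  static Map.Entry<String, Boolean> simplifiedForm(String key, String value) {
    return new SimpleEntry<>(key, value != null);
  }

  public static void main(String[] args) {
    // Both forms agree for the null and non-null cases.
    System.out.println(ternaryForm("i1", null).equals(simplifiedForm("i1", null)));     // true
    System.out.println(ternaryForm("i1", "done").equals(simplifiedForm("i1", "done"))); // true
  }
}
```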



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1246,6 +1301,19 @@ Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
         .flatMap(HoodieFileGroup::getAllFileSlices);
   }
 
+  /**
+   * Default implementation for fetching all file-slices for a partition-path.
+   *
+   * @param partitionPath Partition path
+   * @return file-slice stream
+   */
+  Stream<FileSlice> fetchAllFileSlices(String partitionPath, boolean includePending) {

Review Comment:
   same comment as above. can we try to reuse code as much as possible. 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java:
##########
@@ -137,7 +137,7 @@ public void close() {
    */
  private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf,
      FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
-    HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    HoodieTimeline timeline = metaClient.getActiveTimeline();

Review Comment:
   can you tell me how you chased and fixed all of these? 
   did you find the caller stack trace for AbstractHoodieFileSystemview.init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) and fix all of them? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -170,15 +176,25 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
 
   /**
    * Build FileGroups from passed in file-status.
+   * TODO: Note here timeline is completedAndCompactionTimeline.

Review Comment:
   can you rename the "timeline" to "completedWriteAndCompactionTimeline" so that the whole call stack knows what this timeline is about. 
   in fact, we can chase and fix all places. 
   e.g. we can rename it in the constructor of HoodieFileGroup as well. 
   
   this will avoid mistakes by someone in future who might make changes only in HoodieFileGroup, for instance. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -787,6 +803,21 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
     }
   }
 
+  @Override
+  public final Stream<FileSlice> getLatestFileSlices(String partitionStr, boolean includePending) {

Review Comment:
   why can't we make one of these call the other: getLatestFileSlices(String partitionStr)?
   
   I see opportunity for code reuse here as well
   ```
     /**
      * Default implementation for fetching latest file-slices for a partition path.
      */
     Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
       return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlice).filter(Option::isPresent)
           .map(Option::get);
     }
   
     /**
      * Default implementation for fetching latest file-slices for a partition path.
      */
     Stream<FileSlice> fetchLatestFileSlices(String partitionPath, boolean includePending) {
       return fetchAllStoredFileGroups(partitionPath).map(fileGroup -> fileGroup.getLatestFileSlice(includePending)).filter(Option::isPresent)
           .map(Option::get);
     }
   ```
   
   We can just call fileGroup.getLatestFileSlice(false) in the other method. 
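
The reuse pattern being suggested can be sketched as follows (a minimal runnable toy, with simplified stand-in types rather than the actual AbstractTableFileSystemView API): the no-arg overload simply delegates to the boolean overload with `includePending = false`, so there is a single code path to maintain.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class OverloadDelegationDemo {
  // Toy stand-in for a FileSlice: an id plus a pending flag.
  record Slice(String id, boolean pending) {}

  // The "default" overload delegates, so only one implementation exists.
  static Stream<Slice> fetchLatestFileSlices(List<Slice> slices) {
    return fetchLatestFileSlices(slices, false); // single source of truth
  }

  static Stream<Slice> fetchLatestFileSlices(List<Slice> slices, boolean includePending) {
    // Pending slices are filtered out unless the caller opts in.
    return slices.stream().filter(s -> includePending || !s.pending());
  }

  public static void main(String[] args) {
    List<Slice> slices = Arrays.asList(new Slice("a", false), new Slice("b", true));
    System.out.println(fetchLatestFileSlices(slices).count());       // prints 1
    System.out.println(fetchLatestFileSlices(slices, true).count()); // prints 2
  }
}
```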
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction

Review Comment:
   minor. 
   `Remove pending log compaction instant` 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java:
##########
@@ -544,10 +574,13 @@ protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGr
   }
 
   private Stream<HoodieFileGroup> getFileGroups(Stream<FileSlice> sliceStream) {
+    // HoodieFileGroup using timeline to determine whether a file slice is committed or not so passing
+    // completedAndCompactionTimeline object while creating HoodieFileGroup.
+    HoodieTimeline completedAndCompactionTimeline = getVisibleCommitsAndCompactionTimeline();

Review Comment:
   maybe we should name this
   completedWriteAndCompactionTimeline 



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java:
##########
@@ -325,6 +325,15 @@ private void registerFileSlicesAPI() {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INCLUDE_PENDING_URL,
 new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_SLICES", 1);

Review Comment:
   shouldn't we fix this to LATEST_PARTITION_SLICES_INCLUDE_PENDING? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -327,11 +357,43 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add pending replace file groups
+   * @param pendingClusteringInstant pending instant
+   * @throws IOException
+   */
+  private void addPendingClusteringInstant(HoodieInstant pendingClusteringInstant) throws IOException {
+    LOG.info("Syncing pending clustering instant (" + pendingClusteringInstant + ")");
+    Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupStreamToInstant =
+        ClusteringUtils.getFileGroupEntriesFromClusteringInstant(pendingClusteringInstant, metaClient);
+    addFileGroupsInPendingClustering(fileGroupStreamToInstant);
+  }
+
+  /**
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction

Review Comment:
   minor. fix the docs. it talks about compaction while the actual method is for clustering



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -145,6 +150,17 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
      */
     Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
 
+    /**
+     * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the

Review Comment:
   minor. 
   "before or on an ..." 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // Check for replace commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // New instants will contains instants that are newly completed commits or newly created pending rewrite commits
+      // (i.e. compaction, logcompaciton, replacecommit)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+                                                                                            HoodieTimeline newTimeline) {
+    Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+    return oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant -> {
+      if (newTimelineInstants.contains(instant)) {

Review Comment:
   guess this is what danny was proposing 
   ```
     private static List<Pair<HoodieInstant, HoodieInstant>> getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
                                                                                                Function<HoodieTimeline, HoodieTimeline> getPendingActionTimelineFromOld,
                                                                                                HoodieTimeline newTimeline,
                                                                                                String completedAction, String pendingAction) {
       Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
   
       return getPendingActionTimelineFromOld.apply(oldTimeline).getInstantsAsStream().map(instant -> {
         if (newTimelineInstants.contains(instant)) {
           return Pair.of(instant, instant);
         } else {
           HoodieInstant completedInstant =
               new HoodieInstant(State.COMPLETED, completedAction, instant.getTimestamp());
           if (newTimelineInstants.contains(completedInstant)) {
             return Pair.of(instant, completedInstant);
           }
           HoodieInstant inflightInstant =
               new HoodieInstant(State.INFLIGHT, pendingAction, instant.getTimestamp());
           if (newTimelineInstants.contains(inflightInstant)) {
             return Pair.of(instant, inflightInstant);
           }
           return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
         }
       }).collect(Collectors.toList());
     }
   ```
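
The core of that generic helper is a transition lookup: for each pending instant in the old timeline, find what it became in the new timeline — itself (still pending), a COMPLETED twin, an INFLIGHT twin, or nothing (removed, e.g. rolled back). A runnable toy version of that idea, with simplified stand-in types rather than Hudi's `HoodieInstant`/`HoodieTimeline`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TransitionLookupDemo {
  // Toy stand-in for HoodieInstant; records give value-based equality,
  // which the Set.contains lookups below rely on.
  record Instant(String state, String action, String ts) {}

  // For each old pending instant, classify its transition in the new timeline.
  static List<String> getTransitions(List<Instant> oldPending, Set<Instant> newTimeline,
                                     Function<Instant, Instant> completedTwin,
                                     Function<Instant, Instant> inflightTwin) {
    return oldPending.stream().map(instant -> {
      if (newTimeline.contains(instant)) {
        return "unchanged"; // still pending in the new timeline
      }
      if (newTimeline.contains(completedTwin.apply(instant))) {
        return "completed"; // transitioned to a completed instant
      }
      if (newTimeline.contains(inflightTwin.apply(instant))) {
        return "inflight"; // transitioned to an inflight instant
      }
      return "removed"; // gone entirely, e.g. rolled back
    }).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Instant> oldPending = Arrays.asList(
        new Instant("REQUESTED", "replacecommit", "001"),
        new Instant("REQUESTED", "replacecommit", "002"));
    // 001 completed in the new timeline; 002 disappeared (rolled back).
    Set<Instant> newTimeline = new HashSet<>(List.of(new Instant("COMPLETED", "replacecommit", "001")));
    System.out.println(getTransitions(oldPending, newTimeline,
        i -> new Instant("COMPLETED", i.action(), i.ts()),
        i -> new Instant("INFLIGHT", i.action(), i.ts())));
    // prints [completed, removed]
  }
}
```

Passing the completed/inflight "twin" constructors as functions is what lets one helper serve both the log-compaction and replace-commit call sites.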
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // Check for replace commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // New instants will contains instants that are newly completed commits or newly created pending rewrite commits
+      // (i.e. compaction, logcompaciton, replacecommit)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+                                                                                            HoodieTimeline newTimeline) {
+    Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+    return oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant -> {
+      if (newTimelineInstants.contains(instant)) {

Review Comment:
   surya, next time when you push an update, you can add this. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -953,6 +984,18 @@ public final Stream<FileSlice> getAllFileSlices(String partitionStr) {
     }
   }
 
+  @Override
+  public final Stream<FileSlice> getAllFileSlices(String partitionStr, boolean includePending) {

Review Comment:
   same comment as above



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -159,7 +177,12 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
             } else if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
               addRollbackInstant(timeline, instant);
             } else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-              addReplaceInstant(timeline, instant);
+              boolean isClusteringCommit = ClusteringUtils.isClusteringCommit(metaClient, instant);
+              if (!instant.isCompleted() && isClusteringCommit) {
+                addPendingClusteringInstant(instant);
+              } else {
+                addReplaceInstant(timeline, instant);

Review Comment:
   in L163, we already filter only for clustering replace commits. So wouldn't this mean addCompletedClusteringInstant?
   or am I misunderstanding something?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
+   * operation is failed, so it is no longer need to be tracked as pending.
    *
    * @param instant Log Compaction Instant to be removed
+   * @param isSuccessful true if log compaction operation is successful, false if the operation is failed and rollbacked.
    */
-  private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed log compaction instant (" + instant + ")");
-    HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-    removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-        .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-            CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+  private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+    if (isSuccessful) {
+      LOG.info("Removing completed log compaction instant (" + instant + ")");
+      HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+      removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+          .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+              CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+    } else {
+      LOG.info("Removing failed log compaction instant (" + instant + ")");
+      removePendingLogCompactionOperations(instant.getTimestamp());

Review Comment:
   why don't I see this if/else in removePendingCompactionInstant? can you help me understand please



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
