suryaprasanna commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264527544


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -53,6 +53,7 @@ public static Comparator<String> getReverseCommitTimeComparator() {
 
   /**
    * Timeline, based on which all getter work.
+   * This should be a write timeline that contains either completed instants or pending compaction instants.

Review Comment:
   Good idea, I will use this name across all the classes.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,14 +134,20 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
+    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline
+        .getWriteTimeline()
+        .filterCompletedAndCompactionInstants();
   }
 
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
    */
   public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
-    HoodieTimer timer = HoodieTimer.start();
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    // Timeline determines whether a file slice is committed or not. Timeline object is attached to each HoodieFileGroup.
+    // Using this timeline object HoodieFileGroup determines whether a fileSlice is committed or not.
+    // visibleCommitsAndCompactionTimeline contains entire write timeline with pending commits, so filtering it

Review Comment:
   I think this comment was left over from the original PR I raised. I removed it from here and added it at the appropriate place.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -145,6 +150,17 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
      */
     Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
 
+    /**
+     * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the

Review Comment:
   Fixed it.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -170,15 +176,25 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
 
   /**
    * Build FileGroups from passed in file-status.
+   * TODO: Note here timeline is completedAndCompactionTimeline.

Review Comment:
   Yeah, makes sense. We can also avoid filtering the activeTimeline multiple times to get this timeline. So, I renamed it in all the places so that developers don't even have to perform the extra filtering.
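
The "filter once, reuse everywhere" idea from this comment can be sketched as follows. This is a hypothetical, simplified illustration with stand-in types (`CachedTimelineSketch`, `Instant`), not Hudi's real API: the refresh path computes the filtered timeline a single time, and every caller reads the cached result instead of re-filtering the active timeline.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in for a file system view that caches the filtered timeline.
public class CachedTimelineSketch {

  // Stand-in for a timeline instant; the flag marks instants that survive the
  // completed-and-compaction filter.
  record Instant(String ts, boolean completedOrCompaction) {}

  private List<Instant> completedAndCompactionTimeline;

  // Single filtering pass on refresh; subsequent getter calls do no filtering.
  void refreshTimeline(List<Instant> activeTimeline) {
    this.completedAndCompactionTimeline = activeTimeline.stream()
        .filter(Instant::completedOrCompaction)
        .collect(Collectors.toList());
  }

  List<Instant> getCompletedAndCompactionTimeline() {
    return completedAndCompactionTimeline;
  }

  public static void main(String[] args) {
    CachedTimelineSketch view = new CachedTimelineSketch();
    view.refreshTimeline(List.of(new Instant("001", true), new Instant("002", false)));
    view.getCompletedAndCompactionTimeline().forEach(i -> System.out.println(i.ts()));
  }
}
```

Naming the cached field after the filtered view (as done in the PR) makes the invariant visible at every call site, so no caller is tempted to filter again.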



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -327,11 +357,43 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add pending replace file groups
+   * @param pendingClusteringInstant pending instant
+   * @throws IOException
+   */
+  private void addPendingClusteringInstant(HoodieInstant pendingClusteringInstant) throws IOException {
+    LOG.info("Syncing pending clustering instant (" + pendingClusteringInstant + ")");
+    Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupStreamToInstant =
+        ClusteringUtils.getFileGroupEntriesFromClusteringInstant(pendingClusteringInstant, metaClient);
+    addFileGroupsInPendingClustering(fileGroupStreamToInstant);
+  }
+
+  /**
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction

Review Comment:
   Fixed it.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
+   * operation is failed, so it is no longer need to be tracked as pending.
    *
    * @param instant Log Compaction Instant to be removed
+   * @param isSuccessful true if log compaction operation is successful, false if the operation is failed and rollbacked.
    */
-  private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed log compaction instant (" + instant + ")");
-    HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-    removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-        .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-            CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+  private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+    if (isSuccessful) {
+      LOG.info("Removing completed log compaction instant (" + instant + ")");
+      HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+      removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+          .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+              CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+    } else {
+      LOG.info("Removing failed log compaction instant (" + instant + ")");
+      removePendingLogCompactionOperations(instant.getTimestamp());

Review Comment:
   If a compaction instant is removed, it is not safe to do an incremental file sync, because instants and file slices may both have been removed. So we fail the check in TimelineDiffHelper, and the job automatically falls back to doing a full sync.
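
The fallback behavior described here can be sketched as below. This is a hypothetical, simplified stand-in (plain strings for instants, a stripped-down `DiffResult`), not Hudi's real `TimelineDiffHelper`: when an instant present in the old timeline has vanished from the new one, the diff is marked unsafe and the caller switches to a full sync.

```java
import java.util.List;

// Simplified stand-in for the TimelineDiffResult pattern: a removed instant
// (e.g. a rolled-back compaction) makes incremental sync unsafe.
public class TimelineDiffSketch {

  static final class DiffResult {
    final boolean canSyncIncrementally;
    DiffResult(boolean canSyncIncrementally) { this.canSyncIncrementally = canSyncIncrementally; }
  }

  static final DiffResult UNSAFE_SYNC_RESULT = new DiffResult(false);

  // Returns the unsafe result if any old instant disappeared from the new
  // timeline; such removals can also imply removed file slices.
  static DiffResult diff(List<String> oldInstants, List<String> newInstants) {
    for (String instant : oldInstants) {
      if (!newInstants.contains(instant)) {
        return UNSAFE_SYNC_RESULT;
      }
    }
    return new DiffResult(true);
  }

  // The caller inspects the flag and falls back to a full rebuild when needed.
  static String sync(List<String> oldInstants, List<String> newInstants) {
    return diff(oldInstants, newInstants).canSyncIncrementally ? "incremental" : "full";
  }

  public static void main(String[] args) {
    // "002" was removed (rolled-back compaction) -> full sync.
    System.out.println(sync(List.of("001", "002"), List.of("001", "003")));
    // Only new instants appeared -> incremental sync is safe.
    System.out.println(sync(List.of("001"), List.of("001", "002")));
  }
}
```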



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction

Review Comment:
   Fixed it.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -146,17 +189,22 @@ public static class TimelineDiffResult {
 
     private final List<HoodieInstant> newlySeenInstants;
     private final List<HoodieInstant> finishedCompactionInstants;
-    private final List<HoodieInstant> finishedOrRemovedLogCompactionInstants;
+    // Completed instants will have true as the value where as instants removed due to rollback will have false as value.
+    private final List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants;
+    // Completed instants will have true as the value where as instants removed due to rollback will have false as value.
+    private final List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants;
     private final boolean canSyncIncrementally;
 
     public static final TimelineDiffResult UNSAFE_SYNC_RESULT =
-        new TimelineDiffResult(null, null, null, false);
+        new TimelineDiffResult(null, null, null, null, false);
 
     public TimelineDiffResult(List<HoodieInstant> newlySeenInstants, List<HoodieInstant> finishedCompactionInstants,
-                              List<HoodieInstant> finishedOrRemovedLogCompactionInstants, boolean canSyncIncrementally) {
+                              List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants,
+                              List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants, boolean canSyncIncrementally) {
       this.newlySeenInstants = newlySeenInstants;
       this.finishedCompactionInstants = finishedCompactionInstants;
       this.finishedOrRemovedLogCompactionInstants = finishedOrRemovedLogCompactionInstants;
+      this.finishedOrRemovedReplaceCommitInstants = finishedOrRemovedReplaceCommitInstants;

Review Comment:
   Yes, renamed all these variables.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -170,7 +170,7 @@ public interface HoodieTimeline extends Serializable {
    *
    * @return
    */
-  HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+  HoodieTimeline filterCompletedAndRewriteInstants();

Review Comment:
   Here we need to filter on both completed instants and rewrite instants. With rewrite you get pending replacecommits, logcompactions and compactions; with completed you get all the commits that are completed.
   This API is similar to filterCompletedAndCompactionInstants, but instead of including just compactions it also includes the other rewrite instants.
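
The filtering rule described here can be illustrated with a short sketch. This uses hypothetical, simplified stand-in types (`RewriteFilterSketch`, a string `action` field), not Hudi's real timeline classes: an instant is kept if it is completed, or if it is pending but its action rewrites file slices (compaction, logcompaction, replacecommit).

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in for the filterCompletedAndRewriteInstants idea.
public class RewriteFilterSketch {

  // Actions whose pending instants must stay visible because they rewrite file slices.
  static final Set<String> REWRITE_ACTIONS = Set.of("compaction", "logcompaction", "replacecommit");

  record Instant(String ts, String action, boolean completed) {}

  static List<Instant> filterCompletedAndRewriteInstants(List<Instant> timeline) {
    return timeline.stream()
        .filter(i -> i.completed() || REWRITE_ACTIONS.contains(i.action()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Instant> timeline = List.of(
        new Instant("001", "commit", true),          // completed -> kept
        new Instant("002", "deltacommit", false),    // pending non-rewrite -> dropped
        new Instant("003", "compaction", false),     // pending rewrite -> kept
        new Instant("004", "replacecommit", false)); // pending rewrite -> kept
    filterCompletedAndRewriteInstants(timeline).forEach(i -> System.out.println(i.ts()));
  }
}
```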



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -159,7 +177,12 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
             } else if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
               addRollbackInstant(timeline, instant);
             } else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-              addReplaceInstant(timeline, instant);
+              boolean isClusteringCommit = ClusteringUtils.isClusteringCommit(metaClient, instant);
+              if (!instant.isCompleted() && isClusteringCommit) {
+                addPendingClusteringInstant(instant);
+              } else {
+                addReplaceInstant(timeline, instant);

Review Comment:
   Completed insert-overwrite commits can also come here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)

Review Comment:
   Refactored it.
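
The mapping being refactored here can be sketched as follows. This is a hypothetical, simplified version with stand-in record types (`Transition`, `Flagged`), not the real `TimelineDiffHelper`: each pending instant's transition is turned into an (instant, flag) pair, where the flag is true when the instant completed and false when it disappeared from the new timeline (i.e. was rolled back).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in for mapping pending-instant transitions to flagged pairs.
public class TransitionMappingSketch {

  record Instant(String ts, boolean completed) {}

  // newState is null when the pending instant no longer exists in the new timeline.
  record Transition(Instant oldState, Instant newState) {}

  record Flagged(Instant instant, boolean finished) {}

  static List<Flagged> finishedOrRemoved(List<Transition> transitions) {
    return transitions.stream()
        // Only pending instants that either completed or vanished are of interest.
        .filter(t -> !t.oldState().completed()
            && (t.newState() == null || t.newState().completed()))
        // null new state -> removed by rollback (false); otherwise finished (true).
        .map(t -> new Flagged(t.oldState(), t.newState() != null))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Flagged> result = finishedOrRemoved(Arrays.asList(
        new Transition(new Instant("005", false), new Instant("005", true)), // finished
        new Transition(new Instant("006", false), null)));                   // rolled back
    result.forEach(f -> System.out.println(f.instant().ts() + "=" + f.finished()));
  }
}
```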



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java:
##########
@@ -325,6 +325,15 @@ private void registerFileSlicesAPI() {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INCLUDE_PENDING_URL, new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_SLICES", 1);

Review Comment:
   Fixed it.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // Check for replace commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // New instants will contains instants that are newly completed commits or newly created pending rewrite commits
+      // (i.e. compaction, logcompaciton, replacecommit)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
      LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,

Review Comment:
   Good catch, I think the filtering was missing in HoodieTimeline's filterCompletedAndRewriteInstants method; I have added it now. Also, I renamed these as clustering transitions instead of replace-commit transitions.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java:
##########
@@ -544,10 +574,13 @@ protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGr
   }
 
   private Stream<HoodieFileGroup> getFileGroups(Stream<FileSlice> sliceStream) {
+    // HoodieFileGroup using timeline to determine whether a file slice is committed or not so passing
+    // completedAndCompactionTimeline object while creating HoodieFileGroup.
+    HoodieTimeline completedAndCompactionTimeline = getVisibleCommitsAndCompactionTimeline();

Review Comment:
   Refactored it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
