nsivabalan commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264737094


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -165,12 +165,18 @@ public interface HoodieTimeline extends Serializable {
 
   HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
 
+  /**
+   * This is the filtered write timeline that contains only completed and 
pending compaction instants.
+   * This is the timeline that is used in constructing file groups.
+   */
+  HoodieTimeline filterCompletedWriteAndCompactionInstants();
+
   /**
    * Timeline to just include completed commits or all rewrites like 
compaction, logcompaction and replace actions.
    *
    * @return
    */
-  HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+  HoodieTimeline filterCompletedAndRewriteInstants(HoodieTableMetaClient 
metaClient);

Review Comment:
   OK, let's go with "rewrite" for now. But if we can come up with a better name, we should 
try to use it. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -144,44 +172,54 @@ private static List<Pair<HoodieInstant, HoodieInstant>> 
getPendingCompactionTran
    */
   public static class TimelineDiffResult {
 
-    private final List<HoodieInstant> newlySeenInstants;
+    private final List<HoodieInstant> newlySeenCompletedAndRewriteInstants;
     private final List<HoodieInstant> finishedCompactionInstants;

Review Comment:
   Synced up face-to-face. 
   The reason is that log compaction may not result in new file slices. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -123,21 +126,24 @@ public HoodieTimeline 
filterCompletedAndCompactionInstants() {
   }
 
   @Override
-  public HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants() {
-    return new HoodieDefaultTimeline(getInstantsAsStream().filter(s -> 
s.isCompleted()
-        || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) || 
s.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION)), details);
+  public HoodieTimeline getFileSystemViewTimeline() {
+    return new HoodieDefaultTimeline(getInstantsAsStream()
+        .filter(x ->
+            (x.isCompleted() && 
(HoodieTimeline.SAVEPOINT_ACTION.equals(x.getAction()) || 
VALID_WRITE_ACTIONS.contains(x.getAction())))
+                || HoodieTimeline.COMPACTION_ACTION.equals(x.getAction())
+        ), details);
   }
 
   @Override
-  public HoodieDefaultTimeline filterCompletedInstantsOrRewriteTimeline() {
-    Set<String> validActions = CollectionUtils.createSet(COMPACTION_ACTION, 
LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
-    return new HoodieDefaultTimeline(getInstantsAsStream().filter(s -> 
s.isCompleted() || validActions.contains(s.getAction())), details);
+  public HoodieDefaultTimeline filterCompletedAndRewriteInstants() {
+    List<String> validActions = Arrays.asList(COMPACTION_ACTION, 
LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);

Review Comment:
   Can we rename this variable to `rewriteActions`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -130,36 +141,50 @@ private void runIncrementalSync(HoodieTimeline timeline, 
TimelineDiffResult diff
     });
 
     // Now remove pending log compaction instants which were completed or 
removed
-    
diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instant 
-> {
+    
diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instantPair
 -> {
+      try {
+        removePendingLogCompactionInstant(instantPair.getKey(), 
instantPair.getValue());
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    });
+
+    // Now remove pending replace instants which were completed or removed
+    
diffResult.getFinishedOrRemovedReplaceInstants().stream().forEach(instantPair 
-> {
       try {
-        removePendingLogCompactionInstant(instant);
+        removePendingFileGroupsInPendingClustering(instantPair.getKey(), 
instantPair.getValue());
       } catch (IOException e) {
         throw new HoodieException(e);
       }
     });
 
-    // Add new completed instants found in the latest timeline, this also 
contains inflight instants.
-    diffResult.getNewlySeenInstants().stream()
-        .filter(instant -> instant.isCompleted()
-            || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
-            || 
instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION))
+    HoodieTimeline newFileSystemViewTimeline =
+        newActiveTimeline.getFileSystemViewTimeline();
+    // Add new completed instants found in the latest timeline,
+    // this also contains inflight instants from rewrite timeline.
+    diffResult.getNewlySeenCompletedAndRewriteInstants()
         .forEach(instant -> {
           try {
             if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
                 || 
instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
-              addCommitInstant(timeline, instant);
+              addCommitInstant(newFileSystemViewTimeline, instant);

Review Comment:
   Sounds good.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -123,21 +126,24 @@ public HoodieTimeline 
filterCompletedAndCompactionInstants() {
   }
 
   @Override
-  public HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants() {
-    return new HoodieDefaultTimeline(getInstantsAsStream().filter(s -> 
s.isCompleted()
-        || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) || 
s.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION)), details);
+  public HoodieTimeline getFileSystemViewTimeline() {
+    return new HoodieDefaultTimeline(getInstantsAsStream()
+        .filter(x ->
+            (x.isCompleted() && 
(HoodieTimeline.SAVEPOINT_ACTION.equals(x.getAction()) || 
VALID_WRITE_ACTIONS.contains(x.getAction())))
+                || HoodieTimeline.COMPACTION_ACTION.equals(x.getAction())

Review Comment:
   Let's add documentation explaining why compaction needs special treatment here. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1246,6 +1304,19 @@ Stream<FileSlice> fetchAllFileSlices(String 
partitionPath) {
         .flatMap(HoodieFileGroup::getAllFileSlices);
   }
 
+  /**
+   * Default implementation for fetching all file-slices for a partition-path.
+   *
+   * @param partitionPath Partition path
+   * @return file-slice stream
+   */
+  Stream<FileSlice> fetchAllFileSlices(String partitionPath, boolean 
includePending) {

Review Comment:
   Can you check for methods that are used only in tests and add the appropriate annotations? 
   Also, please revisit the access specifiers for the methods you are touching. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to