nsivabalan commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264603260


##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1246,6 +1304,19 @@ Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
        .flatMap(HoodieFileGroup::getAllFileSlices);
  }
 
+  /**
+   * Default implementation for fetching all file-slices for a partition-path.
+   *
+   * @param partitionPath Partition path
+   * @return file-slice stream
+   */
+  Stream<FileSlice> fetchAllFileSlices(String partitionPath, boolean includePending) {

Review Comment:
   do we need an access specifier here? private?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java:
##########
@@ -137,7 +137,7 @@ public void close() {
    */
  private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf,
      FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
-    HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    HoodieTimeline timeline = metaClient.getActiveTimeline();

Review Comment:
   ok



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -129,9 +131,15 @@ public HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants() {
   }
 
   @Override
-  public HoodieDefaultTimeline filterCompletedInstantsOrRewriteTimeline() {
-    Set<String> validActions = CollectionUtils.createSet(COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
-    return new HoodieDefaultTimeline(getInstantsAsStream().filter(s -> s.isCompleted() || validActions.contains(s.getAction())), details);
+  public HoodieTimeline filterCompletedWriteAndCompactionInstants() {
+    return getWriteTimeline().filterCompletedAndCompactionInstants();
+  }
+
+  @Override
+  public HoodieDefaultTimeline filterCompletedAndRewriteInstants(HoodieTableMetaClient metaClient) {

Review Comment:
   filterCompletedWriteAndRewriteInstants 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -165,12 +165,18 @@ public interface HoodieTimeline extends Serializable {
 
   HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
 
+  /**
+   * This is the filtered write timeline that contains only completed and pending compaction instants.
+   * This is the timeline that is used in constructing file groups.
+   */
+  HoodieTimeline filterCompletedWriteAndCompactionInstants();
+
   /**
    * Timeline to just include completed commits or all rewrites like compaction, logcompaction and replace actions.
    *
    * @return
    */
-  HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+  HoodieTimeline filterCompletedAndRewriteInstants(HoodieTableMetaClient metaClient);

Review Comment:
   what's the expected behavior of this API:
   completed commits and all compaction and all replace actions
   OR
   only completed (writes, compaction, replace commits)?
   can you fix the naming and java docs accordingly? it's kind of misleading.

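To make the two possible readings concrete, here is a minimal, self-contained sketch with a toy instant model; all names below (`ToyInstant`, `RewriteFilterSemantics`, the action strings) are illustrative, not the real Hudi classes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Toy instant model; these names are illustrative, not the real Hudi types.
class ToyInstant {
  final String action;      // e.g. "commit", "compaction", "replacecommit"
  final boolean completed;

  ToyInstant(String action, boolean completed) {
    this.action = action;
    this.completed = completed;
  }
}

public class RewriteFilterSemantics {
  static final List<String> REWRITE_ACTIONS =
      Arrays.asList("compaction", "logcompaction", "replacecommit");

  // Reading 1: completed instants of any action, PLUS all rewrite instants
  // regardless of state.
  static List<ToyInstant> completedOrAnyRewrite(List<ToyInstant> timeline) {
    return timeline.stream()
        .filter(i -> i.completed || REWRITE_ACTIONS.contains(i.action))
        .collect(Collectors.toList());
  }

  // Reading 2: only completed instants (writes, compaction, replace).
  static List<ToyInstant> completedOnly(List<ToyInstant> timeline) {
    return timeline.stream()
        .filter(i -> i.completed)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ToyInstant> timeline = Arrays.asList(
        new ToyInstant("commit", true),        // kept by both readings
        new ToyInstant("commit", false),       // kept by neither
        new ToyInstant("compaction", false));  // only reading 1 keeps it
    System.out.println(completedOrAnyRewrite(timeline).size()); // prints 2
    System.out.println(completedOnly(timeline).size());         // prints 1
  }
}
```

The two readings diverge exactly on pending rewrite instants, which is why the method name alone cannot disambiguate them.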


##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1298,6 +1369,14 @@ Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
         .map(Option::get);
   }
 
+  /**
+   * Default implementation for fetching latest file-slices for a partition path.
+   */
+  Stream<FileSlice> fetchLatestFileSlices(String partitionPath, boolean includePending) {

Review Comment:
   access specifier? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -144,44 +172,54 @@ private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTran
    */
   public static class TimelineDiffResult {
 
-    private final List<HoodieInstant> newlySeenInstants;
+    private final List<HoodieInstant> newlySeenCompletedAndRewriteInstants;
     private final List<HoodieInstant> finishedCompactionInstants;

Review Comment:
   so we are not interested in removed compaction instants, is it? curious to know why this is diff compared to log compactions.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -72,27 +82,75 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
               && instantPair.getValue().isCompleted())
           .map(Pair::getKey).collect(Collectors.toList());
 
-      newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+      // Collect new instants that are not present in oldCompletedAndRewriteTimeline
+      // but are present in new active timeline.
+      List<HoodieInstant> newCompletedAndRewriteInstants = new ArrayList<>();
+      newCompletedAndRewriteTimeline.getInstantsAsStream()
+          .filter(instant -> !oldTimelineInstants.contains(instant))
+          .forEach(newCompletedAndRewriteInstants::add);
 
-      List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      // Check for log compaction commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
+          getPendingLogCompactionTransitions(oldTimeline, newCompletedAndRewriteTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> Pair.of(instantPair.getKey(), instantPair.getValue() != null))
+          .collect(Collectors.toList());
+
+      // Check if replacecommits are completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants =
+          getPendingReplaceCommitTransitions(oldTimeline, newCompletedAndRewriteTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+          .map(instantPair -> Pair.of(instantPair.getKey(), instantPair.getValue() != null))
+          .collect(Collectors.toList());
+
+      // New instants will contain instants that are newly completed commits or newly created pending rewrite commits
+      // (i.e. compaction, log-compaction, clustering)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newCompletedAndRewriteInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(
+      HoodieTimeline oldTimeline, HoodieTimeline newCompletedAndRewriteTimeline) {
+    Set<HoodieInstant> newTimelineInstants = newCompletedAndRewriteTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+    return oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant -> {
+      if (newTimelineInstants.contains(instant)) {
+        return Pair.of(instant, instant);
+      } else {
+        HoodieInstant completedReplaceCommit =
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, instant.getTimestamp());
+        if (newTimelineInstants.contains(completedReplaceCommit)) {
+          return Pair.of(instant, completedReplaceCommit);
+        }
+        HoodieInstant inflightReplaceCommit =
+            new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, instant.getTimestamp());
+        if (newTimelineInstants.contains(inflightReplaceCommit)) {
+          return Pair.of(instant, inflightReplaceCommit);
+        }
+        return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
+      }
+    }).collect(Collectors.toList());
+  }
+
   /**
    * Getting pending log compaction transitions.
    */
-  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
-                                                                                            HoodieTimeline newTimeline) {
-    Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingLogCompactionTransitions(
+      HoodieTimeline oldTimeline, HoodieTimeline newCompletedAndRewriteTimeline) {

Review Comment:
   what does newCompletedAndRewriteTimeline refer to?
   completed writes and compaction commits and all rewrite instants (replace commits)?
   or does it include all compaction instants too? then, I feel we should fix the naming. it is misleading.

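The pairing done by getPendingReplaceCommitTransitions above can be sketched with a simplified string-based instant model (hypothetical names, not the Hudi API): each pending instant from the old timeline is mapped to its counterpart in the new timeline, or to null if it was removed (e.g. rolled back).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy sketch of the pending -> new-state pairing; names are illustrative only.
public class TransitionSketch {
  // An instant is modeled as "STATE:timestamp", e.g. "REQUESTED:001".
  static String timestampOf(String instant) {
    return instant.split(":")[1];
  }

  // For one pending instant from the old timeline, find its counterpart in the
  // new timeline: the same instant, a COMPLETED/INFLIGHT variant at the same
  // timestamp, or null if it disappeared (e.g. it was rolled back).
  static String transition(String pending, Set<String> newTimeline) {
    if (newTimeline.contains(pending)) {
      return pending;                        // still pending
    }
    String ts = timestampOf(pending);
    for (String state : Arrays.asList("COMPLETED", "INFLIGHT")) {
      String candidate = state + ":" + ts;
      if (newTimeline.contains(candidate)) {
        return candidate;                    // progressed to a later state
      }
    }
    return null;                             // removed from the timeline
  }

  public static void main(String[] args) {
    Set<String> newTimeline = new HashSet<>(Arrays.asList("COMPLETED:001", "REQUESTED:002"));
    System.out.println(transition("REQUESTED:001", newTimeline)); // prints COMPLETED:001
    System.out.println(transition("REQUESTED:002", newTimeline)); // prints REQUESTED:002
    System.out.println(transition("REQUESTED:003", newTimeline)); // prints null
  }
}
```

The null case is what lets the caller distinguish "completed" from "removed" when it later maps the pair to a Boolean.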


##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -90,9 +91,11 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   protected HoodieTableMetaClient metaClient;
 
-  // This is the commits timeline that will be visible for all views extending this view
-  // This is nothing but the write timeline, which contains both ingestion and compaction(major and minor) writers.
-  private HoodieTimeline visibleCommitsAndCompactionTimeline;
+  // Timeline determines whether a file slice is committed or not. This timeline object is attached

Review Comment:
   can we fix the java docs?
   ```
   // This is the commits timeline that will be visible for all views extending this view. This timeline contains completed write instants, all compaction instants (including pending).
   // This timeline object is attached to each HoodieFileGroup. Using this timeline object, HoodieFileGroup determines whether a fileSlice is committed or not.
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -170,7 +170,7 @@ public interface HoodieTimeline extends Serializable {
    *
    * @return
    */
-  HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+  HoodieTimeline filterCompletedAndRewriteInstants();

Review Comment:
   generally, we have used "rewrite" to denote only replace commits. So, are we changing the definition?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -165,12 +165,18 @@ public interface HoodieTimeline extends Serializable {
 
   HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
 
+  /**
+   * This is the filtered write timeline that contains only completed and pending compaction instants.
+   * This is the timeline that is used in constructing file groups.
+   */
+  HoodieTimeline filterCompletedWriteAndCompactionInstants();
+
   /**
    * Timeline to just include completed commits or all rewrites like compaction, logcompaction and replace actions.
    *
    * @return
    */
-  HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+  HoodieTimeline filterCompletedAndRewriteInstants(HoodieTableMetaClient metaClient);

Review Comment:
   are you introducing new terminology called "rewrite" with this patch, or is there precedent already?
   my understanding is that rewrite refers to only replace commits, but in some APIs in this patch we are using rewrite to refer to compaction as well. can we ensure it's uniform across the board?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -130,36 +141,50 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
     });
 
     // Now remove pending log compaction instants which were completed or removed
-    diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instant -> {
+    diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instantPair -> {
+      try {
+        removePendingLogCompactionInstant(instantPair.getKey(), instantPair.getValue());
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    });
+
+    // Now remove pending replace instants which were completed or removed
+    diffResult.getFinishedOrRemovedReplaceInstants().stream().forEach(instantPair -> {
       try {
-        removePendingLogCompactionInstant(instant);
+        removePendingFileGroupsInPendingClustering(instantPair.getKey(), instantPair.getValue());
       } catch (IOException e) {
         throw new HoodieException(e);
       }
     });
 
-    // Add new completed instants found in the latest timeline, this also contains inflight instants.
-    diffResult.getNewlySeenInstants().stream()
-        .filter(instant -> instant.isCompleted()
-            || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
-            || instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION))
+    HoodieTimeline newFileSystemViewTimeline =
+        newActiveTimeline.getFileSystemViewTimeline();
+    // Add new completed instants found in the latest timeline,
+    // this also contains inflight instants from rewrite timeline.
+    diffResult.getNewlySeenCompletedAndRewriteInstants()
         .forEach(instant -> {
           try {
             if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
                 || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
-              addCommitInstant(timeline, instant);
+              addCommitInstant(newFileSystemViewTimeline, instant);

Review Comment:
   wouldn't newActiveTimeline be a superset of newFileSystemViewTimeline? can we use newActiveTimeline for all from 170L to 187?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
+   * operation fails, so it no longer needs to be tracked as pending.
    *
    * @param instant Log Compaction Instant to be removed
+   * @param isSuccessful true if the log compaction operation was successful, false if the operation failed and was rolled back.
    */
-  private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed log compaction instant (" + instant + ")");
-    HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-    removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-        .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-            CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+  private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+    if (isSuccessful) {
+      LOG.info("Removing completed log compaction instant (" + instant + ")");
+      HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+      removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+          .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+              CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+    } else {
+      LOG.info("Removing failed log compaction instant (" + instant + ")");
+      removePendingLogCompactionOperations(instant.getTimestamp());

Review Comment:
   I see why we are not interested in completed compaction instants now.
   but wouldn't the logic be applicable to clustering commits as well then? let's sync up f2f.

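As a rough illustration of the success/failure split discussed above, here is a toy sketch with hypothetical names (not the real file-system-view internals): on success the operations named in the plan are removed; on failure the plan may already be gone, so everything pending at that instant's timestamp is dropped instead.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch: pending operations keyed by "timestamp/fileGroup"; the names and
// structure here are hypothetical, not the real file-system-view internals.
public class PendingOpsSketch {
  final Map<String, String> pendingOps = new HashMap<>();

  // Success path: the plan is still readable, so remove exactly the operations it names.
  void removeByPlan(List<String> planKeys) {
    planKeys.forEach(pendingOps::remove);
  }

  // Failure path: the plan may be gone after rollback, so remove every pending
  // operation scheduled at that instant's timestamp.
  void removeByTimestamp(String timestamp) {
    pendingOps.keySet().removeIf(k -> k.startsWith(timestamp + "/"));
  }

  public static void main(String[] args) {
    PendingOpsSketch view = new PendingOpsSketch();
    view.pendingOps.put("001/fg1", "logcompact");
    view.pendingOps.put("001/fg2", "logcompact");
    view.pendingOps.put("002/fg1", "logcompact");
    view.removeByTimestamp("001");               // rolled-back instant: both 001 ops dropped
    view.removeByPlan(Arrays.asList("002/fg1")); // successful instant: plan's op dropped
    System.out.println(view.pendingOps.size());  // prints 0
  }
}
```

The sketch only shows why the failure path keys off the timestamp rather than the plan; whether the same applies to clustering commits is the open question in the comment above.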


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to