suryaprasanna commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264721375


##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
+   * operation is failed, so it is no longer need to be tracked as pending.
    *
    * @param instant Log Compaction Instant to be removed
+   * @param isSuccessful true if log compaction operation is successful, false if the operation is failed and rollbacked.
    */
-  private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed log compaction instant (" + instant + ")");
-    HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-    removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-        .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-            CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+  private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+    if (isSuccessful) {
+      LOG.info("Removing completed log compaction instant (" + instant + ")");
+      HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+      removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+          .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+              CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+    } else {
+      LOG.info("Removing failed log compaction instant (" + instant + ")");
+      removePendingLogCompactionOperations(instant.getTimestamp());

Review Comment:
   The concern is similar to the one raised in the TimelineDiffHelper class; I explained the reasoning behind it there.
   Also, one more point to note: compaction plans are by nature immutable, whereas replacecommit plans can be either mutable or immutable, depending on which UpdateStrategy implementation users are using. That means the traditional flow cannot roll back compaction plans (short of a developer manually doing a restore), while a replacecommit can be rolled back.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,15 +137,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
+    this.visibleFileSystemViewTimeline = visibleActiveTimeline
+        .getFileSystemViewTimeline();
   }
 
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
    */
   public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
-    HoodieTimer timer = HoodieTimer.start();
-    List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true);
+    HoodieTimer timer = new HoodieTimer().startTimer();

Review Comment:
   Good catch, rebasing might have introduced it; fixed it now.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -130,36 +141,50 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
     });
 
     // Now remove pending log compaction instants which were completed or removed
-    diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instant -> {
+    diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instantPair -> {
+      try {
+        removePendingLogCompactionInstant(instantPair.getKey(), instantPair.getValue());
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    });
+
+    // Now remove pending replace instants which were completed or removed
+    diffResult.getFinishedOrRemovedReplaceInstants().stream().forEach(instantPair -> {
       try {
-        removePendingLogCompactionInstant(instant);
+        removePendingFileGroupsInPendingClustering(instantPair.getKey(), instantPair.getValue());
       } catch (IOException e) {
         throw new HoodieException(e);
       }
     });
 
-    // Add new completed instants found in the latest timeline, this also contains inflight instants.
-    diffResult.getNewlySeenInstants().stream()
-        .filter(instant -> instant.isCompleted()
-            || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
-            || instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION))
+    HoodieTimeline newFileSystemViewTimeline =
+        newActiveTimeline.getFileSystemViewTimeline();
+    // Add new completed instants found in the latest timeline,
+    // this also contains inflight instants from rewrite timeline.
+    diffResult.getNewlySeenCompletedAndRewriteInstants()
        .forEach(instant -> {
          try {
            if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
                || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
-              addCommitInstant(timeline, instant);
+              addCommitInstant(newFileSystemViewTimeline, instant);

Review Comment:
   It is. If I pass the active timeline multiple times, we will have to filter and recreate the fileSystemViewTimeline each time. So I created it once and passed it to the methods that construct file groups. Also, all these methods are private and live in the same class, so they can share a common timeline when building file groups.
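   The compute-once-and-share pattern described above can be sketched in plain Java (hypothetical names; simple strings stand in for HoodieTimeline instants, and the filter rule is purely illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

// Stand-in for deriving the file-system-view timeline once and sharing it
// across the private helpers that build file groups.
public class TimelineReuseSketch {

  // The filtering step we want to pay for exactly once (hypothetical rule).
  static List<String> fileSystemViewTimeline(List<String> activeTimeline) {
    return activeTimeline.stream()
        .filter(action -> !action.equals("rollback"))
        .collect(Collectors.toList());
  }

  static void runIncrementalSync(List<String> activeTimeline) {
    // Derive the view timeline once...
    List<String> viewTimeline = fileSystemViewTimeline(activeTimeline);
    // ...then pass the same object to every helper, instead of
    // re-filtering the active timeline inside each one.
    addCommitInstant(viewTimeline, "commit");
    addCommitInstant(viewTimeline, "deltacommit");
  }

  static void addCommitInstant(List<String> viewTimeline, String instant) {
    // A real helper would build file groups against viewTimeline here.
  }
}
```

   Since the helpers are private to one class, sharing a single pre-filtered timeline is safe and avoids repeated filtering work.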



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -90,9 +91,11 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   protected HoodieTableMetaClient metaClient;
 
-  // This is the commits timeline that will be visible for all views extending this view
-  // This is nothing but the write timeline, which contains both ingestion and compaction(major and minor) writers.
-  private HoodieTimeline visibleCommitsAndCompactionTimeline;
+  // Timeline determines whether a file slice is committed or not. This timeline object is attached

Review Comment:
   Yeah, fixed it.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -150,11 +151,25 @@ public static Map<HoodieFileGroupId, HoodieInstant> getAllFileGroupsInPendingClu
 
   public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClusteringInstant(
       HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+    if (clusteringPlan == null || clusteringPlan.getInputGroups() == null || clusteringPlan.getInputGroups().isEmpty()) {
+      return Stream.empty();
+    }
     Stream<HoodieFileGroupId> partitionToFileIdLists = clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
     return partitionToFileIdLists.map(e -> Pair.of(e, instant));
   }
 
-  private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesInClusteringPlan(
+  public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesFromClusteringInstant(
+      HoodieInstant clusteringInstant, HoodieTableMetaClient metaClient) throws IOException {
+    Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
+        ClusteringUtils.getRequestedReplaceMetadata(metaClient, clusteringInstant);

Review Comment:
   isClusteringInstant checks are already performed before calling this API.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -478,8 +488,11 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException {
    * @param statuses List of File-Status
    */
   private Stream<HoodieBaseFile> convertFileStatusesToBaseFiles(FileStatus[] statuses) {
-    Predicate<FileStatus> roFilePredicate = fileStatus -> fileStatus.getPath().getName()
-        .contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
+    // Format of .hoodie_partition_metadata is changed to .hoodie_partition_metadata.parquet

Review Comment:
   Ref: https://github.com/apache/hudi/pull/5179/files#diff-c2a2767975107231dd51373bedd57cff81256256e0cb9ea2393b5af78f2c66dbR112



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -299,6 +310,17 @@ void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieIn
     });
   }
 
+  @Override
+  protected void removeFileGroupsInPendingClustering(String clusteringInstant) {
+    List<HoodieFileGroupId> fileGroupIdsToRemove = fgIdToPendingClustering.entrySet()
+        .stream().filter(fgIdInstantEntry -> clusteringInstant.equals(fgIdInstantEntry.getValue().getTimestamp()))
+        .map(Map.Entry::getKey).collect(Collectors.toList());
+    ValidationUtils.checkArgument(!fileGroupIdsToRemove.isEmpty(),

Review Comment:
   If we have arrived at this point, it means the old timeline has the replacecommit in an inflight state and the new timeline has either completed it or rolled it back.
   In that case, if we cannot find the file groups to remove, there may be some other issue. So it is good to fail the incremental file sync and let a full sync happen. If required, at a later point we could add monitoring for incremental sync to make sure everything is working as expected.
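   A minimal, self-contained simulation of this fail-fast reasoning (plain Strings stand in for HoodieFileGroupId and the instant timestamp; the exception mirrors what ValidationUtils.checkArgument would throw):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PendingClusteringRemovalSketch {

  // Returns the file-group ids tracked against the given clustering instant.
  // If none are found, the tracked state is inconsistent with the timeline
  // diff, so we fail fast; the caller then falls back to a full sync.
  static List<String> fileGroupIdsToRemove(Map<String, String> fgIdToPendingClustering,
                                           String clusteringInstant) {
    List<String> ids = fgIdToPendingClustering.entrySet().stream()
        .filter(e -> clusteringInstant.equals(e.getValue()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
    if (ids.isEmpty()) {
      // Aborting incremental sync here is what triggers the full sync.
      throw new IllegalArgumentException(
          "No pending clustering file groups found for instant " + clusteringInstant);
    }
    return ids;
  }
}
```

   Failing loudly is preferred over silently skipping, because an empty result here indicates state corruption rather than a normal condition.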



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -144,44 +172,54 @@ private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTran
    */
   public static class TimelineDiffResult {
 
-    private final List<HoodieInstant> newlySeenInstants;
+    private final List<HoodieInstant> newlySeenCompletedAndRewriteInstants;
     private final List<HoodieInstant> finishedCompactionInstants;

Review Comment:
   Ideally compaction plans are not removed: after a compaction plan is created, even while it is in an inflight state, new data is written to new file slices. So if we were to remove a compaction plan, we might also need to remove the latest files. We could add APIs to remove them, but it is a bit risky.
   Also, compaction plans are immutable, so writers cannot delete them; only developers can remove them manually by using restore operations.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -833,11 +832,11 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoP
     SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
     List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
     assertEquals(1, firstPartitionCommit2FileGroups.size());
-    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlicesIncludingInflight().collect(Collectors.toList()));

Review Comment:
   Existing unit tests are written to read inflight file slices from the views; this is not evident from the APIs we have in the file system view classes.
   For example, look at line 832 in this class: by passing the active timeline they expect to read inflight file slices, and by passing completedAndCompactionTimeline in the FileSystemViewManager class they can read committed file slices. With this change everyone has to pass the active timeline and use the appropriate APIs to read the respective slice streams.
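   The visibility distinction the tests rely on can be sketched with a toy Slice record (the real getAllFileSlices / getAllFileSlicesIncludingInflight operate on FileSlice against the timeline; the names and data here are illustrative):

```java
import java.util.List;
import java.util.stream.Stream;

public class SliceVisibilitySketch {

  // Toy stand-in for FileSlice: base instant time plus a committed flag.
  record Slice(String baseInstant, boolean committed) {}

  // Slices ordered latest-first, as the file group streams them.
  static final List<Slice> SLICES = List.of(
      new Slice("t30", false),  // inflight slice
      new Slice("t20", true),
      new Slice("t10", true));

  // Mirrors getAllFileSlices(): committed slices only.
  static Stream<Slice> allFileSlices() {
    return SLICES.stream().filter(Slice::committed);
  }

  // Mirrors getAllFileSlicesIncludingInflight(): everything, inflight included.
  static Stream<Slice> allFileSlicesIncludingInflight() {
    return SLICES.stream();
  }
}
```

   Tests that previously relied on the timeline passed at construction to surface inflight slices now call the inflight-inclusive API explicitly.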



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -178,7 +203,15 @@ public Option<HoodieBaseFile> getLatestDataFile() {
    * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
    */
   public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
-    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
+    return getLatestFileSliceBeforeOrOn(maxInstantTime, false);
+  }
+
+  /**
+   * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
+   */
+  public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime, boolean includePending) {

Review Comment:
   I created these methods to fix broken test cases; existing unit tests already use the logic present in these methods, just in a different way.
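   The effect of the new includePending flag can be sketched in isolation (a toy Slice record and plain string comparison stand in for FileSlice and compareTimestamps with LESSER_THAN_OR_EQUALS; names are illustrative):

```java
import java.util.List;
import java.util.Optional;

public class LatestSliceSketch {

  // Toy stand-in for FileSlice.
  record Slice(String baseInstant, boolean committed) {}

  // Slices are ordered latest-first, matching how getAllFileSlices() streams them.
  static Optional<Slice> latestSliceBeforeOrOn(List<Slice> slicesLatestFirst,
                                               String maxInstantTime,
                                               boolean includePending) {
    return slicesLatestFirst.stream()
        // When includePending is false, pending (uncommitted) slices are skipped,
        // matching the behavior of the original single-argument method.
        .filter(s -> includePending || s.committed())
        // Lexicographic compare stands in for timestamp comparison.
        .filter(s -> s.baseInstant().compareTo(maxInstantTime) <= 0)
        .findFirst();
  }
}
```

   With includePending=false the method behaves like the existing API; with includePending=true a test can observe an inflight slice the way the pre-change tests implicitly did.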



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to