codope commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264702031


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -833,11 +832,11 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoP
     SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
     List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
     assertEquals(1, firstPartitionCommit2FileGroups.size());
-    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlicesIncludingInflight().collect(Collectors.toList()));

Review Comment:
   Why are we including inflight slices as well?
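   To make the distinction under discussion concrete, here is a minimal standalone sketch (hypothetical classes, not Hudi's actual `HoodieFileGroup`/`FileSlice`; the behavior is inferred from the method names in the diff): `getAllFileSlices()` filters out slices whose base instant has not committed, while the `IncludingInflight` variant skips that timeline filter.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Stream;

   public class Main {
       // Simplified stand-in for a file slice: just its base instant timestamp.
       static class Slice {
           final String baseInstantTime;
           Slice(String baseInstantTime) { this.baseInstantTime = baseInstantTime; }
       }

       final List<Slice> slices;            // newest first, as in a file group
       final Set<String> completedInstants; // committed instants on the timeline

       Main(List<Slice> slices, Set<String> completedInstants) {
           this.slices = slices;
           this.completedInstants = completedInstants;
       }

       // Analogue of getAllFileSlices(): only slices whose base instant has committed.
       Stream<Slice> getAllFileSlices() {
           return slices.stream().filter(s -> completedInstants.contains(s.baseInstantTime));
       }

       // Analogue of getAllFileSlicesIncludingInflight(): no timeline filtering.
       Stream<Slice> getAllFileSlicesIncludingInflight() {
           return slices.stream();
       }

       public static void main(String[] args) {
           // Instant "20" is still inflight; only "10" has committed.
           Main fg = new Main(Arrays.asList(new Slice("20"), new Slice("10")), Set.of("10"));
           System.out.println(fg.getAllFileSlices().count());                  // 1
           System.out.println(fg.getAllFileSlicesIncludingInflight().count()); // 2
       }
   }
   ```

   So the test change above widens the assertion to also count the slice written by the uncommitted second upsert, which is why the question matters: the two methods return different slice sets whenever any write is still inflight.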



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -145,6 +150,17 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
      */
     Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
 
+    /**
+     * Stream all "merged" file-slices before or on an instant time. If a file-group has a pending compaction request,
+     * the file-slices before and after the compaction request instant are merged and returned. This API can also
+     * include pending instants.
+     *
+     * @param partitionPath Partition Path
+     * @param maxInstantTime Max Instant Time
+     * @param includePending Whether to include pending instants as well
+     * @return Stream of latest merged file slices
+     */
+    Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime, boolean includePending);

Review Comment:
   Please add tests for the new methods.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
 
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when log compaction is converted to a delta commit or the
+   * log compaction operation has failed, so it no longer needs to be tracked as pending.
    *
    * @param instant Log Compaction Instant to be removed
+   * @param isSuccessful true if the log compaction operation succeeded, false if it failed and was rolled back.
    */
-  private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed log compaction instant (" + instant + ")");
-    HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-    removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-        .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-            CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+  private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+    if (isSuccessful) {
+      LOG.info("Removing completed log compaction instant (" + instant + ")");
+      HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+      removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+          .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+              CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+    } else {
+      LOG.info("Removing failed log compaction instant (" + instant + ")");
+      removePendingLogCompactionOperations(instant.getTimestamp());

Review Comment:
   Yes, I think we should apply the same to the clustering commit too.
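   A standalone sketch of the symmetric handling being suggested for clustering (hypothetical names and a plain `Map` model, not Hudi's actual view classes): on success, the plan says exactly which file groups to release; on failure/rollback, every entry scheduled at that instant is dropped by timestamp.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class Main {
       // fgId -> instant timestamp of the pending clustering that scheduled it
       static final Map<String, String> fgIdToPendingClustering = new HashMap<>();

       static void removePendingClusteringInstant(String instantTime, List<String> planFileGroupIds, boolean isSuccessful) {
           if (isSuccessful) {
               // Completed clustering: release exactly the file groups named in the plan.
               planFileGroupIds.forEach(fgIdToPendingClustering::remove);
           } else {
               // Failed / rolled-back clustering: drop every entry scheduled at that instant.
               List<String> toRemove = fgIdToPendingClustering.entrySet().stream()
                   .filter(e -> instantTime.equals(e.getValue()))
                   .map(Map.Entry::getKey).collect(Collectors.toList());
               toRemove.forEach(fgIdToPendingClustering::remove);
           }
       }

       public static void main(String[] args) {
           fgIdToPendingClustering.put("fg1", "100");
           fgIdToPendingClustering.put("fg2", "100");
           fgIdToPendingClustering.put("fg3", "200");
           removePendingClusteringInstant("100", null, false); // rollback of instant 100
           System.out.println(fgIdToPendingClustering.keySet()); // [fg3]
       }
   }
   ```

   The key point is the failure branch: after a rollback the plan file may already be gone, so removal has to key off the instant timestamp alone, mirroring the log-compaction change above.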



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,15 +137,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
+    this.visibleFileSystemViewTimeline = visibleActiveTimeline.getFileSystemViewTimeline();
   }
 
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
    */
   public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
-    HoodieTimer timer = HoodieTimer.start();
-    List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true);
+    HoodieTimer timer = new HoodieTimer().startTimer();

Review Comment:
   Is there a need to change this? It re-introduces deprecated code.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -37,27 +38,31 @@ public class TimelineDiffHelper {
 
   private static final Logger LOG = LoggerFactory.getLogger(TimelineDiffHelper.class);
 
-  public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
-      HoodieTimeline newTimeline) {
-
-    HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
-    HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
+  /**
+   * Performs validation to check whether incremental sync is possible. Once it determines it is safe to do so,
+   * it tracks both the instants that are newly added and the instants that have transitioned from pending
+   * state to completed or removed.
+   * @param oldCompletedAndRewriteTimeline old completed and rewrite timeline
+   * @param newCompletedAndRewriteTimeline new completed and rewrite timeline
+   * @return TimelineDiffResult object which is useful for incrementally syncing the timeline.
+   */
+  public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldCompletedAndRewriteTimeline,

Review Comment:
   Let's make sure that we add tests for the diff helper, as it is used for the MDT-based fs view too.
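   For reference, the essence of what the diff helper computes can be modeled in a few lines (a hypothetical simplification over sets of instant timestamps, not the real `HoodieTimeline` API): given old and new views, find (a) instants that are newly added and (b) previously pending instants that have since completed or disappeared (e.g. were rolled back).

   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class Main {
       // Instants present in the new timeline view but not the old one.
       static List<String> newInstants(Set<String> oldTimeline, Set<String> newTimeline) {
           return newTimeline.stream().filter(i -> !oldTimeline.contains(i)).sorted().collect(Collectors.toList());
       }

       // Instants that were pending before but are no longer pending now
       // (either completed or removed from the timeline).
       static List<String> completedOrRemovedPending(Set<String> oldPending, Set<String> newPending) {
           return oldPending.stream().filter(i -> !newPending.contains(i)).sorted().collect(Collectors.toList());
       }

       public static void main(String[] args) {
           Set<String> oldTimeline = Set.of("10", "20");       // "20" was pending in the old view
           Set<String> newTimeline = Set.of("10", "20", "30"); // "30" is newly added
           System.out.println(newInstants(oldTimeline, newTimeline));                 // [30]
           System.out.println(completedOrRemovedPending(Set.of("20"), Set.of("30"))); // [20]
       }
   }
   ```

   Tests along these lines (old/new timelines differing only by one added, one transitioned, and one rolled-back instant) would exercise both outputs of the diff result.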



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -299,6 +310,17 @@ void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieIn
     });
   }
 
+  @Override
+  protected void removeFileGroupsInPendingClustering(String clusteringInstant) {
+    List<HoodieFileGroupId> fileGroupIdsToRemove = fgIdToPendingClustering.entrySet()
+        .stream().filter(fgIdInstantEntry -> clusteringInstant.equals(fgIdInstantEntry.getValue().getTimestamp()))
+        .map(Map.Entry::getKey).collect(Collectors.toList());
+    ValidationUtils.checkArgument(!fileGroupIdsToRemove.isEmpty(),

Review Comment:
   Is this validation strictly necessary? Even if no matching file groups are found, incremental sync can still continue, can't it?
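   A sketch of the lenient alternative being suggested (hypothetical, standalone `Map` model rather than the real view classes): when no file groups match the clustering instant, log and return instead of failing the whole incremental sync with `checkArgument`.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class Main {
       static final Map<String, String> fgIdToPendingClustering = new HashMap<>();

       static void removeFileGroupsInPendingClustering(String clusteringInstant) {
           List<String> toRemove = fgIdToPendingClustering.entrySet().stream()
               .filter(e -> clusteringInstant.equals(e.getValue()))
               .map(Map.Entry::getKey).collect(Collectors.toList());
           if (toRemove.isEmpty()) {
               // Tolerate the miss: the view may already be in sync for this instant.
               System.out.println("No pending clustering file groups for instant " + clusteringInstant + "; nothing to remove");
               return;
           }
           toRemove.forEach(fgIdToPendingClustering::remove);
       }

       public static void main(String[] args) {
           fgIdToPendingClustering.put("fg1", "100");
           removeFileGroupsInPendingClustering("999"); // unknown instant: warn, do not throw
           removeFileGroupsInPendingClustering("100");
           System.out.println(fgIdToPendingClustering.isEmpty()); // true
       }
   }
   ```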



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -163,14 +163,19 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline filterCompletedAndCompactionInstants();
 
-  HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
+  /**
+   * Timeline used by the file system view for constructing file groups.
+   * This filtered timeline contains completed instants that are part of either the
+   * write timeline or the savepoint timeline, along with any pending compaction commits.
+   */
+  HoodieTimeline getFileSystemViewTimeline();

Review Comment:
   Why should it not include the rollback and restore timelines? Let's make that clear in the documentation.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -178,7 +203,15 @@ public Option<HoodieBaseFile> getLatestDataFile() {
    * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
    */
   public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
-    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
+    return getLatestFileSliceBeforeOrOn(maxInstantTime, false);
+  }
+
+  /**
+   * Obtain the latest file slice, up to an instantTime i.e. <= maxInstantTime.
+   */
+  public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime, boolean includePending) {

Review Comment:
   Let's make sure we have tests covering all these methods.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -150,11 +151,25 @@ public static Map<HoodieFileGroupId, HoodieInstant> getAllFileGroupsInPendingClu
 
   public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClusteringInstant(
       HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+    if (clusteringPlan == null || clusteringPlan.getInputGroups() == null || clusteringPlan.getInputGroups().isEmpty()) {
+      return Stream.empty();
+    }
     Stream<HoodieFileGroupId> partitionToFileIdLists = clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
     return partitionToFileIdLists.map(e -> Pair.of(e, instant));
   }
 
-  private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesInClusteringPlan(
+  public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesFromClusteringInstant(
+      HoodieInstant clusteringInstant, HoodieTableMetaClient metaClient) throws IOException {
+    Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
+        ClusteringUtils.getRequestedReplaceMetadata(metaClient, clusteringInstant);

Review Comment:
   Note that for the `insert_overwrite` operation, the requested instant does not contain the clustering plan, so line 165 below might throw an NPE.
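   The shape of the guard being asked for can be sketched standalone (hypothetical `ReplaceMetadata` stand-in, not Hudi's `HoodieRequestedReplaceMetadata`): a replace commit such as `insert_overwrite` may legitimately carry no clustering plan, so the absent plan must collapse to an empty stream rather than be dereferenced.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Stream;

   public class Main {
       // Stand-in for requested replace metadata: the clustering plan may be absent.
       static class ReplaceMetadata {
           final List<String> clusteringPlanInputGroups; // null for e.g. insert_overwrite
           ReplaceMetadata(List<String> groups) { this.clusteringPlanInputGroups = groups; }
       }

       static Stream<String> fileGroupsFromRequestedInstant(Optional<ReplaceMetadata> requested) {
           // Optional.map collapses a null plan to an empty Optional, so no NPE is possible.
           return requested
               .map(r -> r.clusteringPlanInputGroups)
               .orElse(Collections.emptyList())
               .stream();
       }

       public static void main(String[] args) {
           // insert_overwrite-style requested metadata: replace commit, but no clustering plan
           System.out.println(fileGroupsFromRequestedInstant(Optional.of(new ReplaceMetadata(null))).count());         // 0
           System.out.println(fileGroupsFromRequestedInstant(Optional.empty()).count());                               // 0
           System.out.println(fileGroupsFromRequestedInstant(Optional.of(new ReplaceMetadata(List.of("fg1")))).count()); // 1
       }
   }
   ```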



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -478,8 +488,11 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException {
    * @param statuses List of File-Status
    */
   private Stream<HoodieBaseFile> convertFileStatusesToBaseFiles(FileStatus[] statuses) {
-    Predicate<FileStatus> roFilePredicate = fileStatus -> fileStatus.getPath().getName()
-        .contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
+    // Format of .hoodie_partition_metadata is changed to .hoodie_partition_metadata.parquet

Review Comment:
   When did this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
