This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9c7c1019f5 [HUDI-6952] Skip reading the uncommitted log files for log reader (#9879)
c9c7c1019f5 is described below

commit c9c7c1019f5ca903ad3782f79370663f5ae26cb9
Author: Danny Chan <[email protected]>
AuthorDate: Sat Oct 21 04:21:14 2023 +0800

    [HUDI-6952] Skip reading the uncommitted log files for log reader (#9879)
    
    This is to avoid potential exceptions when the reader is processing an
    uncommitted log file while the cleaning or rollback service removes the
    log file.
---
 .../hudi/client/TestCompactionAdminClient.java     | 14 ++--
 .../table/log/AbstractHoodieLogRecordReader.java   | 24 ++-----
 .../table/timeline/CompletionTimeQueryView.java    | 19 +++--
 .../table/timeline/HoodieDefaultTimeline.java      | 57 +++++++++++----
 .../table/view/AbstractTableFileSystemView.java    | 84 +++++++++++++---------
 .../table/view/TestHoodieTableFileSystemView.java  | 23 +++---
 6 files changed, 129 insertions(+), 92 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 4177297a6ba..20677bf7c85 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.client.CompactionAdminClient.ValidationOpResult;
 import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -208,7 +207,6 @@ public class TestCompactionAdminClient extends 
HoodieClientTestBase {
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsAndCompactionTimeline());
     Set<String> commitsWithDataFile = CollectionUtils.createSet("000", "004");
-    Set<String> commitsWithLogAfterCompactionRequest = 
CollectionUtils.createSet("000", "002");
     // Expect each file-slice whose base-commit is same as compaction commit 
to contain no new Log files
     
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 compactionInstant, true)
         .filter(fs -> fs.getBaseInstantTime().compareTo(compactionInstant) <= 
0)
@@ -218,16 +216,12 @@ public class TestCompactionAdminClient extends 
HoodieClientTestBase {
           } else {
             assertFalse(fs.getBaseFile().isPresent(), "No Data file should be 
present");
           }
-          if 
(commitsWithLogAfterCompactionRequest.contains(fs.getBaseInstantTime())) {
-            assertEquals(4, fs.getLogFiles().count(), "Has Log Files");
-          } else {
-            assertEquals(2, fs.getLogFiles().count(), "Has Log Files");
-          }
+          assertEquals(2, fs.getLogFiles().count(), "Has Log Files");
         });
 
     // Ensure same number of log-files before and after renaming per fileId
     Map<String, Long> fileIdToCountsAfterRenaming =
-        
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(HoodieFileGroup::getAllFileSlices)
+        
newFsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 ingestionInstant)
             .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
             .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
             .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -246,7 +240,7 @@ public class TestCompactionAdminClient extends 
HoodieClientTestBase {
     ensureValidCompactionPlan(compactionInstant);
 
     // Check suggested rename operations
-    metaClient = 
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
+    metaClient.reloadActiveTimeline();
 
     // Log files belonging to file-slices created because of compaction 
request must be renamed
 
@@ -277,7 +271,7 @@ public class TestCompactionAdminClient extends 
HoodieClientTestBase {
 
     // Ensure same number of log-files before and after renaming per fileId
     Map<String, Long> fileIdToCountsAfterRenaming =
-        
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(HoodieFileGroup::getAllFileSlices)
+        
newFsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 ingestionInstant)
             .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
             .filter(fs -> fs.getFileId().equals(op.getFileId()))
             .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 3678efe7862..17c77d807c0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -235,13 +235,12 @@ public abstract class AbstractHoodieLogRecordReader {
     totalLogBlocks = new AtomicLong(0);
     totalLogRecords = new AtomicLong(0);
     HoodieLogFormatReader logFormatReaderWrapper = null;
-    HoodieTimeline commitsTimeline = 
this.hoodieTableMetaClient.getCommitsTimeline();
-    HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
-    HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
     try {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
CachingPath(logFile))).collect(Collectors.toList()),
+          logFilePaths.stream()
+              .map(filePath -> new HoodieLogFile(new CachingPath(filePath)))
+              .collect(Collectors.toList()),
           readerSchema, true, reverseReader, bufferSize, 
shouldLookupRecords(), recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
@@ -269,11 +268,6 @@ public abstract class AbstractHoodieLogRecordReader {
           break;
         }
         if (logBlock.getBlockType() != CORRUPT_BLOCK && 
logBlock.getBlockType() != COMMAND_BLOCK) {
-          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
-              || inflightInstantsTimeline.containsInstant(instantTime)) {
-            // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
-            continue;
-          }
           if (instantRange.isPresent() && 
!instantRange.get().isInRange(instantTime)) {
             // filter the log block by instant range
             continue;
@@ -548,13 +542,12 @@ public abstract class AbstractHoodieLogRecordReader {
     totalLogBlocks = new AtomicLong(0);
     totalLogRecords = new AtomicLong(0);
     HoodieLogFormatReader logFormatReaderWrapper = null;
-    HoodieTimeline commitsTimeline = 
this.hoodieTableMetaClient.getCommitsTimeline();
-    HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
-    HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
     try {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
CachingPath(logFile))).collect(Collectors.toList()),
+          logFilePaths.stream()
+              .map(logFile -> new HoodieLogFile(new CachingPath(logFile)))
+              .collect(Collectors.toList()),
           readerSchema, true, reverseReader, bufferSize, 
shouldLookupRecords(), recordKeyField, internalSchema);
 
       /**
@@ -621,11 +614,6 @@ public abstract class AbstractHoodieLogRecordReader {
           break;
         }
         if (logBlock.getBlockType() != COMMAND_BLOCK) {
-          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
-              || inflightInstantsTimeline.containsInstant(instantTime)) {
-            // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
-            continue;
-          }
           if (instantRange.isPresent() && 
!instantRange.get().isInRange(instantTime)) {
             // filter the log block by instant range
             continue;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 104c40907a0..290f31ff344 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -34,6 +34,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * Query view for instant completion time.
@@ -57,9 +58,9 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   private final String startInstant;
 
   /**
-   * The first instant on the active timeline, used for query optimization.
+   * The first write instant on the active timeline, used for query 
optimization.
    */
-  private final String firstInstantOnActiveTimeline;
+  private final String firstNonSavepointCommit;
 
   /**
    * The constructor.
@@ -79,8 +80,9 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
startInstant) {
     this.metaClient = metaClient;
     this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
-    this.startInstant = startInstant;
-    this.firstInstantOnActiveTimeline = 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse("");
+    this.startInstant = minInstant(startInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+    // Note: use getWriteTimeline() to keep sync with the fs view 
visibleCommitsAndCompactionTimeline, see 
AbstractTableFileSystemView.refreshTimeline.
+    this.firstNonSavepointCommit = 
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
     load();
   }
 
@@ -88,7 +90,8 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
    * Returns whether the instant is completed.
    */
   public boolean isCompleted(String instantTime) {
-    return getCompletionTime(instantTime).isPresent();
+    return this.startToCompletionInstantTimeMap.containsKey(instantTime)
+        || HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN, 
this.firstNonSavepointCommit);
   }
 
   /**
@@ -154,7 +157,7 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     if (completionTime != null) {
       return Option.of(completionTime);
     }
-    if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN, 
this.firstInstantOnActiveTimeline)) {
+    if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN, 
this.startInstant)) {
       // the instant is still pending
       return Option.empty();
     }
@@ -199,6 +202,10 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     this.startToCompletionInstantTimeMap.putIfAbsent(instantTime, 
completionTime);
   }
 
+  private static String minInstant(String instant1, String instant2) {
+    return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 : 
instant2;
+  }
+
   @Override
   public void close() throws Exception {
     this.startToCompletionInstantTimeMap.clear();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 4b3386e5b3c..0ee3bd5bee4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -56,6 +56,10 @@ public class HoodieDefaultTimeline implements HoodieTimeline 
{
 
   protected transient Function<HoodieInstant, Option<byte[]>> details;
   private List<HoodieInstant> instants;
+  // for efficient #contains queries.
+  private transient volatile Set<String> instantTimeSet;
+  // for efficient #isBeforeTimelineStarts check.
+  private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
   private String timelineHash;
 
   public HoodieDefaultTimeline(Stream<HoodieInstant> instants, 
Function<HoodieInstant, Option<byte[]>> details) {
@@ -428,7 +432,7 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
   @Override
   public boolean containsInstant(String ts) {
     // Check for 0.10.0+ timestamps which have msec granularity
-    if (getInstantsAsStream().anyMatch(s -> s.getTimestamp().equals(ts))) {
+    if (getOrCreateInstantSet().contains(ts)) {
       return true;
     }
 
@@ -480,20 +484,14 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
   }
 
   public Option<HoodieInstant> getFirstNonSavepointCommit() {
-    Option<HoodieInstant> firstCommit = firstInstant();
-    Set<String> savepointTimestamps = getInstantsAsStream()
-        .filter(entry -> 
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
-        .map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toSet());
-    Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
-    if (!savepointTimestamps.isEmpty()) {
-      // There are chances that there could be holes in the timeline due to 
archival and savepoint interplay.
-      // So, the first non-savepoint commit is considered as beginning of the 
active timeline.
-      firstNonSavepointCommit = Option.fromJavaOptional(getInstantsAsStream()
-          .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
-          .findFirst());
+    if (this.firstNonSavepointCommit == null) {
+      synchronized (this) {
+        if (this.firstNonSavepointCommit == null) {
+          this.firstNonSavepointCommit = 
findFirstNonSavepointCommit(this.instants);
+        }
+      }
     }
-    return firstNonSavepointCommit;
+    return this.firstNonSavepointCommit;
   }
 
   public Option<HoodieInstant> getLastClusterCommit() {
@@ -508,7 +506,7 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
           }
         }).findFirst());
   }
-  
+
   @Override
   public Option<byte[]> getInstantDetails(HoodieInstant instant) {
     return details.apply(instant);
@@ -524,6 +522,35 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
     return this.getClass().getName() + ": " + 
getInstantsAsStream().map(Object::toString).collect(Collectors.joining(","));
   }
 
+  private Set<String> getOrCreateInstantSet() {
+    if (this.instantTimeSet == null) {
+      synchronized (this) {
+        if (this.instantTimeSet == null) {
+          this.instantTimeSet = 
this.instants.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+        }
+      }
+    }
+    return this.instantTimeSet;
+  }
+
+  /**
+   * Returns the first non savepoint commit on the timeline.
+   */
+  private static Option<HoodieInstant> 
findFirstNonSavepointCommit(List<HoodieInstant> instants) {
+    Set<String> savepointTimestamps = instants.stream()
+        .filter(entry -> 
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+    if (!savepointTimestamps.isEmpty()) {
+      // There are chances that there could be holes in the timeline due to 
archival and savepoint interplay.
+      // So, the first non-savepoint commit is considered as beginning of the 
active timeline.
+      return Option.fromJavaOptional(instants.stream()
+          .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+          .findFirst());
+    }
+    return Option.fromJavaOptional(instants.stream().findFirst());
+  }
+
   /**
    * Merge this timeline with the given timeline.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 7e0d381bfb0..b3dc0fbce0a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -554,32 +554,22 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   }
 
   /**
-   * Returns true if the file-group is under pending-compaction and the 
file-slice' baseInstant matches compaction
-   * Instant.
-   *
-   * @param fileSlice File Slice
-   */
-  protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
-    Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
-        getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
-    return (compactionWithInstantTime.isPresent())
-        && 
fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
-  }
-
-  /**
-   * With async compaction, it is possible to see partial/complete base-files 
due to inflight-compactions, Ignore those
-   * base-files.
+   * Ignores the uncommitted base and log files.
    *
    * @param fileSlice File Slice
    * @param includeEmptyFileSlice include empty file-slice
    */
-  protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice 
fileSlice, boolean includeEmptyFileSlice) {
-    if (isFileSliceAfterPendingCompaction(fileSlice)) {
-      LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
-      // Base file is filtered out of the file-slice as the corresponding 
compaction
-      // instant not completed yet.
+  private Stream<FileSlice> filterUncommittedFiles(FileSlice fileSlice, 
boolean includeEmptyFileSlice) {
+    Option<HoodieBaseFile> committedBaseFile = 
fileSlice.getBaseFile().isPresent() && 
completionTimeQueryView.isCompleted(fileSlice.getBaseInstantTime()) ? 
fileSlice.getBaseFile() : Option.empty();
+    List<HoodieLogFile> committedLogFiles = 
fileSlice.getLogFiles().filter(logFile -> 
completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime())).collect(Collectors.toList());
+    if ((fileSlice.getBaseFile().isPresent() && !committedBaseFile.isPresent())
+        || committedLogFiles.size() != fileSlice.getLogFiles().count()) {
+      LOG.debug("File Slice (" + fileSlice + ") has uncommitted files.");
+      // A file is filtered out of the file-slice if the corresponding
+      // instant has not completed yet.
       FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), 
fileSlice.getBaseInstantTime(), fileSlice.getFileId());
-      fileSlice.getLogFiles().forEach(transformed::addLogFile);
+      committedBaseFile.ifPresent(transformed::setBaseFile);
+      committedLogFiles.forEach(transformed::addLogFile);
       if (transformed.isEmpty() && !includeEmptyFileSlice) {
         return Stream.of();
       }
@@ -588,6 +578,25 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     return Stream.of(fileSlice);
   }
 
+  /**
+   * Ignores the uncommitted log files.
+   *
+   * @param fileSlice File Slice
+   */
+  private FileSlice filterUncommittedLogs(FileSlice fileSlice) {
+    List<HoodieLogFile> committedLogFiles = 
fileSlice.getLogFiles().filter(logFile -> 
completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime())).collect(Collectors.toList());
+    if (committedLogFiles.size() != fileSlice.getLogFiles().count()) {
+      LOG.debug("File Slice (" + fileSlice + ") has uncommitted log files.");
+      // A file is filtered out of the file-slice if the corresponding
+      // instant has not completed yet.
+      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), 
fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+      fileSlice.getBaseFile().ifPresent(transformed::setBaseFile);
+      committedLogFiles.forEach(transformed::addLogFile);
+      return transformed;
+    }
+    return fileSlice;
+  }
+
   protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup 
fileGroup) {
     boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices()
         .anyMatch(fs -> 
fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS));
@@ -808,7 +817,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
           .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-          .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, 
true))
+          .flatMap(slice -> this.filterUncommittedFiles(slice, true))
           .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
@@ -831,7 +840,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
         if (!fs.isPresent()) {
           return Option.empty();
         }
-        return 
Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), 
true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
+        return Option.ofNullable(filterUncommittedFiles(fs.get(), 
true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
       }
     } finally {
       readLock.unlock();
@@ -873,16 +882,15 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
           .filter(slice -> 
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
           .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
-        return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice 
-> this.filterBaseFileAfterPendingCompaction(slice, false)))
-            .map(sliceStream -> 
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+        return 
allFileSliceStream.map(this::getLatestFileSliceFilteringUncommittedFiles)
+            .filter(Option::isPresent).map(Option::get)
             .map(this::addBootstrapBaseFileIfPresent);
       } else {
         return allFileSliceStream
             .map(sliceStream ->
-                Option.fromJavaOptional(sliceStream
+                getLatestFileSliceFilteringUncommittedFiles(sliceStream
                     .filter(slice -> 
!isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
-                    .filter(slice -> !slice.isEmpty())
-                    .findFirst()))
+                    .filter(slice -> !slice.isEmpty())))
             
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
@@ -890,6 +898,16 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     }
   }
 
+  /**
+   * Looks for the latest file slice that is not empty after filtering out the 
uncommitted files.
+   *
+   * <p>Note: Checks from the latest file slice first to improve the 
efficiency. There is no need to check
+   * every file slice, the uncommitted files only exist in the latest file 
slice basically.
+   */
+  private Option<FileSlice> 
getLatestFileSliceFilteringUncommittedFiles(Stream<FileSlice> fileSlices) {
+    return Option.fromJavaOptional(fileSlices.flatMap(fileSlice -> 
filterUncommittedFiles(fileSlice, false)).findFirst());
+  }
+
   @Override
   public final Map<String, Stream<FileSlice>> 
getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
     try {
@@ -900,8 +918,8 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
           partitionPath -> fetchAllStoredFileGroups(partitionPath)
               .filter(slice -> 
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
               .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime))
-              .map(sliceStream -> sliceStream.flatMap(slice -> 
this.filterBaseFileAfterPendingCompaction(slice, false)))
-              .map(sliceStream -> 
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+              .map(this::getLatestFileSliceFilteringUncommittedFiles)
+              .filter(Option::isPresent).map(Option::get)
               .map(this::addBootstrapBaseFileIfPresent)
       ));
     } finally {
@@ -921,7 +939,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
             Option<FileSlice> fileSlice = 
fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
             // if the file-group is under construction, pick the latest before 
compaction instant time.
             if (fileSlice.isPresent()) {
-              fileSlice = Option.of(fetchMergedFileSlice(fileGroup, 
fileSlice.get()));
+              fileSlice = Option.of(fetchMergedFileSlice(fileGroup, 
filterUncommittedLogs(fileSlice.get())));
             }
             return fileSlice;
           
}).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
@@ -951,7 +969,9 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       return fetchAllStoredFileGroups(partition)
           .filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), 
maxInstantTime))
           .map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, 
maxInstantTime))
-          
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+          .filter(Option::isPresent).map(Option::get)
+          .map(this::filterUncommittedLogs)
+          .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 5462a671f8c..ab8a1fd3aa2 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -445,10 +445,9 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     assertThat("Base Instant for file-group set correctly", 
fileSlice.getBaseInstantTime(), is(instantTime1));
     assertThat("File-Id must be set correctly", fileSlice.getFileId(), 
is(fileId));
     List<HoodieLogFile> logFiles = 
fileSlice.getLogFiles().collect(Collectors.toList());
-    assertEquals(3, logFiles.size(), "Correct number of log-files shows up in 
file-slice");
-    assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order 
check");
-    assertEquals(deltaFile2, logFiles.get(1).getFileName(), "Log File Order 
check");
-    assertEquals(deltaFile1, logFiles.get(2).getFileName(), "Log File Order 
check");
+    assertEquals(2, logFiles.size(), "Correct number of log-files shows up in 
file-slice");
+    assertEquals(deltaFile2, logFiles.get(0).getFileName(), "Log File Order 
check");
+    assertEquals(deltaFile1, logFiles.get(1).getFileName(), "Log File Order 
check");
 
     // schedules a compaction
     String compactionInstantTime1 = metaClient.createNewInstantTime(); // 60 
-> 80
@@ -473,8 +472,7 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     assertThat("Base Instant for file-group set correctly", 
fileSlice.getBaseInstantTime(), is(compactionInstantTime1));
     assertThat("File-Id must be set correctly", fileSlice.getFileId(), 
is(fileId));
     logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
-    assertEquals(1, logFiles.size(), "Correct number of log-files shows up in 
file-slice");
-    assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order 
check");
+    assertEquals(0, logFiles.size(), "Correct number of log-files shows up in 
file-slice");
 
     // now finished the compaction
     saveAsComplete(commitTimeline, compactionInstant, Option.empty());
@@ -1037,6 +1035,9 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     testStreamLatestVersionInPartition(isLatestFileSliceOnly, 
fullPartitionPath, commitTime1, commitTime2, commitTime3,
         commitTime4, fileId1, fileId2, fileId3, fileId4);
 
+    // Note: the separate archiving of clean and rollback actions is removed 
since 1.0.0,
+    // now all the instants archive continuously.
+
     // Now create a scenario where archiving deleted commits (1,2, and 3) but 
retained cleaner clean1. Now clean1 is
     // the lowest commit time. Scenario for HUDI-162 - Here clean is the 
earliest action in active timeline
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").delete();
@@ -1078,7 +1079,7 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
 
     filenames = new HashSet<>();
     List<HoodieLogFile> logFilesList = 
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
-        .map(FileSlice::getLogFiles).flatMap(logFileList -> 
logFileList).collect(Collectors.toList());
+        .flatMap(FileSlice::getLogFiles).collect(Collectors.toList());
     assertEquals(4, logFilesList.size());
     for (HoodieLogFile logFile : logFilesList) {
       filenames.add(logFile.getFileName());
@@ -1422,8 +1423,8 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     String fullPartitionPath3 = basePath + "/" + partitionPath3 + "/";
     new File(fullPartitionPath3).mkdirs();
     String instantTime1 = "1";
-    String deltaInstantTime1 = "2";
-    String deltaInstantTime2 = "3";
+    String deltaInstantTime1 = "3";
+    String deltaInstantTime2 = "4";
     String fileId = UUID.randomUUID().toString();
 
     String dataFileName = FSUtils.makeBaseFileName(instantTime1, 
TEST_WRITE_TOKEN, fileId);
@@ -1489,8 +1490,8 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);
 
     // Fake delta-ingestion after compaction-requested
-    String deltaInstantTime4 = "4";
-    String deltaInstantTime5 = "6";
+    String deltaInstantTime4 = "5";
+    String deltaInstantTime5 = "7";
     String fileName3 =
         FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, 
deltaInstantTime4, 0, TEST_WRITE_TOKEN);
     String fileName4 =

Reply via email to