alexeykudinkin commented on code in PR #6516:
URL: https://github.com/apache/hudi/pull/6516#discussion_r975782719


##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -412,19 +412,21 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
    * With async compaction, it is possible to see partial/complete base-files due to inflight-compactions, Ignore those
    * base-files.
    *
-   * @param fileSlice File Slice
+   * @param fileSliceStream Stream of FileSlice
+   * @param includeEmptyFileSlice include empty file-slices
    */
-  protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
-    if (isFileSliceAfterPendingCompaction(fileSlice)) {
-      LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
-      // Base file is filtered out of the file-slice as the corresponding compaction
-      // instant not completed yet.
-      FileSlice transformed =
-          new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
-      fileSlice.getLogFiles().forEach(transformed::addLogFile);
-      return transformed;
-    }
-    return fileSlice;
+  protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(Stream<FileSlice> fileSliceStream, boolean includeEmptyFileSlice) {
+    return fileSliceStream.map(fileSlice -> {
+      if (isFileSliceAfterPendingCompaction(fileSlice)) {
+        LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
+        // Base file is filtered out of the file-slice as the corresponding compaction
+        // instant not completed yet.
+        FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+        fileSlice.getLogFiles().forEach(transformed::addLogFile);
+        return transformed;
+      }
+      return fileSlice;
+    }).filter(slice -> includeEmptyFileSlice || !slice.isEmpty());

Review Comment:
   We don't need a separate filtering step. Instead:
   
    - Replace the `map` above with `flatMap`
    - If `includeEmptyFileSlice` is false and the resulting slice is empty, return `Stream.of()`
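
A minimal sketch of how the two bullets above could fit together, kept self-contained so it runs outside Hudi. `Slice` is a hypothetical stand-in for `FileSlice`, its `afterPendingCompaction` flag stands in for `isFileSliceAfterPendingCompaction(...)`, and the emptiness rule (no base file and no log files) is an assumption. `Stream.empty()` is used here; it yields the same result as a no-argument `Stream.of()`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapFilterSketch {

  // Hypothetical stand-in for Hudi's FileSlice, reduced to what this sketch needs.
  static final class Slice {
    final String fileId;
    final boolean hasBaseFile;
    final int logFileCount;
    final boolean afterPendingCompaction;

    Slice(String fileId, boolean hasBaseFile, int logFileCount, boolean afterPendingCompaction) {
      this.fileId = fileId;
      this.hasBaseFile = hasBaseFile;
      this.logFileCount = logFileCount;
      this.afterPendingCompaction = afterPendingCompaction;
    }

    // Assumption: a slice is empty when it has neither a base file nor log files.
    boolean isEmpty() {
      return !hasBaseFile && logFileCount == 0;
    }

    // Copy of this slice with the base file dropped and the log files kept.
    Slice withoutBaseFile() {
      return new Slice(fileId, false, logFileCount, afterPendingCompaction);
    }
  }

  // Stream-in, stream-out variant: flatMap emits zero or one slice per input,
  // so no trailing filter(...) is needed.
  static Stream<Slice> filterBaseFileAfterPendingCompaction(Stream<Slice> slices,
                                                            boolean includeEmptyFileSlice) {
    return slices.flatMap(slice -> {
      Slice result = slice.afterPendingCompaction ? slice.withoutBaseFile() : slice;
      if (!includeEmptyFileSlice && result.isEmpty()) {
        return Stream.empty();
      }
      return Stream.of(result);
    });
  }

  // Runs the filter over three sample slices and returns the surviving file ids.
  static List<String> fileIds(boolean includeEmptyFileSlice) {
    Stream<Slice> input = Stream.of(
        new Slice("f1", true, 1, false),  // untouched
        new Slice("f2", true, 2, true),   // base file dropped, log files remain
        new Slice("f3", true, 0, true));  // base file dropped, nothing remains
    return filterBaseFileAfterPendingCompaction(input, includeEmptyFileSlice)
        .map(s -> s.fileId)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(fileIds(false)); // [f1, f2]
    System.out.println(fileIds(true));  // [f1, f2, f3]
  }
}
```

The behaviour matches the `map` plus `filter` version in the diff; the only difference is that the empty-slice decision is made in the same lambda that builds the transformed slice.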



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -605,9 +607,8 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchLatestFileSlices(partitionPath)
-          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-          .map(this::filterBaseFileAfterPendingCompaction)

Review Comment:
   Let's make the signature look like this:
   
   ```
   Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice, boolean)
   ```
   
   Then we just swap `map` for `flatMap` here, and everything else stays the same.
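
A rough sketch of what that per-slice signature might look like at a call site, again outside Hudi. `Slice` and `latestFileIds` are hypothetical names for illustration, the emptiness rule is an assumption, and the boolean passed by real callers is not specified in this thread.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PerSliceFlatMapSketch {

  // Hypothetical stand-in for Hudi's FileSlice.
  static final class Slice {
    final String fileId;
    final boolean hasBaseFile;
    final int logFileCount;
    final boolean afterPendingCompaction;

    Slice(String fileId, boolean hasBaseFile, int logFileCount, boolean afterPendingCompaction) {
      this.fileId = fileId;
      this.hasBaseFile = hasBaseFile;
      this.logFileCount = logFileCount;
      this.afterPendingCompaction = afterPendingCompaction;
    }

    // Assumption: empty means no base file and no log files.
    boolean isEmpty() {
      return !hasBaseFile && logFileCount == 0;
    }
  }

  // Per-slice variant: takes one slice and returns a stream of zero or one slices.
  static Stream<Slice> filterBaseFileAfterPendingCompaction(Slice slice,
                                                            boolean includeEmptyFileSlice) {
    Slice result = slice;
    if (slice.afterPendingCompaction) {
      // Drop the base file, keep the log files.
      result = new Slice(slice.fileId, false, slice.logFileCount, true);
    }
    if (!includeEmptyFileSlice && result.isEmpty()) {
      return Stream.empty();
    }
    return Stream.of(result);
  }

  // Hypothetical call site: the only change from a map-based pipeline is map -> flatMap.
  static List<String> latestFileIds(List<Slice> slices, boolean includeEmptyFileSlice) {
    return slices.stream()
        .flatMap(slice -> filterBaseFileAfterPendingCompaction(slice, includeEmptyFileSlice))
        .map(s -> s.fileId)
        .collect(Collectors.toList());
  }
}
```

With this shape the helper stays a per-element function, so existing pipelines keep their structure and only the operator changes.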


