Zouxxyy commented on code in PR #8364:
URL: https://github.com/apache/hudi/pull/8364#discussion_r1188647600


##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -850,6 +850,25 @@ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr
     }
   }
 
+  @Override
+  public final Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+      List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
+      return formattedPartitionList.stream().collect(Collectors.toMap(
+          Function.identity(),
+          partitionPath -> fetchAllStoredFileGroups(partitionPath)
+              .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+              .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime))
+              .map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+              .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)

Review Comment:
   > This may give an incomplete slice for the `FileGroup`, but it still works for your use case. A more general way is to merge the pending slice's log files with the previous file slice's logs, just like what we do for the reader view.
   
   `getAllLatestFileSlicesBeforeOrOn` copies the logic of `getLatestFileSlicesBeforeOrOn` and sets `includeFileSlicesInPendingCompaction` to true.
   In fact, I'm a little confused about this:
   ```java
     protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
       if (isFileSliceAfterPendingCompaction(fileSlice)) {
         LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
         // Base file is filtered out of the file-slice as the corresponding compaction
         // instant not completed yet.
         FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
         fileSlice.getLogFiles().forEach(transformed::addLogFile);
         if (transformed.isEmpty() && !includeEmptyFileSlice) {
           return Stream.of();
         }
         return Stream.of(transformed);
       }
       return Stream.of(fileSlice);
     }
   ```
   Here only the base file is filtered out, so the following situation can arise: the base file is not yet complete (its compaction is still inflight), but the log files are already committed. If that is the case, I think it is reasonable to include these log files in the savepoint.
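   
   To make that situation concrete, here is a minimal, self-contained sketch of the case; `SliceSketch` and the file names are simplified stand-ins for illustration only, not Hudi's actual `FileSlice` API:
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Stream;
   
   // Simplified stand-in for Hudi's FileSlice, used only for illustration.
   class SliceSketch {
     final Optional<String> baseFile;
     final List<String> logFiles = new ArrayList<>();
   
     SliceSketch(Optional<String> baseFile) {
       this.baseFile = baseFile;
     }
   
     boolean isEmpty() {
       return baseFile.isEmpty() && logFiles.isEmpty();
     }
   
     @Override
     public String toString() {
       return "base=" + baseFile.orElse("<none>") + ", logs=" + logFiles;
     }
   }
   
   public class PendingCompactionFilterSketch {
   
     // Mirrors the shape of filterBaseFileAfterPendingCompaction: when the
     // slice's base file belongs to a pending (inflight) compaction, drop the
     // base file but keep the log files written by completed delta commits.
     static Stream<SliceSketch> filterBase(SliceSketch slice, boolean inPendingCompaction, boolean includeEmptyFileSlice) {
       if (inPendingCompaction) {
         SliceSketch transformed = new SliceSketch(Optional.empty());
         transformed.logFiles.addAll(slice.logFiles);
         if (transformed.isEmpty() && !includeEmptyFileSlice) {
           return Stream.of();
         }
         return Stream.of(transformed);
       }
       return Stream.of(slice);
     }
   
     public static void main(String[] args) {
       // Base file still being written by an inflight compaction,
       // log files already committed by delta commits:
       SliceSketch slice = new SliceSketch(Optional.of("base_inflight.parquet"));
       slice.logFiles.add(".f1.log.1");
       slice.logFiles.add(".f1.log.2");
   
       // Prints "base=<none>, logs=[.f1.log.1, .f1.log.2]": the committed log
       // files survive the filter and are what the savepoint would still need.
       filterBase(slice, true, false).forEach(System.out::println);
     }
   }
   ```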
   
   And can you explain the reader-view solution you mentioned? Here we may just need to return the files required for the savepoint; do we need to merge them?
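   
   For reference, my reading of the quoted merge suggestion is roughly the following; this is a hedged sketch with a simplified stand-in `Slice` record, not the real reader-view code:
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   
   // Sketch of the merge idea quoted above: instead of returning the pending
   // slice with its base file stripped, fold its log files into the previous
   // completed slice, similar to what the reader view does when merging.
   // Slice is a simplified stand-in, not Hudi's FileSlice.
   public class MergePendingSliceSketch {
   
     record Slice(Optional<String> baseFile, List<String> logFiles) {}
   
     static Slice mergePendingIntoPrevious(Slice previous, Slice pending) {
       List<String> mergedLogs = new ArrayList<>(previous.logFiles());
       mergedLogs.addAll(pending.logFiles()); // committed log files of the pending slice
       return new Slice(previous.baseFile(), mergedLogs);
     }
   
     public static void main(String[] args) {
       Slice previous = new Slice(Optional.of("base_v1.parquet"), List.of(".f1.log.1"));
       Slice pending = new Slice(Optional.of("base_inflight.parquet"), List.of(".f1.log.2"));
   
       // -> Slice[baseFile=Optional[base_v1.parquet], logFiles=[.f1.log.1, .f1.log.2]]
       System.out.println(mergePendingIntoPrevious(previous, pending));
     }
   }
   ```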
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
