alexeykudinkin commented on code in PR #6516:
URL: https://github.com/apache/hudi/pull/6516#discussion_r975782719
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -412,19 +412,21 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
* With async compaction, it is possible to see partial/complete base-files due to inflight-compactions, Ignore those
* base-files.
*
- * @param fileSlice File Slice
+ * @param fileSliceStream Stream of FileSlice
+ * @param includeEmptyFileSlice include empty file-slices
*/
- protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
- if (isFileSliceAfterPendingCompaction(fileSlice)) {
- LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
- // Base file is filtered out of the file-slice as the corresponding compaction
- // instant not completed yet.
- FileSlice transformed =
- new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
- fileSlice.getLogFiles().forEach(transformed::addLogFile);
- return transformed;
- }
- return fileSlice;
+ protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(Stream<FileSlice> fileSliceStream, boolean includeEmptyFileSlice) {
+ return fileSliceStream.map(fileSlice -> {
+ if (isFileSliceAfterPendingCompaction(fileSlice)) {
+ LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
+ // Base file is filtered out of the file-slice as the corresponding compaction
+ // instant not completed yet.
+ FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+ fileSlice.getLogFiles().forEach(transformed::addLogFile);
+ return transformed;
+ }
+ return fileSlice;
+ }).filter(slice -> includeEmptyFileSlice || !slice.isEmpty());
Review Comment:
We don't need a separate filtering step. Instead:
- Replace the `map` above with `flatMap`
- If `includeEmptyFileSlice` is false and the resulting slice is empty, return `Stream.of()`
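To illustrate the suggestion, here is a minimal, self-contained sketch. It is not the Hudi code: `Slice` is a hypothetical stand-in for `FileSlice`, and the `pendingCompaction` flag stands in for `isFileSliceAfterPendingCompaction`. It only shows that a single `flatMap` emitting zero or one element yields the same result as `map` followed by `filter`. `Stream.empty()` is used where the comment says `Stream.of()`; both produce an empty stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapFilterSketch {
  // Hypothetical, simplified stand-in for Hudi's FileSlice.
  static final class Slice {
    final String fileId;
    final boolean hasBaseFile;
    final int logFileCount;
    final boolean pendingCompaction; // stand-in for isFileSliceAfterPendingCompaction

    Slice(String fileId, boolean hasBaseFile, int logFileCount, boolean pendingCompaction) {
      this.fileId = fileId;
      this.hasBaseFile = hasBaseFile;
      this.logFileCount = logFileCount;
      this.pendingCompaction = pendingCompaction;
    }

    boolean isEmpty() {
      return !hasBaseFile && logFileCount == 0;
    }

    // Drops the base file and keeps the log files.
    Slice withoutBaseFile() {
      return new Slice(fileId, false, logFileCount, pendingCompaction);
    }
  }

  // Shape of the code under review: map, then a separate filter.
  static Stream<Slice> mapThenFilter(Stream<Slice> slices, boolean includeEmpty) {
    return slices
        .map(s -> s.pendingCompaction ? s.withoutBaseFile() : s)
        .filter(s -> includeEmpty || !s.isEmpty());
  }

  // Suggested shape: one flatMap that emits zero or one element per input.
  static Stream<Slice> viaFlatMap(Stream<Slice> slices, boolean includeEmpty) {
    return slices.flatMap(s -> {
      Slice result = s.pendingCompaction ? s.withoutBaseFile() : s;
      if (!includeEmpty && result.isEmpty()) {
        return Stream.<Slice>empty();
      }
      return Stream.of(result);
    });
  }

  static List<String> ids(Stream<Slice> slices) {
    return slices.map(s -> s.fileId).collect(Collectors.toList());
  }

  // Made-up sample data: B becomes empty once its base file is dropped.
  static List<Slice> sample() {
    return Arrays.asList(
        new Slice("A", true, 2, false),
        new Slice("B", true, 0, true),
        new Slice("C", true, 1, true));
  }

  public static void main(String[] args) {
    for (boolean includeEmpty : new boolean[] {false, true}) {
      List<String> expected = ids(mapThenFilter(sample().stream(), includeEmpty));
      List<String> actual = ids(viaFlatMap(sample().stream(), includeEmpty));
      System.out.println(includeEmpty + " -> " + actual + " (same as map+filter: "
          + expected.equals(actual) + ")");
    }
  }
}
```

With the sample data, slice `B` is dropped only when `includeEmpty` is false, in both variants.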
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -605,9 +607,8 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchLatestFileSlices(partitionPath)
- .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
- .map(this::filterBaseFileAfterPendingCompaction)
Review Comment:
Let's make the signature look like this:
```
Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice, boolean)
```
Then we just change `map` to `flatMap` here, and everything else stays the same.
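A rough sketch of how the call site could look with that per-slice signature. This is an illustration under assumptions, not the actual view code: `Slice`, the `pendingCompaction` and `replaced` sets, and `latestFileIds` are hypothetical stand-ins for `FileSlice`, `isFileSliceAfterPendingCompaction`, `isFileGroupReplaced`, and `getLatestFileSlices`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PerSliceSignatureSketch {
  // Hypothetical, simplified stand-in for Hudi's FileSlice.
  static final class Slice {
    final String fileId;
    final boolean hasBaseFile;
    final int logFileCount;

    Slice(String fileId, boolean hasBaseFile, int logFileCount) {
      this.fileId = fileId;
      this.hasBaseFile = hasBaseFile;
      this.logFileCount = logFileCount;
    }

    boolean isEmpty() {
      return !hasBaseFile && logFileCount == 0;
    }
  }

  // Assumed state for the sketch: file IDs under pending compaction and replaced file IDs.
  private final Set<String> pendingCompaction;
  private final Set<String> replaced;

  PerSliceSignatureSketch(Set<String> pendingCompaction, Set<String> replaced) {
    this.pendingCompaction = pendingCompaction;
    this.replaced = replaced;
  }

  // Per-slice variant: takes one slice and returns a stream of zero or one slices.
  Stream<Slice> filterBaseFileAfterPendingCompaction(Slice slice, boolean includeEmptyFileSlice) {
    Slice result = pendingCompaction.contains(slice.fileId)
        ? new Slice(slice.fileId, false, slice.logFileCount) // drop base file, keep logs
        : slice;
    if (!includeEmptyFileSlice && result.isEmpty()) {
      return Stream.<Slice>empty();
    }
    return Stream.of(result);
  }

  // Call site: same pipeline as before, with map swapped for flatMap.
  List<String> latestFileIds(List<Slice> latest, boolean includeEmptyFileSlice) {
    return latest.stream()
        .filter(slice -> !replaced.contains(slice.fileId))
        .flatMap(slice -> filterBaseFileAfterPendingCompaction(slice, includeEmptyFileSlice))
        .map(slice -> slice.fileId)
        .collect(Collectors.toList());
  }

  // Made-up sample data: f2 and f3 are under pending compaction, f4 is replaced.
  static PerSliceSignatureSketch sampleView() {
    return new PerSliceSignatureSketch(
        new HashSet<>(Arrays.asList("f2", "f3")),
        new HashSet<>(Arrays.asList("f4")));
  }

  static List<Slice> sampleSlices() {
    return Arrays.asList(
        new Slice("f1", true, 1),
        new Slice("f2", true, 0),
        new Slice("f3", true, 2),
        new Slice("f4", true, 1));
  }

  public static void main(String[] args) {
    PerSliceSignatureSketch view = sampleView();
    System.out.println(view.latestFileIds(sampleSlices(), false));
    System.out.println(view.latestFileIds(sampleSlices(), true));
  }
}
```

One side effect worth noting: because of the extra boolean argument, the existing method reference (`this::filterBaseFileAfterPendingCompaction`) would have to become a lambda at the call site.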