danny0405 commented on code in PR #5953:
URL: https://github.com/apache/hudi/pull/5953#discussion_r912871566
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -302,6 +312,51 @@ private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants)
: instants;
}
+ private Stream<FileSlice> filterFileSliceWithValidFiles(FileSystem fs, Stream<FileSlice> fileSlices) {
+ // we need to filter out the base file and log file that does not exist
+ return fileSlices.map(fileSlice -> {
+ List<HoodieLogFile> logFiles = fileSlice.getLogFiles()
+ .filter(logFile -> {
+ try {
+ return fs.exists(logFile.getPath());
+ } catch (IOException e) {
+ LOG.error("Checking exists of log file path: {} error", logFile.getPath().toString());
+ throw new HoodieException(e);
+ }
+ }).collect(Collectors.toList());
+ return generateFileSlice(fileSlice.getPartitionPath(),
+ fileSlice.getBaseInstantTime(),
+ fileSlice.getFileId(),
+ fileSlice.getBaseFile().orElse(null),
+ logFiles);
+ }).filter(fileSlice -> {
+ // we should keep the file slice if any base/log file exists
+ if (fileSlice.getLatestLogFile().isPresent()) {
+ return true;
+ }
+ Option<String> basePath = fileSlice.getBaseFile().map(BaseFile::getPath);
+ try {
+ return basePath.isPresent() && fs.exists(new org.apache.hadoop.fs.Path(basePath.get()));
+ } catch (IOException e) {
+ LOG.error("Checking exists of base path: {} error", basePath);
+ throw new HoodieException(e);
+ }
+ });
+ }
+
+ private FileSlice generateFileSlice(String partitionPath,
+ String baseInstant,
Review Comment:
I have thought about this patch for a few days, and the best approach may be
to simply remove the existence check. The fs view and the timeline should keep
the layout complete:
1. For streaming read, we always read up to the latest commit.
2. For batch read with a specific end commit, the user should ensure that the
version exists.

So, just remove the existence check and throw directly if a file disappears
for some reason.
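
The fail-fast behavior described above can be sketched as follows. This is only an illustration under stated assumptions, not the patch itself: it uses plain `java.nio.file` in place of the Hadoop `FileSystem` and Hudi `FileSlice` types, and `FailFastSliceReader`, `Slice`, and `readSlice` are hypothetical names that do not exist in Hudi.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class FailFastSliceReader {

    /** Hypothetical stand-in for a file slice: an optional base file plus its log files. */
    static final class Slice {
        final Path baseFile;        // may be null when the slice has no base file
        final List<Path> logFiles;

        Slice(Path baseFile, List<Path> logFiles) {
            this.baseFile = baseFile;
            this.logFiles = logFiles;
        }
    }

    /**
     * Reads every file the slice references without an exists() pre-check.
     * A file that has disappeared surfaces as an UncheckedIOException
     * instead of being silently filtered out of the result.
     */
    static List<String> readSlice(Slice slice) {
        List<Path> paths = new ArrayList<>();
        if (slice.baseFile != null) {
            paths.add(slice.baseFile);
        }
        paths.addAll(slice.logFiles);

        List<String> contents = new ArrayList<>();
        for (Path path : paths) {
            try {
                contents.add(Files.readString(path));
            } catch (IOException e) {
                // Propagate the failure: the layout is expected to be complete.
                throw new UncheckedIOException("Failed to read " + path, e);
            }
        }
        return contents;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("slice");
        Path base = Files.writeString(dir.resolve("base"), "base");
        Path log = Files.writeString(dir.resolve("log"), "log");

        // Both files are present, so the read succeeds.
        System.out.println(readSlice(new Slice(base, List.of(log))));

        // Simulate a file that vanished after the slice was resolved.
        Files.delete(log);
        try {
            readSlice(new Slice(base, List.of(log)));
        } catch (UncheckedIOException e) {
            System.out.println("failed fast: " + e.getCause());
        }
    }
}
```

The design choice is that a missing file is treated as a broken invariant of the layout and reported to the caller, rather than as a condition to filter around on every read.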
[3953.patch.zip](https://github.com/apache/hudi/files/9038692/3953.patch.zip)