This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dbc1697431a017d2f13a1f5267e88161b003f253 Author: 5herhom <[email protected]> AuthorDate: Sun Aug 7 15:02:33 2022 +0800 [HUDI-4508] Repair the exception when reading optimized query for mor in hive and presto/trino (#6254) In a MOR table, a file slice may have only log files and no base file until it is compacted. In this case, a read-optimized query hits the condition !baseFileOpt.isPresent() in HoodieCopyOnWriteTableInputFormat.createFileStatusUnchecked() and throws an IllegalStateException. Instead of throwing an exception, it is more appropriate to return no data from such a file slice. Co-authored-by: sivabalan <[email protected]> --- .../hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java | 16 ++++++++++++++++ .../realtime/HoodieMergeOnReadTableInputFormat.java | 12 ++++++++++++ 2 files changed, 28 insertions(+) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java index 65a7259a6f..140e7ff5b6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableQueryType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -253,6 +254,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat { partitionedFileSlices.values() .stream() .flatMap(Collection::stream) + .filter(fileSlice -> checkIfValidFileSlice(fileSlice)) .map(fileSlice -> createFileStatusUnchecked(fileSlice, 
fileIndex, virtualKeyInfoOpt)) .collect(Collectors.toList()) ); @@ -261,6 +263,20 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat { return targetFiles; } + protected boolean checkIfValidFileSlice(FileSlice fileSlice) { + Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile(); + Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile(); + + if (baseFileOpt.isPresent()) { + return true; + } else if (latestLogFileOpt.isPresent()) { + // It happens when reading optimized query to mor. + return false; + } else { + throw new IllegalStateException("Invalid state: base-file has to be present for " + fileSlice.getFileId()); + } + } + private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) { List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); checkState(diff.isEmpty(), "Should be empty"); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java index 982d52b0d4..64fc54392a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java @@ -103,6 +103,18 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp } } + @Override + protected boolean checkIfValidFileSlice(FileSlice fileSlice) { + Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile(); + Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile(); + + if (baseFileOpt.isPresent() || latestLogFileOpt.isPresent()) { + return true; + } else { + throw new IllegalStateException("Invalid state: either base-file or log-file has to be present for " + fileSlice.getFileId()); + } + } + /** * Keep the logic of mor_incr_view as same as spark datasource. 
* Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).
