alexeykudinkin commented on a change in pull request #4531:
URL: https://github.com/apache/hudi/pull/4531#discussion_r784477680
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -102,18 +138,98 @@ public final void setConf(Configuration conf) {
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
-
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job,
tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
+ returns.addAll(listStatusForSnapshotMode(job, tableMetaClientMap,
snapshotPaths));
}
return returns.toArray(new FileStatus[0]);
}
+ @Nonnull
+ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
+ Map<String,
HoodieTableMetaClient> tableMetaClientMap,
+ List<Path> snapshotPaths)
throws IOException {
+ HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
+ List<FileStatus> targetFiles = new ArrayList<>();
+
+ TypedProperties props = new TypedProperties(new Properties());
+
+ Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(),
snapshotPaths);
+
+ for (Map.Entry<HoodieTableMetaClient, List<Path>> entry :
groupedPaths.entrySet()) {
+ HoodieTableMetaClient tableMetaClient = entry.getKey();
+ List<Path> partitionPaths = entry.getValue();
+
+ // Hive job might specify a max commit instant up to which table's state
+ // should be examined. We simply pass it as query's instant to the
file-index
+ Option<String> queryCommitInstant =
+ HoodieHiveUtils.getMaxCommit(job,
tableMetaClient.getTableConfig().getTableName());
+
+ boolean shouldIncludePendingCommits =
+ HoodieHiveUtils.shouldIncludePendingCommits(job,
tableMetaClient.getTableConfig().getTableName());
+
+ HiveHoodieTableFileIndex fileIndex =
+ new HiveHoodieTableFileIndex(
+ engineContext,
+ tableMetaClient,
+ props,
+ HoodieTableQueryType.QUERY_TYPE_SNAPSHOT,
+ partitionPaths,
+ queryCommitInstant,
+ shouldIncludePendingCommits);
+
+ Map<String, Seq<FileSlice>> partitionedFileSlices =
+
JavaConverters.mapAsJavaMapConverter(fileIndex.listFileSlices()).asJava();
+
+ targetFiles.addAll(
+ partitionedFileSlices.values()
+ .stream()
+ .flatMap(seq ->
JavaConverters.seqAsJavaListConverter(seq).asJava().stream())
+ .map(fileSlice -> {
+ Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
+ Option<HoodieLogFile> latestLogFileOpt =
fileSlice.getLatestLogFile();
+ if (baseFileOpt.isPresent()) {
+ return getFileStatusUnchecked(baseFileOpt);
+ } else if (includeLogFilesForSnapShotView() &&
latestLogFileOpt.isPresent()) {
+ return
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(),
fileSlice.getLogFiles());
+ } else {
+ throw new IllegalStateException("Invalid state: either
base-file or log-file should be present");
+ }
+ })
+ .collect(Collectors.toList())
+ );
+ }
+
+ // TODO cleanup
+ validate(targetFiles, listStatusForSnapshotModeLegacy(job,
tableMetaClientMap, snapshotPaths));
Review comment:
This `validate(...)` call was actually added just for CI validation. I will clean it up before merging.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]