bhasudha commented on a change in pull request #689: [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries
URL: https://github.com/apache/incubator-hudi/pull/689#discussion_r291485227
##########
File path:
hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
##########
@@ -218,4 +215,109 @@ protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path da
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
     return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
   }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  private List<FileStatus> listStatusForIncrementalMode(JobConf job,
+      HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants();
+    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList());
+    // Extract partitions touched by the commitsToCheck
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return null;
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input paths that have incremental
+           * changes. This check is needed for the following corner case: when the caller invokes
+           * HoodieInputFormat.listStatus multiple times with small batches of Hive partitions
+           * each time (e.g. a Hive fetch task calls listStatus once for every partition), we do
+           * not want to accidentally return all incremental changes for the entire table in every
+           * listStatus() call. That would create redundant splits. Instead we only want to return
+           * the incremental changes (if any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can
+           * be listed in every such listStatus() call. To avoid this, it might be useful to
+           * disable fetch tasks for incremental queries using the Hive session property:
+           * `set hive.fetch.task.conversion=none;`
+           * This ensures Map Reduce execution is chosen for the Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    setInputPaths(job, incrementalInputPaths);
+    FileStatus[] fileStatuses = super.listStatus(job);
+    TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(tableMetaClient,
+        timeline, fileStatuses);
+    List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+    List<HoodieDataFile> filteredFiles = roView.getLatestDataFilesInRange(commitsList)
+        .collect(Collectors.toList());
+    List<FileStatus> returns = new ArrayList<>();
+    for (HoodieDataFile filteredFile : filteredFiles) {
+      LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
Review comment:
Every incremental file will be logged here, which can produce a lot of output.
However, I kept the logging comparable to the current HoodieInputFormat. What do
you all think about removing this log? Was it logged for any specific reason
previously? Shall I just remove this file-level logging?
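One alternative to removing the log entirely, sketched below as a suggestion (this is not code from the PR; the class name and method are hypothetical, and `java.util.logging` stands in for the class's actual log4j `LOG`): emit a single INFO-level summary line per batch and gate the per-file lines behind debug-level logging.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: one INFO summary line per incremental batch, with
// per-file detail only when debug (FINE) logging is enabled, so large
// incremental batches do not flood the logs.
public class IncrementalLogSketch {

  private static final Logger LOG = Logger.getLogger(IncrementalLogSketch.class.getName());

  // Returns the summary line so it can be inspected; the real method would only log it.
  static String logFilteredFiles(List<String> filteredFilePaths) {
    String summary = "Found " + filteredFilePaths.size() + " incremental hoodie files";
    LOG.info(summary);
    if (LOG.isLoggable(Level.FINE)) {
      // Per-file detail is skipped entirely unless debug logging is turned on.
      for (String path : filteredFilePaths) {
        LOG.fine("Processing incremental hoodie file - " + path);
      }
    }
    return summary;
  }

  public static void main(String[] args) {
    System.out.println(logFilteredFiles(Arrays.asList(
        "2019/06/01/file1.parquet", "2019/06/01/file2.parquet")));
    // prints "Found 2 incremental hoodie files"
  }
}
```

This keeps an audit trail of how many files each incremental pull touched while leaving the verbose per-file listing opt-in.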
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services