bhasudha commented on a change in pull request #689: [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries
URL: https://github.com/apache/incubator-hudi/pull/689#discussion_r294632849
 
 

 ##########
 File path: hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
 ##########
 @@ -218,4 +215,109 @@ protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path da
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
     return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
   }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  private List<FileStatus> listStatusForIncrementalMode(JobConf job,
+      HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants();
+    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+    LOG.info("Last incremental timestamp was set as " + lastIncrementalTs);
+    List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList());
+    // Extract the partitions touched by commitsToCheck
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+          timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return null;
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure we return only results from the original input paths that have incremental
+           * changes. This check is needed for the following corner case: when the caller invokes
+           * HoodieInputFormat.listStatus multiple times, with a small batch of Hive partitions each
+           * time (e.g. a Hive fetch task calls listStatus once per partition), we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. That would create redundant splits. Instead, we only want to return the
+           * incremental changes (if any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
 
 Review comment:
   Sent a PR here to update the docs on this: 
https://github.com/apache/incubator-hudi/pull/742 
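   For readers of this thread, the corner case discussed in the inline comment can be illustrated with a minimal, dependency-free sketch. This is not the actual Hudi code; `filterToBatch`, the base path, and the partition names are hypothetical stand-ins used only to show why incremental partition paths must be intersected with the input paths of the current `listStatus()` batch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class IncrementalPathFilterSketch {

  // Hypothetical helper: from the partitions touched by the incremental
  // commits, keep only the candidate paths that belong to the batch of input
  // paths passed to this particular listStatus() call.
  static List<String> filterToBatch(String basePath, Set<String> touchedPartitions,
      Set<String> inputPathsForThisBatch) {
    return touchedPartitions.stream()
        .map(partition -> basePath + "/" + partition)   // build full partition paths
        .filter(inputPathsForThisBatch::contains)       // intersect with this batch
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    String basePath = "/data/hoodie/table";
    // Partitions touched by the commits of interest (from commit metadata).
    Set<String> touched = new HashSet<>(
        Arrays.asList("2019/06/01", "2019/06/02", "2019/06/03"));
    // This listStatus() call was invoked for only one Hive partition.
    Set<String> batch = new HashSet<>(
        Arrays.asList("/data/hoodie/table/2019/06/02"));

    // Only the partition in this batch survives, so repeated listStatus()
    // calls do not each return the whole table's incremental changes.
    List<String> result = filterToBatch(basePath, touched, batch);
    System.out.println(result);
  }
}
```

   Without the intersection step, every per-partition `listStatus()` call would return all three touched partitions, producing redundant splits.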

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
