bhasudha commented on a change in pull request #689: [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries
URL: https://github.com/apache/incubator-hudi/pull/689#discussion_r291485227
 
 

 ##########
 File path: hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
 ##########
 @@ -218,4 +215,109 @@ protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path da
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
     return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
   }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  private List<FileStatus> listStatusForIncrementalMode(JobConf job,
+      HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants();
+    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList());
+    // Extract partitions touched by the commitsToCheck
+    Set<String> partitionsToList = new HashSet<>();
+    for (int i = 0; i < commitsToCheck.size(); i++) {
+      HoodieInstant commit = commitsToCheck.get(i);
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(timeline.getInstantDetails(commit).get(),
+              HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return null;
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input path that has incremental changes
+           * This check is needed for the following corner case -  When the caller invokes
+           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. This will create redundant splits. Instead we only want to return the incremental
+           * changes (if so any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    setInputPaths(job, incrementalInputPaths);
+    FileStatus[] fileStatuses = super.listStatus(job);
+    TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(tableMetaClient,
+        timeline, fileStatuses);
+    List<String> commitsList = commitsToCheck.stream().map(s -> s.getTimestamp())
+        .collect(Collectors.toList());
+    List<HoodieDataFile> filteredFiles = roView.getLatestDataFilesInRange(commitsList)
+        .collect(Collectors.toList());
+    List<FileStatus> returns = new ArrayList<>();
+    for (HoodieDataFile filteredFile : filteredFiles) {
+      LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
 
 Review comment:
   Every incremental file gets logged here, which can be a lot of output for a
large pull. I kept the logging comparable to the current HoodieInputFormat,
though. What do you all think about removing this log? Was it added for a
specific reason previously? Shall I just remove this file-level logging?
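
   One possible middle ground, instead of dropping the log entirely, would be a
single summary line at INFO with the per-file lines moved to a finer level.
The sketch below is only an illustration under assumptions: it uses
java.util.logging so that it runs standalone (HoodieInputFormat has its own LOG
object), and the class name IncrementalFileLogging and its two helper methods
are hypothetical, not part of this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical standalone illustration; not code from HoodieInputFormat.
public class IncrementalFileLogging {
  private static final Logger LOG =
      Logger.getLogger(IncrementalFileLogging.class.getName());

  // Builds the one summary line that would be logged per listStatus() call.
  static String summarize(List<String> filePaths) {
    return "Processing " + filePaths.size() + " incremental hoodie file(s)";
  }

  // Logs the summary at INFO; per-file lines are emitted only when FINE is enabled.
  static void logIncrementalFiles(List<String> filePaths) {
    LOG.info(summarize(filePaths));
    if (LOG.isLoggable(Level.FINE)) {
      for (String path : filePaths) {
        LOG.fine("Processing incremental hoodie file - " + path);
      }
    }
  }

  public static void main(String[] args) {
    // Example paths are made up for the demonstration.
    logIncrementalFiles(Arrays.asList(
        "/tmp/table/2019/06/01/a.parquet",
        "/tmp/table/2019/06/01/b.parquet"));
  }
}
```

   With something along these lines the log volume per call stays constant at
the default level, while the file-level detail is still available when someone
needs to debug an incremental pull.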

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
