bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453132014



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
+    // is this an incremental query
+    List<String> incrementalTables = 
getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot 
and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet 
files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, 
List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, 
getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = 
inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but 
there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = 
HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to 
get all the commits.
+      Integer maxCommits = 
HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set 
as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = 
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = 
listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: 
total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() 
+ ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = 
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, 
basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline 
timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism 
?

Review comment:
       There is scope for parallelizing the listing here, but it should not be an 
immediate blocker.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to