bvaradar commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453225662



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
+    // is this an incremental query
+    List<String> incrementalTables = 
getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot 
and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet 
files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, 
List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, 
getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = 
inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but 
there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = 
HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to 
get all the commits.
+      Integer maxCommits = 
HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set 
as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = 
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = 
listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: 
total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() 
+ ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = 
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, 
basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline 
timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism 
?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new 
HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: 
commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? 
FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or 
fileSizeInBytes?

Review comment:
       @bhasudha : totalWriteBytes = fileSizeInBytes for base files (parquet). 
For log files, this is not the case, as we use a heuristic to estimate the 
bytes written per delta-commit. 
   
   Getting the actual file size would require an RPC call and would be costly 
here. Also, looking at where the file size is used in the read path, it is only 
needed for combining and splitting file-splits, which is based on the base file 
for Realtime. So, it should be fine to use the metadata stat 
stat.getTotalWriteBytes(), with one change (described below).
   
   As the same log file can appear in multiple delta commits (for HDFS and other 
file systems supporting appends), the logic below needs to handle that. You can 
simply accumulate the write-bytes for each log file across the delta-commits it 
appears in to get a better approximation of the log file's size. 
   

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
+    // is this an incremental query
+    List<String> incrementalTables = 
getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot 
and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet 
files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, 
List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, 
getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = 
inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but 
there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = 
HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to 
get all the commits.
+      Integer maxCommits = 
HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set 
as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = 
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = 
listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: 
total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() 
+ ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = 
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, 
basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline 
timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism 
?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new 
HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: 
commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? 
FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or 
fileSizeInBytes?
+            FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false, 
0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).add(fs);

Review comment:
       Need to handle duplicate log files here (the same log file can appear in 
multiple delta commits). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to