leesf commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r716731093



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +90,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = 
timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = 
Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = 
HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), 
commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> 
affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : 
inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> 
fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, 
p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> 
slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = 
HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " 
+ baseFilePath);
+          }
+          RealtimeFileStatus fileStatus = new 
RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          
fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> 
l.getPath().toString()).collect(Collectors.toList()));
+          // try to set bootstrapfileStatus
+          if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile 
|| baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+            fileStatus.setBootStrapFileStatus(baseFileStatus);
+          }
+          result.add(fileStatus);
+        }
+        // add file group which has only logs.
+        if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
+          List<FileStatus> logFileStatus = 
f.getLatestFileSlice().get().getLogFiles().map(logFile -> 
logFile.getFileStatus()).collect(Collectors.toList());
+          if (logFileStatus.size() > 0) {
+            RealtimeFileStatus fileStatus = new 
RealtimeFileStatus(logFileStatus.get(0));
+            fileStatus.setLogFileOnly(true);
+            fileStatus.setBelongToIncrementalFileStatus(true);
+            fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> 
l.getPath().toString()).collect(Collectors.toList()));
+            fileStatus.setMaxCommitTime(maxCommitTime);
+            fileStatus.setBasePath(basePath);
+            result.add(fileStatus);
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Error obtaining data file/log file grouping 
", e);
+      }
+    });
+    return result;
+  }
+
+  @Override
+  protected boolean includeLogFilesForSnapShortView() {
+    return false;

Review comment:
       here should return `true` since the RealtimeInputFormat contains log 
files?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to