xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r740697734



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -61,9 +85,183 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = 
HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits ? 
HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, 
fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, 
fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view as same as spark datasource.
+   * Step1: Get list of commits to be fetched based on start commit and max 
commits(for snapshot max commits is -1).
+   * Step2: Get list of affected files status for these affected file status.
+   * Step3: Construct HoodieTableFileSystemView based on those affected file 
status.
+   *        a. Filter affected partitions based on inputPaths.
+   *        b. Get list of fileGroups based on affected partitions by 
fsView.getAllFileGroups.
+   * Step4: Set input paths based on filtered affected partition paths. 
changes that amony original input paths passed to
+   *        this method. some partitions did not have commits as part of the 
trimmed down list of commits and hence we need this step.
+   * Step5: Find candidate fileStatus, since when we get baseFileStatus from 
HoodieTableFileSystemView,
+   *        the BaseFileStatus will missing file size information.
+   *        We should use candidate fileStatus to update the size information 
for BaseFileStatus.
+   * Step6: For every file group from step3(b)
+   *        Get 1st available base file from all file slices. then we use 
candidate file status to update the baseFileStatus,
+   *        and construct RealTimeFileStatus and add it to result along with 
log files.
+   *        If file group just has log files, construct RealTimeFileStatus and 
add it to result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = 
timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = 
Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    // step2
+    commitsToCheck.get().sort(HoodieInstant::compareTo);
+    List<HoodieCommitMetadata> metadataList = commitsToCheck
+        .get().stream().map(instant -> {
+          try {
+            return HoodieInputFormatUtils.getCommitMetadata(instant, 
commitsTimelineToReturn);
+          } catch (IOException e) {
+            throw new HoodieException(String.format("cannot get metadata for 
instant: %s", instant));
+          }
+        }).collect(Collectors.toList());
+
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), 
metadataList));
+    // step3
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    Path basePath = new Path(tableMetaClient.getBasePath());
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = 
HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : 
inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()

Review comment:
       L148 has already added all files in HoodieTableFileSystemView, and 
partitionToFileGroupsMap will be filled and cached。
   L157 take the read lock, but it can load fileGroups from 
partitionToFileGroupsMap directly no need fs rpc call, i thinks It won't cost 
much to read the lock。




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to