alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798007427



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, 
legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} 
operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to 
the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. 
Instead of listing all
+   * partitions and then filtering based on the commits of interest, this 
logic first extracts the
+   * partitions touched by the desired commits and then lists only those 
partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          
HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> 
inputPaths,
+                                                          String 
incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = 
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, 
timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = 
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), 
tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by 
incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, 
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      
Stream<HoodieLogFile> logFiles,
+                                                                      
Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      
HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = 
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || 
baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", 
RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
+  private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, 
Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> 
snapshotPaths) throws IOException {
+    return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, 
tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+  }
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,

Review comment:
       Not sure I follow your suggestion — can you elaborate on what you have in 
mind to be included in this doc? 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,126 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, 
legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} 
operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to 
the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. 
Instead of listing all
+   * partitions and then filtering based on the commits of interest, this 
logic first extracts the
+   * partitions touched by the desired commits and then lists only those 
partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          
HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> 
inputPaths,
+                                                          String 
incrementalTable) throws IOException {
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = 
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, 
incrementalTable, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = 
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), 
tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by 
incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, 
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      
Stream<HoodieLogFile> logFiles,
+                                                                      
Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      
HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = 
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());

Review comment:
       This is a new

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, 
legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} 
operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to 
the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. 
Instead of listing all
+   * partitions and then filtering based on the commits of interest, this 
logic first extracts the
+   * partitions touched by the desired commits and then lists only those 
partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          
HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> 
inputPaths,
+                                                          String 
incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = 
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, 
timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = 
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), 
tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by 
incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, 
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,

Review comment:
       Do you have something in mind?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean 
populateMetaFields) throws Exception {
 
       assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> 
fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = 
roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       Duplication isn't an issue, because this list is currently only used for 
file filtering.

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -187,13 +285,28 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
               .map(fileSlice -> {
                 Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
                 Option<HoodieLogFile> latestLogFileOpt = 
fileSlice.getLatestLogFile();
-                if (baseFileOpt.isPresent()) {
-                  return getFileStatusUnchecked(baseFileOpt);
-                } else if (includeLogFilesForSnapShotView() && 
latestLogFileOpt.isPresent()) {
-                  return 
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), 
fileSlice.getLogFiles());
+                Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+
+                Option<HoodieInstant> latestCompletedInstantOpt =
+                    fromScala(fileIndex.latestCompletedInstant());
+
+                // Check if we're reading a MOR table
+                if (includeLogFilesForSnapshotView()) {
+                  if (baseFileOpt.isPresent()) {
+                    return 
createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, 
latestCompletedInstantOpt, tableMetaClient);

Review comment:
       A list of partition paths that are being read by Hive in snapshot mode 
(name is a carry-over from the previous code)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to