yihua commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r794793796



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);

-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits we have to handle here:
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file

Review comment:
       Got it.  Could you update the docs?  Originally I thought the `else` block only handled non-Hudi path cases.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##########
@@ -44,9 +44,7 @@
 
   private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
 
-  public HoodieRealtimeFileSplit() {
-    super();

Review comment:
       In that case, should we remove the explicit no-arg constructor and rely on the default behavior?  Here it explicitly does nothing.
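   For context, the Java rule this hinges on, as a minimal illustrative snippet (not Hudi code):

```java
class A {
  // No constructors declared: the compiler generates a public no-arg constructor.
}

class B {
  B(int x) { }
  // Another constructor is declared, so no implicit no-arg constructor exists;
  // `new B()` does not compile. Hadoop instantiates splits reflectively during
  // Writable deserialization, so a split class that declares other constructors
  // must keep an explicit no-arg one.
}
```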

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes the {@link FileInputFormat#listStatus(JobConf)} operation to subclasses;
+   * it lists files (returning an array of {@link FileStatus}) corresponding to the input paths
+   * specified as part of the provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,

Review comment:
       Got it.  I saw two overloads of `createRealtimeFileStatusUnchecked()` with different args, so I was wondering what the difference is.  Basically, one is for base file + log files, and the other for log files only.  Maybe rename them so the method names are more descriptive?
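   For example, something along these lines (hypothetical names, with illustrative parameter lists rather than ones copied from the PR):

```java
// Base file with appended log files
private static RealtimeFileStatus createRealtimeFileStatusForBaseFile(
    HoodieBaseFile baseFile, List<HoodieLogFile> logFiles) {
  throw new UnsupportedOperationException("sketch only");
}

// Log files only, i.e. no base file has been written yet
private static RealtimeFileStatus createRealtimeFileStatusForLogFiles(
    List<HoodieLogFile> logFiles) {
  throw new UnsupportedOperationException("sketch only");
}
```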

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {

Review comment:
       Got it, makes sense.  In the future, you can have a separate PR for just 
reordering, which makes the review turnaround shorter.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -144,28 +204,32 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
      hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);

Review comment:
       Yeah, my point is that you're not calling any methods specific to `RealtimeBootstrapBaseFileSplit` here.  `s` is already an instance of `RealtimeBootstrapBaseFileSplit`, and `RealtimeBootstrapBaseFileSplit` is a subclass of `InputSplit`, so directly adding `s` with `rtSplits.add(s);` is not a problem.
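   A minimal sketch of the suggested simplification (branch shape taken from the hunk above):

```java
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
  // No RealtimeBootstrapBaseFileSplit-specific members are accessed,
  // so the unchecked cast is unnecessary; the split can be added as-is.
  rtSplits.add(s);
}
```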

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
      assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       Understood.  What you are saying is that the input format now takes partition paths instead of base file paths.
   
   Then the question is, there could be multiple base files under the same partition path, so `inputPaths` can potentially contain duplicated partition paths.  Should it be deduped with a `Set<String>` and then passed to `getRecordsUsingInputFormat()` as `new ArrayList<>(pathSet)`?
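   Alternatively, since the paths are already built in a stream, adding `.distinct()` before collecting would dedupe in place (sketch based on the pipeline in the hunk above):

```java
List<String> inputPaths = roView.getLatestBaseFiles()
    .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
    .distinct() // drop duplicate partition paths before handing them to the input format
    .collect(Collectors.toList());
```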



