alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798204981



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,70 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits could we have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+        //       and does have log files appended;
+        //    - {@code FileSplit}: in case Hive passed down non-Hudi path
+        if (split instanceof RealtimeBootstrapBaseFileSplit) {
+          return split;
+        } else if (split instanceof BootstrapBaseFileSplit) {
+          BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+          return createRealtimeBoostrapBaseFileSplit(
+              bootstrapBaseFileSplit,
+              metaClient.getBasePath(),
+              Collections.emptyList(),
+              latestCommitInstant.getTimestamp(),
+              false);
+        } else if (split instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);

Review comment:
       Yes, it's in sync. However, you raised a very good point: the instant 
shouldn't actually be set here. This will be cleaned up in subsequent PRs, 
where `HoodieRealtimeFileSplit` will be merged with `BaseFileWithLogsSplit`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
