nsivabalan commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798155468
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,70 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we have to handle here:
+          //    - {@code BootstrapBaseFileSplit}: in case the base file does have an associated
+          //      bootstrap file, but does NOT have any log files appended (convert it to
+          //      {@code RealtimeBootstrapBaseFileSplit})
+          //    - {@code RealtimeBootstrapBaseFileSplit}: in case the base file does have an
+          //      associated bootstrap file and does have log files appended
+          //    - {@code BaseFileWithLogsSplit}: in case the base file does NOT have an associated
+          //      bootstrap file and does have log files appended
+          //    - {@code FileSplit}: in case Hive passed down a non-Hudi path
+          if (split instanceof RealtimeBootstrapBaseFileSplit) {
+            return split;
+          } else if (split instanceof BootstrapBaseFileSplit) {
+            BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+            return createRealtimeBoostrapBaseFileSplit(
+                bootstrapBaseFileSplit,
+                metaClient.getBasePath(),
+                Collections.emptyList(),
+                latestCommitInstant.getTimestamp(),
+                false);
+          } else if (split instanceof BaseFileWithLogsSplit) {
+            BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);
Review comment:
Will the maxCommitTime in the BaseFileWithLogsSplit be in sync with the
latestCommitInstant computed at L89? Prior to this patch we used the
latestCommitInstant computed here, whereas now we just reuse the one that
comes from the BaseFileWithLogsSplit.
Just wanted to confirm, since this code is new to me.
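The four-way dispatch described in the diff can be sketched roughly as below. The class names here are hypothetical stand-ins for Hudi's real split types (`RealtimeBootstrapBaseFileSplit`, `BootstrapBaseFileSplit`, `BaseFileWithLogsSplit`, plain `FileSplit`), just to illustrate why the `instanceof` ordering matters:

```java
// Hypothetical stand-ins mirroring the split-type hierarchy in the diff;
// these are NOT Hudi's real classes, only a sketch of the dispatch order.
class BaseSplit {}                                     // plays the role of FileSplit
class LogsSplit extends BaseSplit {}                   // ~ BaseFileWithLogsSplit
class BootstrapSplit extends BaseSplit {}              // ~ BootstrapBaseFileSplit
class RealtimeBootstrapSplit extends BootstrapSplit {} // ~ RealtimeBootstrapBaseFileSplit

public class SplitDispatchSketch {
  static String classify(BaseSplit split) {
    // Order matters: the most specific subtype must be checked first,
    // as in the instanceof chain inside getRealtimeSplits.
    if (split instanceof RealtimeBootstrapSplit) {
      return "pass-through";       // already a realtime split, return as-is
    } else if (split instanceof BootstrapSplit) {
      return "wrap-as-realtime";   // bootstrap file, no log files yet
    } else if (split instanceof LogsSplit) {
      return "merge-with-logs";    // base file with log files appended
    } else {
      return "non-hudi";           // plain split passed down by Hive
    }
  }

  public static void main(String[] args) {
    if (!classify(new RealtimeBootstrapSplit()).equals("pass-through")
        || !classify(new BootstrapSplit()).equals("wrap-as-realtime")
        || !classify(new LogsSplit()).equals("merge-with-logs")
        || !classify(new BaseSplit()).equals("non-hudi")) {
      throw new AssertionError("unexpected dispatch");
    }
  }
}
```

If the `RealtimeBootstrapSplit` branch came after the `BootstrapSplit` one, every realtime bootstrap split would be mis-routed, since it is also an instance of its supertype.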
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,70 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
Review comment:
This refactoring makes total sense, assuming each FileSplit corresponds to
exactly one FileSlice and there won't be a case where multiple FileSplits
store info about a single FileSlice.
Thanks for doing this.
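A minimal sketch of why the `Stream<FileSplit>` to `List<FileSplit>` signature change is needed (my assumed rationale, not stated in the PR): the method now inspects the first split to derive the partition path and then maps over all splits, i.e. it reads the collection twice, which a `Stream` cannot do:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamReuseSketch {
  // Reads the collection twice: once to peek at the first element
  // (standing in for deriving the partition path / meta client),
  // and once to transform every element. A List allows both passes.
  static String firstThenAll(List<String> splits) {
    if (splits.isEmpty()) {
      return "";
    }
    String first = splits.get(0);                                  // first pass
    String all = splits.stream().collect(Collectors.joining(",")); // second pass
    return first + "|" + all;
  }

  public static void main(String[] args) {
    // A Stream, by contrast, supports only one terminal operation.
    Stream<String> s = Stream.of("a", "b");
    s.count();     // consumes the stream
    try {
      s.count();   // a second terminal operation fails
      throw new AssertionError("stream was unexpectedly reusable");
    } catch (IllegalStateException expected) {
      // expected: stream has already been operated upon or closed
    }
    if (!firstThenAll(List.of("a", "b")).equals("a|a,b")) {
      throw new AssertionError();
    }
  }
}
```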
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]