yihua commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r794793796
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we have to handle here
+          //    - {@code BootstrapBaseFileSplit}: in case the base file does have an associated bootstrap file,
+          //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+          //    - {@code RealtimeBootstrapBaseFileSplit}: in case the base file does have an associated bootstrap file
+          //      and does have log files appended
+          //    - {@code BaseFileWithLogsSplit}: in case the base file does NOT have an associated bootstrap file
Review comment:
   Got it. Could you update the docs? Originally I thought the `else` block only handled non-Hudi path cases. A stand-in sketch of the dispatch is below.
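   For reference, a minimal self-contained sketch of the instanceof dispatch the comment block above describes, using hypothetical stub classes rather than Hudi's actual split types. The quoted diff is truncated before the fourth split type, so only the three listed ones are shown, and the subtype relationship between the bootstrap splits is assumed from the diff's wording:

```java
// Hypothetical stubs mirroring the split types named in the comment above.
// Assuming RealtimeBootstrapBaseFileSplit specializes BootstrapBaseFileSplit,
// the more specific subtype must be checked first.
class FileSplitStub {}
class BootstrapBaseFileSplitStub extends FileSplitStub {}
class RealtimeBootstrapBaseFileSplitStub extends BootstrapBaseFileSplitStub {}
class BaseFileWithLogsSplitStub extends FileSplitStub {}

public class SplitDispatchDemo {
  static String classify(FileSplitStub split) {
    if (split instanceof RealtimeBootstrapBaseFileSplitStub) {
      return "bootstrap base file WITH appended log files";
    } else if (split instanceof BootstrapBaseFileSplitStub) {
      return "bootstrap base file WITHOUT log files (wrap as realtime split)";
    } else if (split instanceof BaseFileWithLogsSplitStub) {
      return "base file without bootstrap file, with log files";
    }
    return "other split type";
  }

  public static void main(String[] args) {
    System.out.println(classify(new RealtimeBootstrapBaseFileSplitStub()));
    System.out.println(classify(new BootstrapBaseFileSplitStub()));
    System.out.println(classify(new BaseFileWithLogsSplitStub()));
  }
}
```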
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##########
@@ -44,9 +44,7 @@
private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
- public HoodieRealtimeFileSplit() {
- super();
Review comment:
   In that case, should we remove the explicit no-arg constructor entirely to get the default behavior? Here it explicitly does nothing.
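   To illustrate the language rule at play, a minimal self-contained sketch (hypothetical classes, not Hudi code): an explicit no-arg constructor whose body only calls `super()` is identical to the compiler-generated default constructor.

```java
// Hypothetical classes illustrating Java's default-constructor rule.
class Base {
  Base() {
    System.out.println("Base()");
  }
}

class Split extends Base {
  // No constructor declared: the compiler supplies a no-arg constructor
  // that implicitly calls super(), i.e. identical behavior to writing
  // `Split() { super(); }` by hand.
}

public class DefaultCtorDemo {
  public static void main(String[] args) {
    new Split(); // prints "Base()" via the implicit super() call
  }
}
```

   One caveat worth noting: the compiler only generates the default constructor when no other constructor is declared, so a split class that Hadoop instantiates reflectively would still need an explicit no-arg constructor once it declares any other constructor.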
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes the {@link FileInputFormat#listStatus(JobConf)} operation to subclasses,
+   * listing the files (returned as an array of {@link FileStatus}) that correspond to the input
+   * paths specified in the provided {@link JobConf}.
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only the partitions touched by the incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Review comment:
   Got it. I saw two overloads of `createRealtimeFileStatusUnchecked()` with different arguments and was wondering what the difference is. Basically, one is for base file + log files, and the other is for log files only. Maybe rename them so the method names are more descriptive?
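   A sketch of what such a rename might look like. The names and parameter lists below are hypothetical suggestions (not the PR's actual declarations), and the snippet is meant to slot into the class with Hudi's types in scope; the bodies are placeholders since only the signatures matter here:

```java
// Hypothetical renames making each overload self-describing; signatures are
// illustrative sketches, not the PR's actual declarations.
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusForBaseFile(HoodieBaseFile baseFile,
                                                                      List<HoodieLogFile> logFiles) {
  // sketch: build the status from the base file plus its appended log files
  throw new UnsupportedOperationException("signature sketch only");
}

@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusForLogFiles(List<HoodieLogFile> logFiles) {
  // sketch: build the status from log files only (no base file present)
  throw new UnsupportedOperationException("signature sketch only");
}
```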
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
Review comment:
Got it, makes sense. In the future, you can have a separate PR for just
reordering, which makes the review turnaround shorter.
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -144,28 +204,32 @@
return rtSplits.toArray(new InputSplit[0]);
}
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre-process tableConfig from the first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
       hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);
Review comment:
   Yeah, my point is, you're not calling any methods specific to `RealtimeBootstrapBaseFileSplit` here. `s` is already an instance of `RealtimeBootstrapBaseFileSplit`, and `RealtimeBootstrapBaseFileSplit` is a subclass of `InputSplit`, so directly adding `s` with `rtSplits.add(s);` is not a problem.
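   A minimal self-contained sketch of that point, using stand-in types rather than Hudi's: subtyping alone lets the element be added to a `List<InputSplit>`; a cast (or `unsafeCast`) is only warranted when subtype-specific members are actually used.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types mirroring the InputSplit hierarchy discussed above.
class InputSplitStub {}
class RealtimeBootstrapBaseFileSplitStub extends InputSplitStub {
  String bootstrapPath() { return "/tmp/bootstrap"; }
}

public class CastDemo {
  public static void main(String[] args) {
    List<InputSplitStub> rtSplits = new ArrayList<>();
    InputSplitStub s = new RealtimeBootstrapBaseFileSplitStub();

    // No cast needed just to add the element: subtyping covers it.
    rtSplits.add(s);

    // A cast is only needed to reach subtype-specific members:
    if (s instanceof RealtimeBootstrapBaseFileSplitStub) {
      RealtimeBootstrapBaseFileSplitStub bs = (RealtimeBootstrapBaseFileSplitStub) s;
      System.out.println(bs.bootstrapPath());
    }
  }
}
```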
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
     assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
-    List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+    List<String> inputPaths = roView.getLatestBaseFiles()
+        .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+        .collect(Collectors.toList());
Review comment:
   Understood. What you are saying is that the input format now takes partition paths instead of base file paths.
   Then the question is: there could be multiple base files under the same partition path, so `inputPaths` can contain duplicated partition paths. Should it be deduped with a `Set<String>` and then passed to `getRecordsUsingInputFormat()` as `new ArrayList<>(pathSet)`?
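   A minimal sketch of that dedupe (using `LinkedHashSet` to keep the encounter order deterministic; note `new ArrayList<>(pathSet)` rather than instantiating the `List` interface directly):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupDemo {
  public static void main(String[] args) {
    // Two base files under the same partition produce a duplicate entry.
    List<String> inputPaths = Arrays.asList(
        "/table/2022/01/01", "/table/2022/01/01", "/table/2022/01/02");

    // Dedupe while preserving encounter order, then hand a List back to the
    // helper that expects List<String>.
    Set<String> pathSet = new LinkedHashSet<>(inputPaths);
    List<String> dedupedPaths = new ArrayList<>(pathSet);

    System.out.println(dedupedPaths); // [/table/2022/01/01, /table/2022/01/02]
  }
}
```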