alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798007427
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles,
legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
+ @Nonnull
+ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+ try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to get file-status", ioe);
+ }
+ }
+
+ /**
+ * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)}
operation to subclasses that
+ * lists files (returning an array of {@link FileStatus}) corresponding to
the input paths specified
+ * as part of provided {@link JobConf}
+ */
+ protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+ return super.listStatus(job);
+ }
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
+ * partitions and then filtering based on the commits of interest, this
logic first extracts the
+ * partitions touched by the desired commits and then lists only those
partitions.
+ */
+ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+
HoodieTableMetaClient tableMetaClient,
+ List<Path>
inputPaths,
+ String
incrementalTable) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ Job jobContext = Job.getInstance(job);
+ Option<HoodieTimeline> timeline =
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return null;
+ }
+ Option<List<HoodieInstant>> commitsToCheck =
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName,
timeline.get());
+ if (!commitsToCheck.isPresent()) {
+ return null;
+ }
+ Option<String> incrementalInputPaths =
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(),
tableMetaClient, timeline.get(), inputPaths);
+ // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
+ }
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = doListStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext,
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ }
+
+ protected abstract boolean includeLogFilesForSnapshotView();
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+
Stream<HoodieLogFile> logFiles,
+
Option<HoodieInstant> latestCompletedInstantOpt,
+
HoodieTableMetaClient tableMetaClient) {
+ List<HoodieLogFile> sortedLogFiles =
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+ FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+ try {
+ RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+ rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+ rtFileStatus.setBaseFilePath(baseFile.getPath());
+ rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+ if (latestCompletedInstantOpt.isPresent()) {
+ HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+ checkState(latestCompletedInstant.isCompleted());
+
+ rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+ }
+
+ if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile ||
baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+ rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+ }
+
+ return rtFileStatus;
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to init %s",
RealtimeFileStatus.class.getSimpleName()), e);
+ }
+ }
+
+ @Nonnull
+ private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path>
snapshotPaths) throws IOException {
+ return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job,
tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+ }
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
Review comment:
Not sure I follow your suggestion — can you elaborate on what you have in
mind to be included in this doc?
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,126 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles,
legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
+ @Nonnull
+ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+ try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to get file-status", ioe);
+ }
+ }
+
+ /**
+ * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)}
operation to subclasses that
+ * lists files (returning an array of {@link FileStatus}) corresponding to
the input paths specified
+ * as part of provided {@link JobConf}
+ */
+ protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+ return super.listStatus(job);
+ }
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
+ * partitions and then filtering based on the commits of interest, this
logic first extracts the
+ * partitions touched by the desired commits and then lists only those
partitions.
+ */
+ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+
HoodieTableMetaClient tableMetaClient,
+ List<Path>
inputPaths,
+ String
incrementalTable) throws IOException {
+ Job jobContext = Job.getInstance(job);
+ Option<HoodieTimeline> timeline =
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return null;
+ }
+ Option<List<HoodieInstant>> commitsToCheck =
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext,
incrementalTable, timeline.get());
+ if (!commitsToCheck.isPresent()) {
+ return null;
+ }
+ Option<String> incrementalInputPaths =
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(),
tableMetaClient, timeline.get(), inputPaths);
+ // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
+ }
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = doListStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext,
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ }
+
+ protected abstract boolean includeLogFilesForSnapshotView();
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+
Stream<HoodieLogFile> logFiles,
+
Option<HoodieInstant> latestCompletedInstantOpt,
+
HoodieTableMetaClient tableMetaClient) {
+ List<HoodieLogFile> sortedLogFiles =
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
Review comment:
This is a new
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles,
legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
+ @Nonnull
+ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+ try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to get file-status", ioe);
+ }
+ }
+
+ /**
+ * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)}
operation to subclasses that
+ * lists files (returning an array of {@link FileStatus}) corresponding to
the input paths specified
+ * as part of provided {@link JobConf}
+ */
+ protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+ return super.listStatus(job);
+ }
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
+ * partitions and then filtering based on the commits of interest, this
logic first extracts the
+ * partitions touched by the desired commits and then lists only those
partitions.
+ */
+ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+
HoodieTableMetaClient tableMetaClient,
+ List<Path>
inputPaths,
+ String
incrementalTable) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ Job jobContext = Job.getInstance(job);
+ Option<HoodieTimeline> timeline =
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return null;
+ }
+ Option<List<HoodieInstant>> commitsToCheck =
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName,
timeline.get());
+ if (!commitsToCheck.isPresent()) {
+ return null;
+ }
+ Option<String> incrementalInputPaths =
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(),
tableMetaClient, timeline.get(), inputPaths);
+ // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
+ }
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = doListStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext,
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ }
+
+ protected abstract boolean includeLogFilesForSnapshotView();
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Review comment:
Do you have something in mind?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean
populateMetaFields) throws Exception {
assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry ->
fileIdToSize.get(entry.getKey()) < entry.getValue()));
- List<String> dataFiles =
roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- List<GenericRecord> recordsRead =
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+ List<String> inputPaths = roView.getLatestBaseFiles()
+ .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+ .collect(Collectors.toList());
Review comment:
Duplication isn't an issue because this list is currently only used for
file filtering
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -187,13 +285,28 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
.map(fileSlice -> {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt =
fileSlice.getLatestLogFile();
- if (baseFileOpt.isPresent()) {
- return getFileStatusUnchecked(baseFileOpt);
- } else if (includeLogFilesForSnapShotView() &&
latestLogFileOpt.isPresent()) {
- return
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(),
fileSlice.getLogFiles());
+ Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+
+ Option<HoodieInstant> latestCompletedInstantOpt =
+ fromScala(fileIndex.latestCompletedInstant());
+
+ // Check if we're reading a MOR table
+ if (includeLogFilesForSnapshotView()) {
+ if (baseFileOpt.isPresent()) {
+ return
createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles,
latestCompletedInstantOpt, tableMetaClient);
Review comment:
A list of partition paths that are being read by Hive in snapshot mode
(name is a carry-over from the previous code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]