xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r740695822
##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -61,9 +85,183 @@
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
+ List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+ boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+ return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
+ }
+
+ /**
+ * Keep the logic of mor_incr_view the same as the Spark datasource.
+ * Step1: Get the list of commits to be fetched based on the start commit and max commits (for snapshot, max commits is -1).
+ * Step2: Get the list of affected file statuses for those commits.
+ * Step3: Construct a HoodieTableFileSystemView based on those affected file statuses.
+ *   a. Filter affected partitions based on inputPaths.
+ *   b. Get the list of fileGroups for the affected partitions via fsView.getAllFileGroups.
+ * Step4: Set input paths based on the filtered affected partition paths; this changes the original input paths passed to
+ *   this method. Some partitions have no commits in the trimmed-down list of commits, hence we need this step.
+ * Step5: Find candidate fileStatus, since the baseFileStatus we get from HoodieTableFileSystemView
+ *   is missing file size information.
+ *   We use the candidate fileStatus to fill in the size information for the baseFileStatus.
+ * Step6: For every file group from step3(b),
+ *   get the first available base file from all file slices, use the candidate file status to update the baseFileStatus,
+ *   and construct a RealTimeFileStatus and add it to the result along with the log files.
+ *   If a file group only has log files, construct a RealTimeFileStatus and add it to the result.
+ * TODO: unify the incremental view code between hive/spark-sql and the spark datasource
+ */
+ @Override
+ protected List<FileStatus> listStatusForIncrementalMode(
+ JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+ List<FileStatus> result = new ArrayList<>();
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ Job jobContext = Job.getInstance(job);
+
+ // step1
+ Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return result;
+ }
+ String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+ // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+ Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
Review comment:
good suggestion, thanks
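
[Editor's note] A minimal sketch (not part of this PR) of how a Hive/MapReduce job opts into the incremental path that `isIncrementalQuerySplits` and `listStatusForIncrementalMode` serve. It assumes the `hoodie.<table>.consume.*` properties that `HoodieHiveUtils.readStartCommitTime` and `readMaxCommits` read; the table name `trips`, the timestamp, and the commit count are placeholders.

```java
import org.apache.hadoop.mapred.JobConf;

public class IncrementalConsumeConfExample {

  public static JobConf incrementalJobConf() {
    JobConf job = new JobConf();
    String table = "trips"; // placeholder Hive table name for the Hudi dataset

    // Switch the realtime input format onto the incremental code path.
    job.set(String.format("hoodie.%s.consume.mode", table), "INCREMENTAL");
    // Start commit timestamp; commits after this instant are candidates (step1).
    job.set(String.format("hoodie.%s.consume.start.timestamp", table), "20211101123000");
    // Maximum number of commits to pull in this batch; -1 means all commits after the start.
    job.set(String.format("hoodie.%s.consume.max.commits", table), "3");
    return job;
  }
}
```

With a JobConf like this, the new `getSplits` routes the collected FileSplits through `HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits` instead of the regular realtime split generation.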
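
[Editor's note] The size backfill described in Step5 of the javadoc above can be illustrated with a small, self-contained sketch that uses only Hadoop's FileStatus; the class and helper names here (`BaseFileSizeBackfillSketch`, `indexByPath`, `backfillSizes`) are hypothetical and are not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;

public class BaseFileSizeBackfillSketch {

  // Index candidate file statuses (e.g. listed directly from the affected partitions)
  // by their full path so they can be looked up cheaply.
  static Map<String, FileStatus> indexByPath(List<FileStatus> candidates) {
    Map<String, FileStatus> byPath = new HashMap<>(candidates.size());
    for (FileStatus candidate : candidates) {
      byPath.put(candidate.getPath().toString(), candidate);
    }
    return byPath;
  }

  // Replace base file statuses that come back from the file system view without size
  // information with the candidate status for the same path; keep the original when
  // no candidate exists.
  static List<FileStatus> backfillSizes(List<FileStatus> baseFileStatuses,
                                        Map<String, FileStatus> candidatesByPath) {
    List<FileStatus> result = new ArrayList<>(baseFileStatuses.size());
    for (FileStatus base : baseFileStatuses) {
      FileStatus candidate = candidatesByPath.get(base.getPath().toString());
      result.add(candidate != null ? candidate : base);
    }
    return result;
  }
}
```

In the actual patch, the updated base file statuses then feed the RealTimeFileStatus construction in Step6, together with the log files of each file group.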