bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453132095
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
+ // is this an incremental query
+ List<String> incrementalTables =
getIncrementalTableNames(Job.getInstance(job));
+ if (!incrementalTables.isEmpty()) {
+ //TODO For now assuming the query can be either incremental or snapshot
and NOT both.
+ return getSplitsForIncrementalQueries(job, incrementalTables);
+ }
+
Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job,
numSplits)).map(is -> (FileSplit) is);
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Call the HoodieInputFormat::listStatus to obtain all latest parquet
files, based on commit
- // timeline.
- return super.listStatus(job);
+ protected InputSplit[] getSplitsForIncrementalQueries(JobConf job,
List<String> incrementalTables) throws IOException {
+ InputPathHandler inputPathHandler = new InputPathHandler(conf,
getInputPaths(job), incrementalTables);
+ Map<String, HoodieTableMetaClient> tableMetaClientMap =
inputPathHandler.getTableMetaClientMap();
+ List<InputSplit> splits = new ArrayList<>();
+
+ for (String table : incrementalTables) {
+ HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+ if (metaClient == null) {
+ /* This can happen when the INCREMENTAL mode is set for a table but
there were no InputPaths
+ * in the jobConf
+ */
+ continue;
+ }
+ String tableName = metaClient.getTableConfig().getTableName();
+ Path basePath = new Path(metaClient.getBasePath());
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ String lastIncrementalTs =
HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+ // Total number of commits to return in this batch. Set this to -1 to
get all the commits.
+ Integer maxCommits =
HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+ LOG.info("Last Incremental timestamp for table: " + table + ", was set
as " + lastIncrementalTs);
+ List<HoodieInstant> commitsToCheck =
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+ .getInstants().collect(Collectors.toList());
+
+ Map<String, List<FileStatus>> partitionToFileStatusesMap =
listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+ fileStatuses.addAll(statuses);
+ }
+ LOG.info("Stats after applying Hudi incremental filter:
total_commits_to_check: " + commitsToCheck.size()
+ + ", total_partitions_touched: " + partitionToFileStatusesMap.size()
+ ", total_files_processed: "
+ + fileStatuses.size());
+ FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+ List<String> commitsList =
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+ // Iterate partitions to create splits
+ partitionToFileStatusesMap.keySet().forEach(path -> {
+ // create an Incremental Split for each file group.
+ fsView.getAllFileGroups(path)
+ .forEach(
+ fileGroup -> splits.add(
+ new HoodieMORIncrementalFileSplit(fileGroup,
basePath.toString(), commitsList.get(commitsList.size() - 1))
+ ));
+ });
+ }
+ Log.info("Total splits generated: " + splits.size());
+ return splits.toArray(new InputSplit[0]);
+ }
+
+ private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+ Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline
timeline) throws IOException {
+ // Extract files touched by these commits.
+ // TODO This might need to be done in parallel like listStatus parallelism
?
+ HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new
HashMap<>();
+ for (HoodieInstant commit: commitsToCheck) {
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+ HoodieCommitMetadata.class);
+ for (Map.Entry<String, List<HoodieWriteStat>> entry:
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+ partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+ }
+ for (HoodieWriteStat stat : entry.getValue()) {
+ String relativeFilePath = stat.getPath();
+ Path fullPath = relativeFilePath != null ?
FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+ if (fullPath != null) {
+ //TODO Should the length of file be totalWriteBytes or
fileSizeInBytes?
Review comment:
@bvaradar @n3nash can you help clarify this? For the TODO above: should the length of the file be taken from totalWriteBytes or fileSizeInBytes?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]