bvaradar commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453225662
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
+ // is this an incremental query
+ List<String> incrementalTables =
getIncrementalTableNames(Job.getInstance(job));
+ if (!incrementalTables.isEmpty()) {
+ //TODO For now assuming the query can be either incremental or snapshot
and NOT both.
+ return getSplitsForIncrementalQueries(job, incrementalTables);
+ }
+
Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job,
numSplits)).map(is -> (FileSplit) is);
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Call the HoodieInputFormat::listStatus to obtain all latest parquet
files, based on commit
- // timeline.
- return super.listStatus(job);
+ protected InputSplit[] getSplitsForIncrementalQueries(JobConf job,
List<String> incrementalTables) throws IOException {
+ InputPathHandler inputPathHandler = new InputPathHandler(conf,
getInputPaths(job), incrementalTables);
+ Map<String, HoodieTableMetaClient> tableMetaClientMap =
inputPathHandler.getTableMetaClientMap();
+ List<InputSplit> splits = new ArrayList<>();
+
+ for (String table : incrementalTables) {
+ HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+ if (metaClient == null) {
+ /* This can happen when the INCREMENTAL mode is set for a table but
there were no InputPaths
+ * in the jobConf
+ */
+ continue;
+ }
+ String tableName = metaClient.getTableConfig().getTableName();
+ Path basePath = new Path(metaClient.getBasePath());
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ String lastIncrementalTs =
HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+ // Total number of commits to return in this batch. Set this to -1 to
get all the commits.
+ Integer maxCommits =
HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+ LOG.info("Last Incremental timestamp for table: " + table + ", was set
as " + lastIncrementalTs);
+ List<HoodieInstant> commitsToCheck =
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+ .getInstants().collect(Collectors.toList());
+
+ Map<String, List<FileStatus>> partitionToFileStatusesMap =
listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+ fileStatuses.addAll(statuses);
+ }
+ LOG.info("Stats after applying Hudi incremental filter:
total_commits_to_check: " + commitsToCheck.size()
+ + ", total_partitions_touched: " + partitionToFileStatusesMap.size()
+ ", total_files_processed: "
+ + fileStatuses.size());
+ FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+ List<String> commitsList =
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+ // Iterate partitions to create splits
+ partitionToFileStatusesMap.keySet().forEach(path -> {
+ // create an Incremental Split for each file group.
+ fsView.getAllFileGroups(path)
+ .forEach(
+ fileGroup -> splits.add(
+ new HoodieMORIncrementalFileSplit(fileGroup,
basePath.toString(), commitsList.get(commitsList.size() - 1))
+ ));
+ });
+ }
+ Log.info("Total splits generated: " + splits.size());
+ return splits.toArray(new InputSplit[0]);
+ }
+
+ private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+ Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline
timeline) throws IOException {
+ // Extract files touched by these commits.
+ // TODO This might need to be done in parallel like listStatus parallelism
?
+ HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new
HashMap<>();
+ for (HoodieInstant commit: commitsToCheck) {
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+ HoodieCommitMetadata.class);
+ for (Map.Entry<String, List<HoodieWriteStat>> entry:
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+ partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+ }
+ for (HoodieWriteStat stat : entry.getValue()) {
+ String relativeFilePath = stat.getPath();
+ Path fullPath = relativeFilePath != null ?
FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+ if (fullPath != null) {
+ //TODO Should the length of file be totalWriteBytes or
fileSizeInBytes?
Review comment:
@bhasudha : totalWriteBytes = fileSizeInBytes for base files (parquet).
For log files, this is not the case, as we use a heuristic to estimate the
bytes written per delta-commit.
Getting the actual file size would require an RPC call and would be costly
here. Also, looking at where the file size is used in the read path, it is only
needed for combining and splitting file-splits, which is based on the base file
for Realtime. So, it should be fine to use the metadata stat
stat.getTotalWriteBytes() with one change.
As the same log file can appear in multiple delta commits (for HDFS and other
file-systems supporting appends), the logic below needs to handle that. You can
simply accumulate the write-bytes for each log file across the delta-commits
in which it appears to get a better approximation of the log file sizes.
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
+ // is this an incremental query
+ List<String> incrementalTables =
getIncrementalTableNames(Job.getInstance(job));
+ if (!incrementalTables.isEmpty()) {
+ //TODO For now assuming the query can be either incremental or snapshot
and NOT both.
+ return getSplitsForIncrementalQueries(job, incrementalTables);
+ }
+
Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job,
numSplits)).map(is -> (FileSplit) is);
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Call the HoodieInputFormat::listStatus to obtain all latest parquet
files, based on commit
- // timeline.
- return super.listStatus(job);
+ protected InputSplit[] getSplitsForIncrementalQueries(JobConf job,
List<String> incrementalTables) throws IOException {
+ InputPathHandler inputPathHandler = new InputPathHandler(conf,
getInputPaths(job), incrementalTables);
+ Map<String, HoodieTableMetaClient> tableMetaClientMap =
inputPathHandler.getTableMetaClientMap();
+ List<InputSplit> splits = new ArrayList<>();
+
+ for (String table : incrementalTables) {
+ HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+ if (metaClient == null) {
+ /* This can happen when the INCREMENTAL mode is set for a table but
there were no InputPaths
+ * in the jobConf
+ */
+ continue;
+ }
+ String tableName = metaClient.getTableConfig().getTableName();
+ Path basePath = new Path(metaClient.getBasePath());
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ String lastIncrementalTs =
HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+ // Total number of commits to return in this batch. Set this to -1 to
get all the commits.
+ Integer maxCommits =
HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+ LOG.info("Last Incremental timestamp for table: " + table + ", was set
as " + lastIncrementalTs);
+ List<HoodieInstant> commitsToCheck =
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+ .getInstants().collect(Collectors.toList());
+
+ Map<String, List<FileStatus>> partitionToFileStatusesMap =
listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+ fileStatuses.addAll(statuses);
+ }
+ LOG.info("Stats after applying Hudi incremental filter:
total_commits_to_check: " + commitsToCheck.size()
+ + ", total_partitions_touched: " + partitionToFileStatusesMap.size()
+ ", total_files_processed: "
+ + fileStatuses.size());
+ FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+ List<String> commitsList =
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+ // Iterate partitions to create splits
+ partitionToFileStatusesMap.keySet().forEach(path -> {
+ // create an Incremental Split for each file group.
+ fsView.getAllFileGroups(path)
+ .forEach(
+ fileGroup -> splits.add(
+ new HoodieMORIncrementalFileSplit(fileGroup,
basePath.toString(), commitsList.get(commitsList.size() - 1))
+ ));
+ });
+ }
+ Log.info("Total splits generated: " + splits.size());
+ return splits.toArray(new InputSplit[0]);
+ }
+
+ private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+ Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline
timeline) throws IOException {
+ // Extract files touched by these commits.
+ // TODO This might need to be done in parallel like listStatus parallelism
?
+ HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new
HashMap<>();
+ for (HoodieInstant commit: commitsToCheck) {
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+ HoodieCommitMetadata.class);
+ for (Map.Entry<String, List<HoodieWriteStat>> entry:
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+ partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+ }
+ for (HoodieWriteStat stat : entry.getValue()) {
+ String relativeFilePath = stat.getPath();
+ Path fullPath = relativeFilePath != null ?
FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+ if (fullPath != null) {
+ //TODO Should the length of file be totalWriteBytes or
fileSizeInBytes?
+ FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false,
0, 0,
+ 0, fullPath);
+ partitionToFileStatusesMap.get(entry.getKey()).add(fs);
Review comment:
Need to handle duplicate log files here (the same log file can appear in multiple delta commits).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]