danny0405 commented on a change in pull request #3122:
URL: https://github.com/apache/hudi/pull/3122#discussion_r664272560
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -102,28 +105,37 @@
HoodieTimeline.ROLLBACK_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
latestFileSlices.forEach(fileSlice -> {
+ List<String> logFilePaths =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile ->
logFile.getPath().toString()).collect(Collectors.toList());
List<FileSplit> dataFileSplits =
groupedInputSplits.get(fileSlice.getFileId());
- dataFileSplits.forEach(split -> {
- try {
- List<String> logFilePaths =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile ->
logFile.getPath().toString()).collect(Collectors.toList());
- if (split instanceof BootstrapBaseFileSplit) {
- BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
- String[] hosts = split.getLocationInfo() != null ?
Arrays.stream(split.getLocationInfo())
- .filter(x -> !x.isInMemory()).toArray(String[]::new) : new
String[0];
- String[] inMemoryHosts = split.getLocationInfo() != null ?
Arrays.stream(split.getLocationInfo())
-
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
- FileSplit baseSplit = new FileSplit(eSplit.getPath(),
eSplit.getStart(), eSplit.getLength(),
- hosts, inMemoryHosts);
- rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit,
metaClient.getBasePath(),
- logFilePaths, maxCommitTime,
eSplit.getBootstrapFileSplit()));
- } else {
- rtSplits.add(new HoodieRealtimeFileSplit(split,
metaClient.getBasePath(), logFilePaths, maxCommitTime));
+ if (dataFileSplits != null) {
+ dataFileSplits.forEach(split -> {
+ try {
+ if (split instanceof BootstrapBaseFileSplit) {
+ BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit)
split;
+ String[] hosts = split.getLocationInfo() != null ?
Arrays.stream(split.getLocationInfo())
+ .filter(x -> !x.isInMemory()).toArray(String[]::new) :
new String[0];
+ String[] inMemoryHosts = split.getLocationInfo() != null ?
Arrays.stream(split.getLocationInfo())
+
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
+ FileSplit baseSplit = new FileSplit(eSplit.getPath(),
eSplit.getStart(), eSplit.getLength(),
+ hosts, inMemoryHosts);
+ rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit,
metaClient.getBasePath(),
+ logFilePaths, maxCommitTime,
eSplit.getBootstrapFileSplit()));
+ } else {
+ rtSplits.add(new HoodieRealtimeFileSplit(split,
metaClient.getBasePath(), logFilePaths, maxCommitTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error creating hoodie real time
split ", e);
}
+ });
+ } else {
+ // the file group has only logs (say the index is global).
+ try {
+ rtSplits.add(new
HoodieRealtimeFileSplit(DummyInputSplit.INSTANCE, metaClient.getBasePath(),
logFilePaths, maxCommitTime));
Review comment:
Personally, I want to fix the NPE first; reading the file groups with
parquets is better than a thrown exception. If you think reading the logs-only
file group adds quite a bit of complexity, we can avoid that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]