alexeykudinkin commented on a change in pull request #4559:
URL: https://github.com/apache/hudi/pull/4559#discussion_r799894937
##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -65,12 +67,32 @@
  * <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
  * <li>External mode: reading non-Hudi partitions</li>
  * </ul>
+ *
+ * NOTE: This class is invariant of the underlying file-format of the files being read
  */
 public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
     implements Configurable {

   protected Configuration conf;

+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream<HoodieLogFile> logFiles) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
Review comment:
Correct
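The hunk above sorts the delta log files with `HoodieLogFile.getLogFileComparator()` before attaching them to the `RealtimeFileStatus`. A minimal, self-contained sketch of that sort-then-attach ordering concern (the `LogFile` class and its comparator here are simplified stand-ins; Hudi's real comparator considers additional fields such as the base commit time):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LogFileSortSketch {

  // Hypothetical stand-in for HoodieLogFile
  static final class LogFile {
    final String path;
    final int version;
    LogFile(String path, int version) { this.path = path; this.version = version; }
  }

  // Simplified analogue of HoodieLogFile.getLogFileComparator():
  // order by log version, tie-break on path for determinism
  static final Comparator<LogFile> LOG_FILE_COMPARATOR =
      Comparator.<LogFile>comparingInt(f -> f.version).thenComparing(f -> f.path);

  // Mirrors the diff: sort the stream once, up front, before the
  // sorted list is handed to the composite file status
  static List<LogFile> sortLogFiles(Stream<LogFile> logFiles) {
    return logFiles.sorted(LOG_FILE_COMPARATOR).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<LogFile> sorted = sortLogFiles(Stream.of(
        new LogFile("f.log.3", 3), new LogFile("f.log.1", 1), new LogFile("f.log.2", 2)));
    for (LogFile f : sorted) {
      System.out.println(f.path);
    }
  }
}
```

Sorting eagerly here means every downstream consumer of `setDeltaLogFiles` sees a deterministic log-file order without re-sorting.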
##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
##########
@@ -38,32 +36,18 @@
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
/**
* HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base
file format.
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
-public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
+public class HoodieHFileRealtimeInputFormat extends
HoodieRealtimeFileInputFormatBase {
private static final Logger LOG =
LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
- List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits))
- .map(is -> (FileSplit) is)
- .collect(Collectors.toList());
- return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
- }
-
- @Override
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline
timeline) {
- // no specific filtering for Realtime format
- return timeline;
- }
+ // NOTE: We're only using {@code HoodieHFileInputFormat} to compose {@code
RecordReader}
+ private final HoodieHFileInputFormat hFileInputFormat = new
HoodieHFileInputFormat();
Review comment:
In principle, COW/MOR handling should be invariant of the file format used underneath.
Good catch w/ `isSplitable`
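The hunk switches from inheriting the HFile input format to composing it: the realtime base class keeps the format-agnostic split planning, while the HFile-specific class is used only to construct readers. A minimal sketch of that separation, with hypothetical stand-in names (`HFileFormat`, `RealtimeFormat`) rather than the real Hudi classes:

```java
import java.util.Arrays;
import java.util.List;

public class CompositionSketch {

  interface RecordReader {
    String next();
  }

  // Format-specific concern: building a RecordReader
  // (stands in for HoodieHFileInputFormat)
  static class HFileFormat {
    RecordReader createReader(String split) {
      return () -> "hfile-record@" + split;
    }
  }

  // Format-agnostic concern: split planning
  // (stands in for HoodieRealtimeFileInputFormatBase)
  static class RealtimeFormat {
    // Composed rather than inherited: used only to build readers,
    // so COW/MOR split logic never depends on the base file format
    private final HFileFormat hFileFormat = new HFileFormat();

    List<String> getSplits() {
      return Arrays.asList("split-0", "split-1");
    }

    RecordReader getReader(String split) {
      return hFileFormat.createReader(split);
    }
  }

  public static void main(String[] args) {
    RealtimeFormat fmt = new RealtimeFormat();
    System.out.println(fmt.getReader(fmt.getSplits().get(0)).next());
  }
}
```

Swapping `HFileFormat` for a Parquet-backed equivalent would leave `getSplits` untouched, which is the invariance the review comment is after.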
##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -172,35 +198,25 @@ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
     return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }

-  protected abstract boolean includeLogFilesForSnapshotView();
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length,
+                                String[] hosts, String[] inMemoryHosts) {
+    FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
+    if (file instanceof PathWithBootstrapFileStatus) {
Review comment:
This is used only for COW
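The `makeSplit` override above builds a plain `FileSplit` and then checks whether the path carries bootstrap metadata (`PathWithBootstrapFileStatus`) so the split can be wrapped for downstream stitching with the bootstrap base file. A self-contained sketch of that detect-and-wrap pattern, using simplified stand-in classes rather than the real Hadoop/Hudi types:

```java
public class SplitWrapSketch {

  // Simplified stand-in for Hadoop's FileSplit
  static class FileSplit {
    final String path;
    final long start;
    final long length;
    FileSplit(String path, long start, long length) {
      this.path = path;
      this.start = start;
      this.length = length;
    }
  }

  // Stands in for PathWithBootstrapFileStatus: a path that
  // knows the bootstrap base file it maps onto
  static class BootstrapPath {
    final String path;
    final String bootstrapBase;
    BootstrapPath(String path, String bootstrapBase) {
      this.path = path;
      this.bootstrapBase = bootstrapBase;
    }
  }

  // Wrapped split carrying the extra bootstrap reference
  static class BootstrapBaseFileSplit extends FileSplit {
    final String bootstrapBase;
    BootstrapBaseFileSplit(FileSplit inner, String bootstrapBase) {
      super(inner.path, inner.start, inner.length);
      this.bootstrapBase = bootstrapBase;
    }
  }

  // Mirrors the overridden makeSplit: build the plain split,
  // then wrap it only when the path is the bootstrap marker type
  static FileSplit makeSplit(Object file, long start, long length) {
    FileSplit split = new FileSplit(String.valueOf(file), start, length);
    if (file instanceof BootstrapPath) {
      BootstrapPath bp = (BootstrapPath) file;
      return new BootstrapBaseFileSplit(new FileSplit(bp.path, start, length), bp.bootstrapBase);
    }
    return split;
  }

  public static void main(String[] args) {
    FileSplit s = makeSplit(new BootstrapPath("cow/f1.parquet", "bootstrap/base1"), 0L, 128L);
    System.out.println(s instanceof BootstrapBaseFileSplit);
  }
}
```

Since the comment notes this path is only exercised for COW tables, the wrap stays in the COW-side split planning and MOR splits go through the realtime path instead.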