the-other-tim-brown commented on code in PR #13602:
URL: https://github.com/apache/hudi/pull/13602#discussion_r2229919982
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -152,64 +143,31 @@ private HoodieFileGroupReader(HoodieReaderContext<T>
readerContext, HoodieStorag
return Option.of(preCombineField);
});
this.readStats = new HoodieReadStats();
- this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
- readerContext.getMergeMode(), tableConfig.getPartialUpdateMode(),
props,
- isSkipMerge, shouldUseRecordPosition, readStats, emitDelete,
sortOutput);
- this.allowInflightInstants = allowInflightInstants;
- }
-
- /**
- * Initialize correct record buffer
- */
- private FileGroupRecordBuffer<T> getRecordBuffer(HoodieReaderContext<T>
readerContext,
- HoodieTableMetaClient
hoodieTableMetaClient,
- RecordMergeMode
recordMergeMode,
- PartialUpdateMode
partialUpdateMode,
- TypedProperties props,
- boolean isSkipMerge,
- boolean
shouldUseRecordPosition,
- HoodieReadStats readStats,
- boolean emitDelete,
- boolean sortOutput) {
- if (inputSplit.logFiles.isEmpty()) {
- return null;
- }
- UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, emitDelete, fileGroupUpdateCallback);
- if (isSkipMerge) {
- return new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, readStats);
- } else if (sortOutput) {
- return new SortedKeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, orderingFieldName, updateProcessor);
- } else if (shouldUseRecordPosition &&
inputSplit.baseFileOption.isPresent()) {
- return new PositionBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, inputSplit.baseFileOption.get().getCommitTime(), props,
orderingFieldName, updateProcessor);
- } else {
- return new KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, orderingFieldName, updateProcessor);
- }
}
/**
* Initialize internal iterators on the base and log files.
*/
private void initRecordIterators() throws IOException {
ClosableIterator<T> iter = makeBaseFileIterator();
- if (inputSplit.logFiles.isEmpty()) {
+ if (inputSplit.getLogFiles().isEmpty()) {
this.baseFileIterator = new CloseableMappingIterator<>(iter,
readerContext::seal);
} else {
this.baseFileIterator = iter;
- scanLogFiles();
+ Pair<HoodieFileGroupRecordBuffer<T>, List<String>> intializationResult =
recordBufferInitializer.getRecordBuffer(
Review Comment:
thanks, updating this in my latest commit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]