the-other-tim-brown commented on code in PR #13361:
URL: https://github.com/apache/hudi/pull/13361#discussion_r2116138598
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -84,51 +93,50 @@ protected HoodieWriteConfig getWriteConfig() {
return this.writeConfig;
}
- protected ClosableIterator<HoodieRecord<T>>
getRecordIteratorWithLogFiles(ClusteringOperation operation, String
instantTime, long maxMemory,
-
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader>
baseFileReaderOpt) {
+ protected ClosableIterator<HoodieRecord<T>>
getRecordIterator(ReaderContextFactory<T> readerContextFactory,
ClusteringOperation operation, String instantTime, long maxMemory) {
HoodieWriteConfig config = getWriteConfig();
+ TypedProperties props = TypedProperties.copy(config.getProps());
+ props.setProperty(MAX_MEMORY_FOR_MERGE.key(), Long.toString(maxMemory));
+
HoodieTable table = getHoodieTable();
- HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
- HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(table.getStorage())
- .withBasePath(table.getMetaClient().getBasePath())
- .withLogFilePaths(operation.getDeltaFilePaths())
- .withReaderSchema(readerSchemaWithMetaFields)
- .withLatestInstantTime(instantTime)
- .withMaxMemorySizeInBytes(maxMemory)
- .withReverseReader(config.getCompactionReverseLogReadEnabled())
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withSpillableMapBasePath(config.getSpillableMapBasePath())
- .withPartition(operation.getPartitionPath())
- .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
- .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
- .withRecordMerger(config.getRecordMerger())
- .withTableMetaClient(table.getMetaClient())
- .build();
+ FileSlice fileSlice =
clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(),
operation);
+ final boolean usePosition =
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+ Option<InternalSchema> internalSchema =
SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
try {
- return new HoodieFileSliceReader(baseFileReaderOpt, scanner,
readerSchemaWithMetaFields, tableConfig.getPreCombineField(),
config.getRecordMerger(),
- tableConfig.getProps(),
- tableConfig.populateMetaFields() ? Option.empty() :
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
- tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
+ return getFileGroupReader(table.getMetaClient(), fileSlice,
readerSchemaWithMetaFields, internalSchema, readerContextFactory, instantTime,
usePosition).getClosableHoodieRecordIterator();
} catch (IOException e) {
throw new HoodieClusteringException("Error reading file slices", e);
}
}
- protected ClosableIterator<HoodieRecord<T>>
getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt,
HoodieFileReader baseFileReader) {
- // NOTE: Record have to be cloned here to make sure if it holds low-level
engine-specific
- // payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
- // it since these records will be shuffled later.
- ClosableIterator<HoodieRecord> baseRecordsIterator;
- try {
- baseRecordsIterator =
baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
- } catch (IOException e) {
- throw new HoodieClusteringException("Error reading base file", e);
- }
- return new CloseableMappingIterator(
- baseRecordsIterator,
- rec -> ((HoodieRecord)
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields,
writeConfig.getProps(), keyGeneratorOpt));
+ /**
+ * Construct FileSlice from a given clustering operation {@code
clusteringOperation}.
+ */
+ protected FileSlice clusteringOperationToFileSlice(String basePath,
ClusteringOperation clusteringOperation) {
+ String partitionPath = clusteringOperation.getPartitionPath();
+ boolean baseFileExists =
!StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath());
+ HoodieBaseFile baseFile = baseFileExists ? new HoodieBaseFile(new
StoragePath(basePath, clusteringOperation.getDataFilePath()).toString()) : null;
+ List<HoodieLogFile> logFiles =
clusteringOperation.getDeltaFilePaths().stream().map(p ->
+ new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
+ basePath, partitionPath), p)))
+ .sorted(new HoodieLogFile.LogFileComparator())
+ .collect(Collectors.toList());
+
+ ValidationUtils.checkState(baseFileExists || !logFiles.isEmpty(), "Both
base file and log files are missing from this clustering operation " +
clusteringOperation);
+ String baseInstantTime = baseFileExists ? baseFile.getCommitTime() :
logFiles.get(0).getDeltaCommitTime();
+ FileSlice fileSlice = new FileSlice(partitionPath, baseInstantTime,
clusteringOperation.getFileId());
+ fileSlice.setBaseFile(baseFile);
Review Comment:
Previously the FileGroupReader path was gated by a check for bootstrap base
files. Now I have removed that check since it is no longer required.
There is at least one existing test that will cover this
[here](https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala#L338)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]