codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1905155911
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -774,65 +776,93 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
@VisibleForTesting
public static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
-
HoodieCommitMetadata commitMetadata,
-
HoodieMetadataConfig metadataConfig,
-
HoodieTableMetaClient dataTableMetaClient,
- int
writesFileIdEncoding,
- String
instantTime) {
-
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+
int writesFileIdEncoding,
+
String instantTime,
+
EngineType engineType) {
List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());
-
+ // Return early if there are no write stats, or if the operation is a
compaction.
if (allWriteStats.isEmpty() || commitMetadata.getOperationType() ==
WriteOperationType.COMPACT) {
return engineContext.emptyHoodieData();
}
+ // RLI cannot support logs having inserts with current offering. So, lets
validate that.
+ if (allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+ })) {
+ throw new HoodieIOException("RLI cannot support logs having inserts with
current offering. Would recommend disabling Record Level Index");
+ }
try {
- int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ Map<String, List<HoodieWriteStat>> writeStatsByFileId =
allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+ int parallelism = Math.max(Math.min(writeStatsByFileId.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
String basePath = dataTableMetaClient.getBasePath().toString();
HoodieFileFormat baseFileFormat =
dataTableMetaClient.getTableConfig().getBaseFileFormat();
- // RLI cannot support logs having inserts with current offering. So,
lets validate that.
- if (allWriteStats.stream().anyMatch(writeStat -> {
- String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
- return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
- })) {
- throw new HoodieIOException("RLI cannot support logs having inserts
with current offering. Would recommend disabling Record Level Index");
- }
-
- // we might need to set some additional variables if we need to process
log files.
- // for RLI and MOR table, we only care about log files if they contain
any deletes. If not, all entries in logs are considered as updates, for which
- // we do not need to generate new RLI record.
- boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
- String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
- return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
- });
-
- Option<Schema> writerSchemaOpt = Option.empty();
- if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
- writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
- }
- int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> writerSchemaOpt =
tryResolveSchemaForTable(dataTableMetaClient);
Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
- HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
- .flatMap(writeStat -> {
- HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
- StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
- // handle base files
- if
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
- return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
- } else if (FSUtils.isLogFile(fullFilePath)) {
- // for logs, we only need to process log files containing deletes
- if (writeStat.getNumDeletes() > 0) {
- Set<String> deletedRecordKeys =
getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
- finalWriterSchemaOpt, maxBufferSize, instantTime, false,
true);
- return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
- }
- // ignore log file data blocks.
- return new ArrayList<HoodieRecord>().iterator();
- } else {
- throw new HoodieIOException("Unsupported file type " +
fullFilePath.toString() + " while generating MDT records");
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()),
parallelism)
+ .flatMap(writeStatsByFileIdEntry -> {
+ String fileId = writeStatsByFileIdEntry.getKey();
+ List<HoodieWriteStat> writeStats =
writeStatsByFileIdEntry.getValue();
+ // Partition the write stats into base file and log file write
stats
+ List<HoodieWriteStat> baseFileWriteStats = writeStats.stream()
+ .filter(writeStat ->
writeStat.getPath().endsWith(baseFileFormat.getFileExtension()))
+ .collect(Collectors.toList());
+ List<HoodieWriteStat> logFileWriteStats = writeStats.stream()
+ .filter(writeStat -> FSUtils.isLogFile(new
StoragePath(writeStats.get(0).getPath())))
+ .collect(Collectors.toList());
+ // Ensure that only one of base file or log file write stats exists
+ checkState(baseFileWriteStats.isEmpty() ||
logFileWriteStats.isEmpty(),
+ "A single fileId cannot have both base file and log file write
stats in the same commit. FileId: " + fileId);
+ // Process base file write stats
+ if (!baseFileWriteStats.isEmpty()) {
+ return baseFileWriteStats.stream()
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ return
CollectionUtils.toStream(BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage));
+ })
+ .iterator();
}
+ // Process log file write stats
+ if (!logFileWriteStats.isEmpty()) {
+ String partitionPath =
logFileWriteStats.get(0).getPartitionPath();
+ List<String> currentLogFilePaths = logFileWriteStats.stream()
+ .map(writeStat -> new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()).toString())
+ .collect(Collectors.toList());
+ List<String> allLogFilePaths = logFileWriteStats.stream()
+ .flatMap(writeStat -> {
+ checkState(writeStat instanceof HoodieDeltaWriteStat, "Log
file should be associated with a delta write stat");
+ List<String> currentLogFiles = ((HoodieDeltaWriteStat)
writeStat).getLogFiles().stream()
+ .map(logFile -> new StoragePath(new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()),
logFile).toString())
+ .collect(Collectors.toList());
+ return currentLogFiles.stream();
+ })
+ .collect(Collectors.toList());
+ // Extract revived and deleted keys
+ Pair<Set<String>, Set<String>> revivedAndDeletedKeys =
+ getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient,
instantTime, engineType, allLogFilePaths, finalWriterSchemaOpt,
currentLogFilePaths);
Review Comment:
   Yeah, but I kept it modular so that it's easy to test with just the key set.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]