danny0405 commented on code in PR #12984:
URL: https://github.com/apache/hudi/pull/12984#discussion_r2006678961
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -903,6 +875,166 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
     }
   }
+  private static Iterator<HoodieRecord> getRecordIndexRecordsForFileSliceWithLogFiles(HoodieTableMetaClient dataTableMetaClient, String partitionPath, String fileId,
+                                                                                      HoodieTableFileSystemView fsView, List<HoodieWriteStat> logFileWriteStats,
+                                                                                      String basePath, EngineType engineType, String instantTime, int writesFileIdEncoding,
+                                                                                      Option<Schema> finalWriterSchemaOpt) throws Exception {
+    if (dataTableMetaClient.getTableConfig().getRecordMergeMode().equals(RecordMergeMode.COMMIT_TIME_ORDERING)) {
+      // With commit-time ordering, we don't need to merge log files with base files and hence can compute the record index records in an optimized manner.
+      return getRecordIndexRecordsForLogFilesWithCommitTimeOrdering(dataTableMetaClient, partitionPath, fileId, logFileWriteStats, engineType, instantTime,
+          writesFileIdEncoding, finalWriterSchemaOpt);
+    } else {
+      List<HoodieRecord> allRecords = new ArrayList<>();
+      List<FileSlice> previousFileSliceForFileIdList = fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, instantTime)
+          .filter(fileSlice -> fileSlice.getFileId().equals(fileId)).collect(toList());
+      if (previousFileSliceForFileIdList.size() > 1) {
+        throw new HoodieException("Found more than one file slice for fileId " + fileId + " in partition " + partitionPath);
+      }
+      if (previousFileSliceForFileIdList.isEmpty()) {
+        throw new HoodieException("Cannot find the file slice for fileId " + fileId + " in partition " + partitionPath + " in the FileSystemView.");
+      }
+      FileSlice previousFileSliceForFileId = previousFileSliceForFileIdList.get(0);
+      FileSlice latestFileSlicesIncludingInflight = new FileSlice(previousFileSliceForFileId);
+      logFileWriteStats.forEach(logFileWriteStat ->
+          latestFileSlicesIncludingInflight.addLogFile(new HoodieLogFile(new StoragePath(basePath + "/" + logFileWriteStat.getPath()))));
+
+      Set<String> prevSliceRecordKeys;
+      if (!previousFileSliceForFileId.hasLogFiles()) {
+        // If the previous slice contains only a base file, read the base file directly, projecting just the record key, instead of going via the file slice or file group reader.
+        prevSliceRecordKeys = new HashSet<>(RecordIndexUtils.getRecordKeyStatuses(dataTableMetaClient.getBasePath().toString(), partitionPath,
+            previousFileSliceForFileId.getBaseFile().get().getFileName(), null, dataTableMetaClient.getStorage(), Collections.singleton(RecordIndexUtils.RecordStatus.INSERT))
+            .get(RecordIndexUtils.RecordStatus.INSERT));
+      } else {
+        prevSliceRecordKeys = getValidRecordKeysForFileSlice(dataTableMetaClient, engineType,
+            previousFileSliceForFileId.getLogFiles().map(entry -> entry.getPath().toString()).collect(toList()),
+            tryResolveSchemaForTable(dataTableMetaClient), partitionPath,
+            previousFileSliceForFileId.getBaseFile().map(entry -> new StoragePath(entry.getPath())), instantTime);
+      }
+
+      Set<String> latestSliceRecordKeys = getValidRecordKeysForFileSlice(dataTableMetaClient, engineType,
Review Comment:
With this patch, even for MOR tables, log write performance would degrade because of this `COW`-style base-and-delta diff query. Let's do some micro-benchmarking.
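
To make the cost concrete, here is a minimal sketch of the diff as I read the patch; `readAllRecordKeys`, `previousFileSlice` and `latestSliceWithNewLogs` are hypothetical stand-ins for the key-extraction helpers in the hunk above:

```java
import java.util.HashSet;
import java.util.Set;

// Per file slice, per log write: the patch scans BOTH the previous merged
// slice and the latest slice, then diffs the two key sets.
Set<String> prevKeys = readAllRecordKeys(previousFileSlice);        // full scan of base + logs
Set<String> latestKeys = readAllRecordKeys(latestSliceWithNewLogs); // full scan again
Set<String> inserts = new HashSet<>(latestKeys);
inserts.removeAll(prevKeys);    // keys only in the latest slice -> new RLI entries
Set<String> deletes = new HashSet<>(prevKeys);
deletes.removeAll(latestKeys);  // keys only in the previous slice -> RLI deletes
```

So the write amplification grows with the total key count of the file group, not with the size of the incoming log file.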
BTW, can we introduce event-time merging for the RLI records, e.g. use the ordering value from the data record as the ordering value of the corresponding RLI record?
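
Something along these lines, as a rough sketch; the `orderingVal` parameter and the `rliUpdateWithOrderingValue` helper are hypothetical, since as far as I can tell `HoodieMetadataPayload.createRecordIndexUpdate` carries no ordering value today:

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.metadata.HoodieMetadataPayload;

// Hypothetical helper: build the RLI update and carry the data record's
// ordering value, so that when two RLI updates for the same key collide,
// the one with the larger event time wins (EVENT_TIME_ORDERING semantics).
static HoodieRecord<HoodieMetadataPayload> rliUpdateWithOrderingValue(
    String recordKey, String partitionPath, String fileId, String instantTime,
    int fileIdEncoding, Comparable<?> orderingVal) {
  HoodieRecord<HoodieMetadataPayload> rliRecord =
      HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId, instantTime, fileIdEncoding);
  // The RLI payload has no ordering field today; HoodieMetadataPayload (and its
  // Avro schema) would need to be extended to persist orderingVal for the merger.
  return rliRecord;
}
```

With that in place, the RLI update for a key could be emitted straight from the log write without the full base-and-delta diff above.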