nsivabalan commented on code in PR #12269:
URL: https://github.com/apache/hudi/pull/12269#discussion_r1849264418
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,77 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeleteBlocks =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() == 0
&& writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0;
+ });
+ Option<String> latestCommitTimestamp = Option.empty();
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeleteBlocks) { // if we have a log file w/ pure
deletes.
+ latestCommitTimestamp =
Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime());
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ Option<String> finalLatestCommitTimestamp = latestCommitTimestamp;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat,
writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process delete blocks for RLI
+ if (writeStat.getNumInserts() == 0 &&
writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) {
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" +
writeStat.getPath(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize,
finalLatestCommitTimestamp.get());
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey,
HoodieRecord>) t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
Review Comment:
Nope. This means there is a bug in RLI itself or in some other code path.
Essentially, in global index use cases we can't have duplicates. For example,
even if we disable RLI and fall back to the GLOBAL_SIMPLE index, we should
ensure there are no duplicates.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]