danny0405 commented on code in PR #12313:
URL: https://github.com/apache/hudi/pull/12313#discussion_r1853159685
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,120 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey,
HoodieRecord>) t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for
metadata table", e);
+ }
+ }
+
+ static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName);
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFiles) {
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat,
storage).iterator();
+ } else {
+ // for logs, every entry is either an update or a delete
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ return getRecordKeys(fullFilePath.toString(),
dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize,
instantTime).iterator();
+ }
+ }).collectAsList();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for
metadata table", e);
Review Comment:
   The error message is incorrect: it says "Failed to generate column stats records", but this method fetches deleted/updated record keys for the record index (the same copy-pasted message also appears in `convertMetadataToRecordIndexRecords` above). Please update both messages to describe the RLI operation that actually failed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]