codope commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1616259478
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf);
}
+  /**
+   * Collects the secondary-index updates produced by this commit into {@code partitionToRecordMap},
+   * once for every metadata partition in the table config whose name starts with
+   * {@code PARTITION_NAME_SECONDARY_INDEX_PREFIX}.
+   *
+   * <p>Every secondary index partition is stored under the same {@code SECONDARY_INDEX} map key,
+   * so each partition's records are unioned into the existing entry rather than replacing it.
+   * A plain {@code put} would keep only the last secondary index partition's updates.
+   * NOTE(review): this assumes each record already carries its target secondary-index partition
+   * so that the combined data can be routed correctly downstream — confirm.
+   *
+   * @param commitMetadata       metadata of the commit being applied
+   * @param partitionToRecordMap metadata partition type to records to write; mutated in place
+   * @param writeStatus          write statuses of the commit, passed through to
+   *                             {@code getSecondaryIndexUpdates}
+   * @throws HoodieMetadataException if computing the updates for any secondary index partition fails
+   */
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
+      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap,
+      HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
+          }
+          // Accumulate rather than overwrite: all secondary index partitions share this one map key.
+          partitionToRecordMap.merge(SECONDARY_INDEX, secondaryIndexRecords,
+              (existing, incoming) -> existing.union(incoming));
+        });
+  }
+
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+ // Build a list of basefiles+delta-log-files for every partition that this
commit touches
+ // {
+ // {
+ // "partition1", { {"baseFile11", {"logFile11", "logFile12"}},
{"baseFile12", {"logFile11"} } },
+ // },
+ // {
+ // "partition2", { {"baseFile21", {"logFile21", "logFile22"}},
{"baseFile22", {"logFile21"} } }
+ // }
+ // }
+ List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new
ArrayList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((dataPartition,
writeStats) -> {
+ writeStats.forEach(writeStat -> {
+ if (writeStat instanceof HoodieDeltaWriteStat) {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(),
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+ } else {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(writeStat.getPath(), new ArrayList<>())));
+ }
+ });
+ });
+
+ // Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
+ // the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
+ // operation is going to be expensive (in CPU, memory and IO)
+ List<String> keysToRemove = new ArrayList<>();
+ writeStatus.collectAsList().forEach(status -> {
+ status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+ // Consider those keys which were either updated or deleted in this
commit
+ if (!recordDelegate.getNewLocation().isPresent() ||
(recordDelegate.getCurrentLocation().isPresent() &&
recordDelegate.getNewLocation().isPresent())) {
+ keysToRemove.add(recordDelegate.getRecordKey());
+ }
+ });
+ });
+ HoodieFunctionalIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexPartition);
+
+ // Fetch the secondary keys that each of the record keys ('keysToRemove')
maps to
+ // This is obtained by scanning the entire secondary index partition in
the metadata table
+ // This could be an expensive operation for a large commit
(updating/deleting millions of rows)
+ Map<String, String> recordKeySecondaryKeyMap =
metadata.getSecondaryKeys(keysToRemove);
+ HoodieData<HoodieRecord> deletedRecords =
getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap,
indexDefinition);
+
+ // Reuse record index parallelism config to build secondary index
+ int parallelism = Math.min(partitionFilePairs.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org