codope commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1616259478


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of base-file + delta-log-file pairs for every partition that this commit touches:
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"}} }
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"}} }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit that deletes or updates many records, this
+    // operation can be expensive (in CPU, memory and IO).
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {
+      status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+        // Consider only those keys that were either updated or deleted in this commit
+        if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) {
+          keysToRemove.add(recordDelegate.getRecordKey());
+        }
+      });
+    });
+    HoodieFunctionalIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition);
+
+    // Fetch the secondary key that each record key in 'keysToRemove' maps to.
+    // This is obtained by scanning the entire secondary index partition in the metadata table,
+    // which can be expensive for a large commit (updating/deleting millions of rows).
+    Map<String, String> recordKeySecondaryKeyMap = metadata.getSecondaryKeys(keysToRemove);
+    HoodieData<HoodieRecord> deletedRecords = getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap, indexDefinition);
+
+    // Reuse the record index parallelism config to build the secondary index
+    int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
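
For readers skimming the diff: the update/delete classification above hinges on the two locations carried by each record delegate (no new location means the key was deleted; both locations present means it was updated; a new location only means a fresh insert, which is skipped). Below is a minimal, self-contained sketch of that rule with hypothetical stand-in types (plain `Optional` fields instead of Hudi's `HoodieRecordDelegate`); it is an illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for HoodieRecordDelegate: a key plus its two locations.
final class RecordDelegate {
  final String recordKey;
  final Optional<String> currentLocation; // file group the key lived in before this commit
  final Optional<String> newLocation;     // file group the key lives in after this commit

  RecordDelegate(String recordKey, Optional<String> current, Optional<String> updated) {
    this.recordKey = recordKey;
    this.currentLocation = current;
    this.newLocation = updated;
  }
}

public class SecondaryIndexKeyFilter {
  // Same rule as the diff: a key needs its old secondary index entry removed when it was
  // deleted (no new location) or updated (both locations present). Pure inserts are skipped.
  static List<String> keysToRemove(List<RecordDelegate> delegates) {
    List<String> keys = new ArrayList<>();
    for (RecordDelegate d : delegates) {
      boolean deleted = !d.newLocation.isPresent();
      boolean updated = d.currentLocation.isPresent() && d.newLocation.isPresent();
      if (deleted || updated) {
        keys.add(d.recordKey);
      }
    }
    return keys;
  }

  public static void main(String[] args) {
    List<RecordDelegate> delegates = Arrays.asList(
        new RecordDelegate("k1", Optional.of("fg-1"), Optional.empty()),    // delete
        new RecordDelegate("k2", Optional.of("fg-1"), Optional.of("fg-2")), // update
        new RecordDelegate("k3", Optional.empty(), Optional.of("fg-3")));   // insert
    System.out.println(keysToRemove(delegates)); // prints [k1, k2]
  }
}
```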

Review Comment:
   done
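
As an aside on the delete-mapping step above (the reverse lookup from record keys to secondary keys): here is a small self-contained sketch of the idea. The `DeleteMarker` type and the in-memory map are hypothetical; in the PR this is `metadata.getSecondaryKeys(...)` followed by `getDeletedSecondaryRecordMapping(...)`, which operates on the metadata table rather than a plain `Map`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SecondaryIndexDeleteSketch {
  // Hypothetical delete marker: the (secondaryKey, recordKey) pair whose
  // secondary index entry must be tombstoned.
  static final class DeleteMarker {
    final String secondaryKey;
    final String recordKey;

    DeleteMarker(String secondaryKey, String recordKey) {
      this.secondaryKey = secondaryKey;
      this.recordKey = recordKey;
    }

    @Override
    public String toString() {
      return secondaryKey + " -> delete(" + recordKey + ")";
    }
  }

  // Look up the secondary key for every record key being removed and emit a delete
  // marker for each mapping found. Keys with no secondary index entry are skipped.
  static List<DeleteMarker> deleteMarkers(List<String> keysToRemove,
                                          Map<String, String> recordKeyToSecondaryKey) {
    List<DeleteMarker> markers = new ArrayList<>();
    for (String recordKey : keysToRemove) {
      String secondaryKey = recordKeyToSecondaryKey.get(recordKey);
      if (secondaryKey != null) {
        markers.add(new DeleteMarker(secondaryKey, recordKey));
      }
    }
    return markers;
  }

  public static void main(String[] args) {
    Map<String, String> index = new HashMap<>();
    index.put("k1", "city=SFO");
    index.put("k2", "city=NYC");
    System.out.println(deleteMarkers(Arrays.asList("k1", "k2", "k3"), index));
    // prints [city=SFO -> delete(k1), city=NYC -> delete(k2)]
  }
}
```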


