codope commented on code in PR #11162:
URL: https://github.com/apache/hudi/pull/11162#discussion_r1624769566
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -851,6 +852,158 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
return recordKeyMap;
}
+ @Override
+ protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getSecondaryIndexRecords(List<String> keys, String partitionName) {
+ if (keys.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result = new
HashMap<>();
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+
+ // Lookup keys from each file slice
+ // TODO: parallelize this loop
+ for (FileSlice partition : partitionFileSlices) {
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys,
partition);
+
+ currentFileSliceResult.forEach((secondaryKey, secondaryRecords) -> {
+ result.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords)
-> {
+ newRecords.addAll(oldRecords);
+ return newRecords;
+ });
+ });
+ }
+
+ return result;
+ }
+
+ /**
+ * Lookup list of keys from a single file slice.
+ *
+ * @param partitionName Name of the partition
+ * @param secondaryKeys The list of secondary keys to lookup
+ * @param fileSlice The file slice to read
+ * @return A {@code Map} of secondary-key to list of {@code HoodieRecord}
for the secondary-keys which were found in the file slice
+ */
+ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
lookupSecondaryKeysFromFileSlice(String partitionName, List<String>
secondaryKeys, FileSlice fileSlice) {
+ Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
getOrCreateReaders(partitionName, fileSlice);
+ try {
+ List<Long> timings = new ArrayList<>(1);
+ HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+ HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+ if (baseFileReader == null && logRecordScanner == null) {
+ return Collections.emptyMap();
+ }
+
+ // Sort it here once so that we don't need to sort individually for base
file and for each individual log files.
+ Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+ List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+ Collections.sort(sortedSecondaryKeys);
Review Comment:
Good point! I have now parallelized the lookup through the engineContext, so this sorting is limited to a single partition of data, which should not spill to disk. Even if it does, Spark will handle it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]