codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1896422413


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -801,28 +801,47 @@ public int getNumFileGroupsForPartition(MetadataPartitionType partition) {
   }
 
   @Override
-  protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordKeys, String partitionName) {
+  protected HoodiePairData<String, String> getSecondaryKeysForRecordKeys(HoodieData<String> recordKeys, String partitionName, int batchSize) {
     if (recordKeys.isEmpty()) {
-      return Collections.emptyMap();
+      return getEngineContext().emptyHoodiePairData();
     }
 
     // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys.
     List<FileSlice> partitionFileSlices =
         partitionFileSliceMap.computeIfAbsent(partitionName, k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName));
     if (partitionFileSlices.isEmpty()) {
-      return Collections.emptyMap();
+      return getEngineContext().emptyHoodiePairData();
     }
 
-    // Parallel lookup keys from each file slice
-    Map<String, String> reverseSecondaryKeyMap = new HashMap<>(recordKeys.size());
-    getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Lookup secondary keys from metadata table partition " + partitionName);
-    List<Pair<String, String>> secondaryToRecordKeyPairList = getEngineContext().flatMap(partitionFileSlices,
-        (SerializableFunction<FileSlice, Stream<Pair<String, String>>>) v1 -> reverseLookupSecondaryKeys(partitionName, recordKeys, v1)
-            .entrySet().stream()
-            .map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()).stream(), partitionFileSlices.size());
+    // Step 1: Batch record keys
+    HoodieData<List<String>> batchedRecordKeys = recordKeys.mapPartitions(iter -> {
+      List<List<String>> batches = new ArrayList<>();
+      List<String> currentBatch = new ArrayList<>();
+
+      while (iter.hasNext()) {

Review Comment:
   `count()` would trigger the DAG, and `repartition` could trigger a shuffle,
   right? I am only applying a simple transformation to batch the record keys.
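
   To illustrate the point, here is a minimal, engine-agnostic sketch of
   batching keys from an iterator. `KeyBatcher` and `batch` are hypothetical
   names used only for this example and are not part of Hudi. In the PR, a loop
   of this shape would run inside `mapPartitions`, which on Spark is a narrow
   transformation, so no `count()` or `repartition` is needed to form the
   batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only; not a Hudi class.
class KeyBatcher {

  // Groups the keys from the iterator into lists of at most batchSize elements,
  // preserving order. The last batch may be smaller than batchSize.
  static List<List<String>> batch(Iterator<String> iter, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<String>> batches = new ArrayList<>();
    List<String> currentBatch = new ArrayList<>(batchSize);

    while (iter.hasNext()) {
      currentBatch.add(iter.next());
      if (currentBatch.size() == batchSize) {
        // Batch is full: emit it and start a new one.
        batches.add(currentBatch);
        currentBatch = new ArrayList<>(batchSize);
      }
    }
    // Emit the trailing partial batch, if any.
    if (!currentBatch.isEmpty()) {
      batches.add(currentBatch);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("k1", "k2", "k3", "k4", "k5");
    // prints [[k1, k2], [k3, k4], [k5]]
    System.out.println(batch(keys.iterator(), 2));
  }
}
```

   Because the helper only consumes the iterator it is handed, each partition
   is batched independently and no keys move between partitions.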


