danny0405 commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1609596497


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -540,6 +547,51 @@ private HoodieFunctionalIndexDefinition 
getFunctionalIndexDefinition(String inde
     }
   }
 
+  private Set<String> getSecondaryIndexPartitionsToInit() {
+    Set<String> secondaryIndexPartitions = 
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().values().stream()
+        .map(HoodieFunctionalIndexDefinition::getIndexName)
+        .filter(indexName -> 
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))

Review Comment:
   So secondary index belongs to functional index?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -410,6 +413,14 @@ private boolean initializeFromFilesystem(String 
initializationTime, List<Metadat
             }
             fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(partitionInfoList);
             break;
+          case SECONDARY_INDEX:
+            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit();
+            if (secondaryIndexPartitionsToInit.isEmpty()) {
+              continue;
+            }
+            ValidationUtils.checkState(secondaryIndexPartitionsToInit.size() 
== 1, "Only one secondary index at a time is supported for now");

Review Comment:
   not sure why we have the number limit.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFunctionalIndexDefinition.java:
##########
@@ -52,7 +55,7 @@ public HoodieFunctionalIndexDefinition(String indexName, 
String indexType, Strin
                                          Map<String, String> indexOptions) {
     this.indexName = indexName;
     this.indexType = indexType;
-    this.indexFunction = indexFunction;
+    this.indexFunction = nonEmpty(indexFunction) ? indexFunction : 
SPARK_IDENTITY;

Review Comment:
   Can you elaborate the semantics for empty index function?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -783,4 +784,101 @@ public int 
getNumFileGroupsForPartition(MetadataPartitionType partition) {
             metadataFileSystemView, partition.getPartitionPath()));
     return partitionFileSliceMap.get(partition.getPartitionPath()).size();
   }
+
+  @Override
+  protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> 
recordKeys, String partitionName) {
+    if (recordKeys.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");

Review Comment:
   Can we just return empty instead of throwing.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -111,6 +111,14 @@ static boolean isMetadataTable(String basePath) {
     return basePath.endsWith(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH);
   }
 
+  static boolean isMetadataTableSecondaryIndexPartition(String basePath, 
Option<String> partitionName) {
+    if (!isMetadataTable(basePath) || !partitionName.isPresent()) {
+      return false;
+    }
+
+    return partitionName.get().startsWith("secondary_index_");

Review Comment:
   It's greate if we can avoid the hard code string.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -977,16 +1014,17 @@ public void updateFromWriteStatuses(HoodieCommitMetadata 
commitMetadata, HoodieD
       Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
               engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
-                  enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
-                  dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-                  dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getMetadataConfig());
+              enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+              dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getMetadataConfig());
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
       HoodieData<HoodieRecord> updatesFromWriteStatuses = 
getRecordIndexUpserts(writeStatus);
       HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX, 
updatesFromWriteStatuses.union(additionalUpdates));
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
+      updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, 
writeStatus);

Review Comment:
   Hmm, I see it in the bottom.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -948,9 +985,9 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
           relativePartitionPath, metadataWriteConfig.getBasePath(), 
indexUptoInstantTime);
 
       // return early and populate enabledPartitionTypes correctly (check in 
initialCommit)
-      MetadataPartitionType partitionType = 
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX)
-          ? FUNCTIONAL_INDEX
-          : 
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+      MetadataPartitionType partitionType = 
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX)
 ? FUNCTIONAL_INDEX :
+          
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX)
 ? SECONDARY_INDEX :

Review Comment:
   ditto, holding some variables in partition type may simplify the code.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()

Review Comment:
   Maybe we can share code with RLI.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
partition, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this 
commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, 
{"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, 
{"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new 
ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), 
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(writeStat.getPath(), new ArrayList<>())));

Review Comment:
   `Collections.emptyList()` is always better.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -437,9 +448,7 @@ private boolean initializeFromFilesystem(String 
initializationTime, List<Metadat
       HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
       bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
       metadataMetaClient.reloadActiveTimeline();
-      String partitionPath = partitionType == FUNCTIONAL_INDEX
-          ? dataWriteConfig.getFunctionalIndexConfig().getIndexName()
-          : partitionType.getPartitionPath();
+      String partitionPath = (partitionType == FUNCTIONAL_INDEX || 
partitionType == SECONDARY_INDEX) ? 
dataWriteConfig.getFunctionalIndexConfig().getIndexName() : 
partitionType.getPartitionPath();

Review Comment:
   Maybe we can extend the partition type to hold some variable flags inside.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -977,16 +1014,17 @@ public void updateFromWriteStatuses(HoodieCommitMetadata 
commitMetadata, HoodieD
       Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
               engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
-                  enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
-                  dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-                  dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getMetadataConfig());
+              enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+              dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getMetadataConfig());
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
       HoodieData<HoodieRecord> updatesFromWriteStatuses = 
getRecordIndexUpserts(writeStatus);
       HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX, 
updatesFromWriteStatuses.union(additionalUpdates));
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
+      updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, 
writeStatus);

Review Comment:
   Do we have the similiar retraction issue like RLI, should we union the 
retracted records too.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -540,6 +547,51 @@ private HoodieFunctionalIndexDefinition 
getFunctionalIndexDefinition(String inde
     }
   }
 
+  private Set<String> getSecondaryIndexPartitionsToInit() {
+    Set<String> secondaryIndexPartitions = 
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().values().stream()
+        .map(HoodieFunctionalIndexDefinition::getIndexName)
+        .filter(indexName -> 
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .collect(Collectors.toSet());
+    Set<String> completedMetadataPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
+    secondaryIndexPartitions.removeAll(completedMetadataPartitions);
+    return secondaryIndexPartitions;
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeSecondaryIndexPartition(String indexName) throws IOException {
+    HoodieFunctionalIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexName);
+    ValidationUtils.checkState(indexDefinition != null, "Secondary Index 
definition is not present for index " + indexName);
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = 
getPartitionFileSlicePairs();
+
+    // Reuse record index parallelism config to build secondary index
+    int parallelism = Math.min(partitionFileSlicePairs.size(), 
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+        engineContext,
+        partitionFileSlicePairs,
+        parallelism,
+        this.getClass().getSimpleName(),
+        dataMetaClient,
+        EngineType.SPARK,

Review Comment:
   don't think we should declare the engine type in abstract class.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -351,6 +351,25 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating partition stats 
index.");
 
+  public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.secondary.enable")
+      .defaultValue(false)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable secondary index within the Metadata Table.");
+
+  public static final ConfigProperty<String> SECONDARY_INDEX_COLUMN = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.secondary.column")
+      .noDefaultValue()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable secondary index within the Metadata Table.");

Review Comment:
   Fix the doc.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
partition, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this 
commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, 
{"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, 
{"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new 
ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), 
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be 
emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit 
which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {
+      status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+        // Consider those keys which were either updated or deleted in this 
commit
+        if (!recordDelegate.getNewLocation().isPresent() || 
(recordDelegate.getCurrentLocation().isPresent() && 
recordDelegate.getNewLocation().isPresent())) {
+          keysToRemove.add(recordDelegate.getRecordKey());
+        }
+      });
+    });
+    HoodieFunctionalIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexPartition);
+
+    // Fetch the secondary keys that each of the record keys ('keysToRemove') 
maps to
+    // This is obtained by scanning the entire secondary index partition in 
the metadata table
+    // This could be an expensive operation for a large commit 
(updating/deleting millions of rows)
+    Map<String, String> recordKeySecondaryKeyMap = 
metadata.getSecondaryKeys(keysToRemove);
+    HoodieData<HoodieRecord> deletedRecords = 
getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap, 
indexDefinition);
+
+    // Reuse record index parallelism config to build secondary index
+    int parallelism = Math.min(partitionFilePairs.size(), 
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

Review Comment:
   We better introduce a new config for the secondary index, it looks like the 
secondary index is 1 exponent larger than the RLI index.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -295,6 +311,11 @@ protected HoodieMetadataPayload(String key, int type,
     this.recordIndexMetadata = recordIndexMetadata;
   }
 
+  public HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo 
secondaryIndexMetadata) {

Review Comment:
   should it be public scope or private, move it to line 301 seems better.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -540,6 +547,51 @@ private HoodieFunctionalIndexDefinition 
getFunctionalIndexDefinition(String inde
     }
   }
 
+  private Set<String> getSecondaryIndexPartitionsToInit() {
+    Set<String> secondaryIndexPartitions = 
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().values().stream()
+        .map(HoodieFunctionalIndexDefinition::getIndexName)
+        .filter(indexName -> 
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))

Review Comment:
   Looks like we have some code that can share with functional index.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -783,4 +784,101 @@ public int 
getNumFileGroupsForPartition(MetadataPartitionType partition) {
             metadataFileSystemView, partition.getPartitionPath()));
     return partitionFileSliceMap.get(partition.getPartitionPath()).size();
   }
+
+  @Override
+  protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> 
recordKeys, String partitionName) {
+    if (recordKeys.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");
+
+    // Lookup keys from each file slice
+    // TODO: parallelize this loop
+    Map<String, String> reverseSecondaryKeyMap = new HashMap<>();
+    for (FileSlice partition : partitionFileSlices) {
+      reverseLookupSecondaryKeys(partitionName, recordKeys, partition, 
reverseSecondaryKeyMap);
+    }
+
+    return reverseSecondaryKeyMap;
+  }
+
+  private void reverseLookupSecondaryKeys(String partitionName, List<String> 
recordKeys, FileSlice fileSlice, Map<String, String> recordKeyMap) {
+    Set<String> keySet = new HashSet<>(recordKeys.size());
+    Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();

Review Comment:
   Looks like we have many duplicate codes with RLI.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -783,4 +784,101 @@ public int 
getNumFileGroupsForPartition(MetadataPartitionType partition) {
             metadataFileSystemView, partition.getPartitionPath()));
     return partitionFileSliceMap.get(partition.getPartitionPath()).size();
   }
+
+  @Override
+  protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> 
recordKeys, String partitionName) {
+    if (recordKeys.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");
+
+    // Lookup keys from each file slice
+    // TODO: parallelize this loop
+    Map<String, String> reverseSecondaryKeyMap = new HashMap<>();
+    for (FileSlice partition : partitionFileSlices) {
+      reverseLookupSecondaryKeys(partitionName, recordKeys, partition, 
reverseSecondaryKeyMap);
+    }
+
+    return reverseSecondaryKeyMap;
+  }
+
+  private void reverseLookupSecondaryKeys(String partitionName, List<String> 
recordKeys, FileSlice fileSlice, Map<String, String> recordKeyMap) {
+    Set<String> keySet = new HashSet<>(recordKeys.size());
+    Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice);
+    try {
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return;
+      }
+
+      // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
+      List<String> sortedKeys = new ArrayList<>(recordKeys);
+      Collections.sort(sortedKeys);
+      keySet.addAll(sortedKeys);
+
+      logRecordScanner.getRecords().forEach(record -> {
+        HoodieMetadataPayload payload = record.getData();
+        String recordKey = payload.getRecordKeyFromSecondaryIndex();
+        if (keySet.contains(recordKey)) {
+          logRecordsMap.put(recordKey, record);
+        }
+      });
+
+      // Map of (record-key, secondary-index-record)
+      Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = 
fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName);
+
+      // Iterate over all provided log-records, merging them into existing 
records
+      logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(
+          key1,
+          value1,
+          (oldRecord, newRecord) -> {
+            Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
+                HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, 
newRecord);
+            return mergedRecord.orElseGet(null);
+          }
+      ));
+
+      baseFileRecords.forEach((key, value) -> {
+        recordKeyMap.put(key, value.getRecordKey());
+      });
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table 
for  " + recordKeys.size() + " key : ", ioe);
+    } finally {
+      if (!reuse) {
+        closeReader(readers);
+      }
+    }
+  }
+
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader,
+                                                                               
             Set<String> keySet,
+                                                                               
             String partitionName) throws IOException {
+    if (reader == null) {
+      // No base file at all
+      return new HashMap<>();

Review Comment:
   `Collections.emptyMap` is always better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to