bhat-vinay commented on code in PR #10625:
URL: https://github.com/apache/hudi/pull/10625#discussion_r1499861590


##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
           + "Actual value will be obtained by invoking .toString() on the 
field value. Nested fields can be specified using\n"
           + "the dot notation eg: `a.b.c`");
 
+  public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = 
ConfigProperty

Review Comment:
   One of the current limitations: only supports one secondary index (per table). Will remove the limitation once the functionality is working end-to-end. Thinking of using the same approach as the functional index (a different partition for each secondary index, based on a config file/JSON).



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -360,6 +415,112 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysFromFileSlice
     }
   }
 
+  /**
+   * Lookup list of keys from a single file slice.
+   *
+   * @param partitionName Name of the partition
+   * @param secondaryKeys The list of secondary keys to lookup
+   * @param fileSlice     The file slice to read
+   * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} 
for the secondary-keys which were found in the file slice
+   */
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
lookupSecondaryKeysFromFileSlice(String partitionName, List<String> 
secondaryKeys, FileSlice fileSlice) {
+    Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice, Option.of(true));
+    try {
+      List<Long> timings = new ArrayList<>(1);
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return Collections.emptyMap();
+      }
+
+      // Sort once here so that we don't need to sort individually for the base file and for each log file.
+      Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+      List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
+      secondaryKeySet.addAll(sortedSecondaryKeys);
+
+      // TODO: Look at using scanByFullKeys() which pushes down the 
'filtering' keys and
+      //  only buffers matching records
+      logRecordScanner.scan();
+
+      Map<String, HashMap<String, HoodieRecord>> nonUniqueRecordsMap = 
logRecordScanner.getNonUniqueRecordsMap();
+      nonUniqueRecordsMap.forEach((secondaryKey, recordsMap) -> {
+        if (secondaryKeySet.contains(secondaryKey) && 
!logRecordsMap.containsKey(secondaryKey)) {
+          logRecordsMap.put(secondaryKey, recordsMap);
+        }
+      });
+
+      return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, 
sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table for " + secondaryKeys.size() + " keys", ioe);
+    } finally {
+      if (!reuse) {
+        closeReader(readers);
+      }
+    }
+  }
+
+  private void reverseLookupSecondaryKeys(String partitionName, List<String> 
recordKeys, FileSlice fileSlice, Map<String, String> recordKeyMap) {

Review Comment:
   Please ignore for now. This may not be needed if `WriteStatus` is changed to carry `(record-key, Optional<new-secondary-key>, Optional<old-secondary-key>)` as discussed earlier.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -548,6 +787,31 @@ private Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>> fetchBaseFileAllR
         .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
   }
 
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader,

Review Comment:
   Ignore for now. This is again related to the reverse scanning to aid the writer (to emit tombstone records); will be changed based on the design choice.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.FileSystem;
 

Review Comment:
   Please ignore the changes in this file. Will revert in the next upload. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -240,6 +255,11 @@ public static HoodieMergedLogRecordScanner.Builder 
newBuilder() {
 
   @Override
   public <T> void processNextRecord(HoodieRecord<T> newRecord) throws 
IOException {
+    if (logContainsNonUniqueKeys) {

Review Comment:
   If the log files are for partitions that can have non-unique keys, then this 
logic makes use of the new map to buffer the scanned records.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -305,6 +305,52 @@ public Map<String, List<HoodieRecordGlobalLocation>> 
readRecordIndex(List<String
     return recordKeyToLocation;
   }
 
+  /**
+   * Get record-location using secondary-index and record-index
+   * <p>
+   * If the Metadata Table is not enabled, an exception is thrown to 
distinguish this from the absence of the key.
+   *
+   * @param secondaryKeys The list of secondary keys to read
+   */
+  @Override
+  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys) {
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
+        "Record index is not initialized in MDT");
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.SECONDARY_INDEX),
+        "Secondary index is not initialized in MDT");
+
+    HoodieTimer timer = HoodieTimer.start();
+
+    // Fetch secondary-index records
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> secondaryKeyRecords 
= getSecondaryIndexRecords(secondaryKeys, 
MetadataPartitionType.SECONDARY_INDEX.getPartitionPath());
+
+    // Now collect the record-keys and fetch the RLI records
+    List<String> recordKeys = new ArrayList<>();
+    secondaryKeyRecords.forEach((key, records) -> {
+      records.forEach(record -> {
+        if (!record.getData().isDeleted()) {
+          recordKeys.add(record.getData().getRecordKeyFromSecondaryIndex());
+        }
+      });
+    });
+
+    return readRecordIndex(recordKeys);
+  }
+
+  // Returns a map of (record-key -> secondary-key) for the provided 
record-keys

Review Comment:
   This is the reverse-scan entry point, used by the writer to extract the (record-key, old-secondary-key) mapping for updated/deleted record keys. This is subsequently used by the writer to emit tombstone records into the respective file groups. Please ignore this for now; based on what is decided, we might choose to enhance `WriteStatus` to carry this information and aid the writer in emitting tombstone markers.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> 
newRecord) throws IOException
     }
   }
 
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) 
throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) 
newRecord.getData();
+
+    // The rules for merging prevRecord and latestRecord are noted below. Note that this only applies to SecondaryIndex
+    // records in the metadata table (which is the only user of this API as of 
this implementation)
+    // 1. Iff latestRecord is deleted (i.e it is a tombstone) AND prevRecord 
is null (i.e not buffered), then retain the latestRecord
+    //    The rationale here is that there could be a 'prev record' in the 
base-file that needs to be merged at a later stage
+    // 2. Iff latestRecord is deleted AND prevRecord is non-null, then remove 
prevRecord from the buffer AND discard the latestRecord
+    // 3. Iff latestRecord is not deleted AND prevRecord is non-null, then 
remove the prevRecord from the buffer AND retain the latestRecord
+    //    The rationale is that the most recent record is always retained 
(based on arrival time). TODO: verify this logic
+    // 4. Iff latestRecord is not deleted AND prevRecord is null, then retain 
the latestRecord (same rationale as #1)
+
+    HashMap<String, HoodieRecord> prevRecords = nonUniqueKeyRecords.get(key);
+    if (prevRecords == null) {
+      // Case #1 and #4
+      HashMap<String, HoodieRecord> recordsMap = new HashMap<>();
+      recordsMap.put(newPayload.getRecordKeyFromSecondaryIndex(), 
newRecord.copy());
+      nonUniqueKeyRecords.put(key, recordsMap);
+      return;
+    }
+
+    String newRecordKey = newPayload.getRecordKeyFromSecondaryIndex();
+    HoodieRecord prevRecord = prevRecords.get(newRecordKey);
+    if (prevRecord == null) {
+      // Case #1 and #4
+      prevRecords.put(newRecordKey, newRecord.copy());
+      nonUniqueKeyRecords.put(key, prevRecords);
+      return;
+    }
+
+    HoodieMetadataPayload prevPayload = (HoodieMetadataPayload) 
prevRecord.getData();
+    assert 
prevPayload.getRecordKeyFromSecondaryIndex().equals(newPayload.getRecordKeyFromSecondaryIndex());
+
+    // TODO: Merger need not be called here as the merging logic is handled 
explicitly in this function.
+    // Retain until Secondary Index feature is tested and stabilized
+    HoodieRecord<T> combinedRecord = (HoodieRecord<T>) 
recordMerger.merge(prevRecord, readerSchema,

Review Comment:
   Will remove this later. The merging logic does not really rely on the merger 
(because of the comment made earlier).



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -466,12 +627,90 @@ private Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>> readAllLogRecords
     }
 
     try {
-      return logRecordReader.getAllRecordsByKeys(sortedKeys);
+      Map<String, List<HoodieRecord<HoodieMetadataPayload>>> records = 
logRecordReader.getAllRecordsByKeys(sortedKeys);
+      return records;
     } finally {
       timings.add(timer.endTimer());
     }
   }
 
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
readNonUniqueRecordsAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,

Review Comment:
   This method merges the buffer obtained by reading all log records with the buffer obtained by reading the base file.
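   A rough sketch of that merge, with the log buffer overlaid on the base-file buffer (hypothetical simplified types, not the PR's code; a null value marks a log tombstone in this sketch, and log entries, being newer, take precedence):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: for one secondary key, overlay the log-record buffer
// onto the base-file records. A log tombstone drops the base entry; any other
// log entry replaces it (log records are newer than the base file).
class BufferMergeSketch {
  // record-key -> payload; a null payload marks a tombstone from the log
  static Map<String, String> merge(Map<String, String> baseRecords, Map<String, String> logRecords) {
    Map<String, String> merged = new HashMap<>(baseRecords);
    logRecords.forEach((recordKey, payload) -> {
      if (payload == null) {
        merged.remove(recordKey);       // tombstone cancels the base entry
      } else {
        merged.put(recordKey, payload); // log entry wins over the base entry
      }
    });
    return merged;
  }
}
```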



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> 
newRecord) throws IOException
     }
   }
 
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) 
throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) 
newRecord.getData();
+
+    // The rules for merging prevRecord and latestRecord are noted below. Note that this only applies to SecondaryIndex

Review Comment:
   The crux of the merging logic is here. The main issue with using the existing `preCombine(...)` method is that it is 'either-or', i.e., it chooses only one record. Changing the API was a little tedious, hence this approach of moving the merge logic directly into the scanner. @vinothchandar @codope
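   The four merge cases in the code comment above can be sketched as a small standalone routine (hypothetical names and simplified types; `Rec` stands in for `HoodieRecord<HoodieMetadataPayload>` and this is not the PR's implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the four non-unique-key merge cases.
class NonUniqueMergeSketch {
  static final class Rec {
    final String recordKey;
    final boolean deleted; // true => tombstone
    Rec(String recordKey, boolean deleted) {
      this.recordKey = recordKey;
      this.deleted = deleted;
    }
  }

  // buffer: secondary-key -> (record-key -> Rec)
  static void process(Map<String, Map<String, Rec>> buffer, String secondaryKey, Rec latest) {
    Map<String, Rec> byRecordKey = buffer.computeIfAbsent(secondaryKey, k -> new HashMap<>());
    Rec prev = byRecordKey.get(latest.recordKey);
    if (prev == null) {
      // Cases 1 and 4: nothing buffered; retain latest (a matching entry
      // may still exist in the base file and is merged at a later stage).
      byRecordKey.put(latest.recordKey, latest);
    } else if (latest.deleted) {
      // Case 2: tombstone cancels the buffered record; discard both.
      byRecordKey.remove(latest.recordKey);
    } else {
      // Case 3: the most recent record wins (arrival-time ordering).
      byRecordKey.put(latest.recordKey, latest);
    }
  }
}
```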



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -305,6 +305,52 @@ public Map<String, List<HoodieRecordGlobalLocation>> 
readRecordIndex(List<String
     return recordKeyToLocation;
   }
 
+  /**
+   * Get record-location using secondary-index and record-index
+   * <p>
+   * If the Metadata Table is not enabled, an exception is thrown to 
distinguish this from the absence of the key.
+   *
+   * @param secondaryKeys The list of secondary keys to read
+   */
+  @Override
+  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys) {

Review Comment:
   This is the main entry point for the query planner (HoodieFileIndex) to 
choose the secondary-index and prune candidate files
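   The two-level lookup this entry point performs (secondary index first, then the record index) could be sketched roughly as follows, with plain maps standing in for the MDT partitions (hypothetical names; a sketch only, not the PR's code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: secondary keys -> record keys -> record locations.
class SecondaryLookupSketch {
  static Map<String, List<String>> readSecondaryIndex(
      List<String> secondaryKeys,
      Map<String, List<String>> secondaryIndex, // secondary-key -> record-keys
      Map<String, List<String>> recordIndex) {  // record-key -> locations
    // Step 1: resolve secondary keys to record keys
    // (the real code also skips deleted secondary-index entries).
    List<String> recordKeys = new ArrayList<>();
    for (String sk : secondaryKeys) {
      recordKeys.addAll(secondaryIndex.getOrDefault(sk, Collections.emptyList()));
    }
    // Step 2: resolve record keys to file locations via the record index.
    Map<String, List<String>> locations = new HashMap<>();
    for (String rk : recordKeys) {
      if (recordIndex.containsKey(rk)) {
        locations.put(rk, recordIndex.get(rk));
      }
    }
    return locations;
  }
}
```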



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1902,6 +1910,141 @@ public HoodieRecord next() {
     });
   }
 
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromFiles(HoodieEngineContext engineContext,
+                                                                    
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                    int 
recordIndexMaxParallelism,
+                                                                    String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), 
recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    final SerializableConfiguration configuration = new 
SerializableConfiguration(metaClient.getHadoopConf());
+
+    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = 
partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + 
StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<Path> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = 
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getHadoopConf(),
 dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new 
TableSchemaResolver(metaClient);
+        tableSchema = schemaResolver.getTableAvroSchema();
+      }
+
+      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, tableSchema, partition, dataFilePath);
+    });
+  }
+
+  private static ClosableIterator<HoodieRecord> 
createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
+                                                                              
EngineType engineType, List<String> logFilePaths,
+                                                                              
Schema tableSchema, String partition,
+                                                                              
Option<Path> dataFilePath) throws Exception {
+    final String basePath = metaClient.getBasePathV2().toString();
+    final SerializableConfiguration configuration = new 
SerializableConfiguration(metaClient.getHadoopConf());
+
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
+        basePath,
+        engineType,
+        Collections.emptyList(),
+        metaClient.getTableConfig().getRecordMergerStrategy());
+
+    HoodieMergedLogRecordScanner mergedLogRecordScanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(metaClient.getFs())
+        .withBasePath(basePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(tableSchema)

Review Comment:
   Will change to filter only the required columns. Could not find an easy way/API to get that yet.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -402,6 +422,31 @@ public HoodieMetadataPayload 
preCombine(HoodieMetadataPayload previousRecord) {
         // 2. A key moved to a different file due to clustering
 
         // No need to merge with previous record index, always pick the latest 
payload.
+        return this;
+      case METADATA_TYPE_SECONDARY_INDEX:
+        // TODO: This block and these checks are just for validation and to detect all callers.

Review Comment:
   This is here mainly for asserting and will be removed later. Ideally, all merging logic for secondary (or non-unique-key) index records needs to be handled directly at the upper layers (while the scanner is running the scan).



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -360,6 +415,112 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysFromFileSlice
     }
   }
 
+  /**
+   * Lookup list of keys from a single file slice.
+   *
+   * @param partitionName Name of the partition
+   * @param secondaryKeys The list of secondary keys to lookup
+   * @param fileSlice     The file slice to read
+   * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} 
for the secondary-keys which were found in the file slice
+   */
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
lookupSecondaryKeysFromFileSlice(String partitionName, List<String> 
secondaryKeys, FileSlice fileSlice) {
+    Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice, Option.of(true));
+    try {
+      List<Long> timings = new ArrayList<>(1);
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return Collections.emptyMap();
+      }
+
+      // Sort once here so that we don't need to sort individually for the base file and for each log file.
+      Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+      List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
+      secondaryKeySet.addAll(sortedSecondaryKeys);
+
+      // TODO: Look at using scanByFullKeys() which pushes down the 
'filtering' keys and
+      //  only buffers matching records
+      logRecordScanner.scan();

Review Comment:
   Tests failed a couple of times (missed records) when using the API that 
supports filter push-down. Will debug further and update later.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala:
##########
@@ -0,0 +1,222 @@
+/*

Review Comment:
   Please ignore the repetitive/duplicated code blocks. Will change once the functionality is ready and subsequently add more tests.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, hadoopConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this 
commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, 
{"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, 
{"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new 
ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), 
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be 
emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit 
which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {

Review Comment:
   This needs to be fixed. If the design is changed so that `WriteStatus` includes the (record-key, old-secondary-key, new-secondary-key) tuple, then this needs to change anyway.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, hadoopConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this 
commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, 
{"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, 
{"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new 
ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), 
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be 
emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit 
which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {
+      status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+        // Consider those keys which were either updated or deleted in this 
commit
+        if (!recordDelegate.getNewLocation().isPresent() || 
(recordDelegate.getCurrentLocation().isPresent() && 
recordDelegate.getNewLocation().isPresent())) {
+          keysToRemove.add(recordDelegate.getRecordKey());
+        }
+      });
+    });
+
+    // Fetch the secondary keys that each of the record keys ('keysToRemove') 
maps to
+    // This is obtained by scanning the entire secondary index partition in 
the metadata table
+    // This could be an expensive operation for a large commit 
(updating/deleting millions of rows)
+    Map<String, String> recordKeySecondaryKeyMap = 
metadata.getSecondaryKeys(keysToRemove);
+    HoodieData<HoodieRecord> deletedRecords = 
getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap);
+
+    // Reuse record index parallelism config to build secondary index
+    int parallelism = Math.min(partitionFilePairs.size(), 
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+
+    return deletedRecords.union(readSecondaryKeysFromFiles(

Review Comment:
   This is convoluted. `deletedRecords` are the tombstone records. For correctness, these tombstone records should be emitted before the regular records. Could not find anywhere in the docs whether this ordering is preserved, but noticed in the tests that it is.
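   For reference, the updated-or-deleted check applied to each record delegate when collecting `keysToRemove` reduces to a small predicate (a sketch with hypothetical `Optional`-based locations, not the PR's types):

```java
import java.util.Optional;

// Hypothetical sketch of the predicate selecting tombstone candidates:
// no new location => the record was deleted; both locations present => updated.
// Pure inserts (new location only) produce no tombstone.
class TombstoneCandidateSketch {
  static boolean updatedOrDeleted(Optional<String> currentLocation, Optional<String> newLocation) {
    boolean deleted = !newLocation.isPresent();
    boolean updated = currentLocation.isPresent() && newLocation.isPresent();
    return deleted || updated;
  }
}
```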



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
