nsivabalan commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1896330232


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1160,17 +1159,19 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
 
   private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
     List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = getPartitionFilePairs(commitMetadata);
+    if (partitionFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
     // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
     // the secondary index partition for each of these keys.
-    List<String> keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(),
-        dataMetaClient, instantTime);
+    HoodieData<String> keysToRemove = getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);

Review Comment:
   Trying to see if we should optimize this to fetch only the keys that are part of delete log blocks here?
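   A rough shape for that optimization (a hypothetical sketch; `LogBlock`, `BlockType`, and `keysFromDeleteBlocks` are stand-ins for illustration, not Hudi APIs): instead of merging all log records, scan only the blocks typed as deletes and collect their keys, skipping data blocks entirely.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public class DeleteBlockScan {
     // Stand-in for a log block; only the fields this sketch needs.
     enum BlockType { DATA, DELETE }
   
     static final class LogBlock {
       final BlockType type;
       final List<String> recordKeys;
   
       LogBlock(BlockType type, List<String> recordKeys) {
         this.type = type;
         this.recordKeys = recordKeys;
       }
     }
   
     /** Collect keys only from delete blocks, never touching data-block payloads. */
     static Set<String> keysFromDeleteBlocks(List<LogBlock> blocks) {
       return blocks.stream()
           .filter(b -> b.type == BlockType.DELETE)
           .flatMap(b -> b.recordKeys.stream())
           .collect(Collectors.toSet());
     }
   
     public static void main(String[] args) {
       List<LogBlock> blocks = Arrays.asList(
           new LogBlock(BlockType.DATA, Arrays.asList("k1", "k2")),
           new LogBlock(BlockType.DELETE, Arrays.asList("k2")));
       System.out.println(keysFromDeleteBlocks(blocks)); // prints [k2]
     }
   }
   ```
   
   The trade-off is that keys updated (not deleted) in data blocks would no longer be fetched, which only works if updates are handled elsewhere.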
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1181,16 +1182,13 @@ private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata c
     long targetPartitionSize = 100 * 1024 * 1024;
     int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex + targetPartitionSize - 1) / targetPartitionSize);
 
-    return readSecondaryKeysFromBaseFiles(
-        engineContext,
-        partitionFilePairs,
-        parallelism,
-        this.getClass().getSimpleName(),
-        dataMetaClient,
-        getEngineType(),
-        indexDefinition)
-        .union(deletedRecords)
-        .distinctWithKey(HoodieRecord::getKey, parallelism);
+    // Load file system view for only the affected partitions on the driver.
+    // By loading on the driver one time, we avoid loading the same metadata multiple times on the executors.
+    HoodieMetadataFileSystemView fsView = getMetadataView();
+    fsView.loadPartitions(partitionFilePairs.stream().map(Pair::getKey).collect(Collectors.toList()));
+    HoodieData<HoodieRecord> insertRecords =
+        readSecondaryKeysFromBaseFiles(engineContext, partitionFilePairs, parallelism, this.getClass().getSimpleName(), dataMetaClient, getEngineType(), indexDefinition, fsView);

Review Comment:
   Do we have a tracking ticket for supporting inserts to log files? For example, with bucket index, we should be able to create a secondary index. Maybe today we rely on RLI, but eventually we need to support SI for any table and index type.
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2241,7 +2377,7 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngi
       } else {
         readerSchema = tableSchema;
       }
-      return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
+      return createSecondaryIndexGenerator(metaClient, engineType, new ArrayList<>(logFilePaths), readerSchema, partition, dataFilePath, indexDefinition,

Review Comment:
   Not sure we need this fix if we are not getting rid of the separately processed SI delete entries.
   
   I am talking about:
   ```
   // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
   // the secondary index partition for each of these keys.
   HoodieData<String> keysToRemove = getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);

   HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
   // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to
   HoodiePairData<String, String> recordKeySecondaryKeyMap =
       metadata.getSecondaryKeys(keysToRemove, indexDefinition.getIndexName(), dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
   HoodieData<HoodieRecord> deleteRecords = recordKeySecondaryKeyMap.map(
       (recordKeyAndSecondaryKey) -> HoodieMetadataPayload.createSecondaryIndexRecord(recordKeyAndSecondaryKey.getKey(), recordKeyAndSecondaryKey.getValue(), indexDefinition.getIndexName(), true));
   ```
   
   in HoodieBackedTableMetadataWriter.getSecondaryIndexUpdates.
   
   Let's sync up.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,17 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
             if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
               return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
             } else if (FSUtils.isLogFile(fullFilePath)) {
-              // for logs, we only need to process log files containing deletes
-              if (writeStat.getNumDeletes() > 0) {
-                Set<String> deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-                    finalWriterSchemaOpt, maxBufferSize, instantTime, false, true);
-                return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-              }
-              // ignore log file data blocks.
-              return new ArrayList<HoodieRecord>().iterator();
+              checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+              List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
+                  .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
+                  .collect(toList());

Review Comment:
   Also, don't we need to group by file slice across all write stats?
   For example, if more than one log file is added to the same file slice, will the current record generation still be correct?
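   The grouping in question could look roughly like the sketch below (hypothetical `WriteStat` stand-in, not the Hudi class): bucket all log-file write stats by file ID first, so every log file belonging to one file slice is merged together before records are generated, instead of each stat being processed independently.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class GroupByFileSlice {
     // Minimal stand-in for a write stat; only fileId and log path matter here.
     static final class WriteStat {
       final String fileId;
       final String logPath;
   
       WriteStat(String fileId, String logPath) {
         this.fileId = fileId;
         this.logPath = logPath;
       }
     }
   
     /** Group log paths by file ID so each file slice is merged exactly once. */
     static Map<String, List<String>> logFilesPerFileSlice(List<WriteStat> stats) {
       return stats.stream().collect(Collectors.groupingBy(
           s -> s.fileId,
           Collectors.mapping(s -> s.logPath, Collectors.toList())));
     }
   
     public static void main(String[] args) {
       List<WriteStat> stats = Arrays.asList(
           new WriteStat("fg-1", ".log.1"),
           new WriteStat("fg-1", ".log.2"),
           new WriteStat("fg-2", ".log.1"));
       Map<String, List<String>> grouped = logFilesPerFileSlice(stats);
       // fg-1 maps to two log files that must be merged together before record generation.
       System.out.println(grouped.get("fg-1").size() + " log files for fg-1"); // prints "2 log files for fg-1"
     }
   }
   ```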



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -855,6 +859,131 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
     }
   }
 
+  /**
+   * Get the deleted keys from the merged log files. The logic is as below. Suppose:
+   * <li>A = Set of keys that are valid (not deleted) in the previous log files merged</li>
+   * <li>B = Set of keys that are valid in all log files including current log file merged</li>
+   * <li>C = Set of keys that are deleted in the current log file</li>
+   * <li>Then, D = Set of deleted keys = C - (B - A)</li>
+   *
+   * @param dataTableMetaClient  data table meta client
+   * @param instantTime          timestamp of the commit
+   * @param engineType           engine type (SPARK, FLINK, JAVA)
+   * @param logFilePaths         list of log file paths for which records need to be merged
+   * @param finalWriterSchemaOpt records schema
+   * @param fullFilePath         full path of the current log file
+   * @return set of deleted keys
+   */
+  @VisibleForTesting
+  public static Set<String> getDeletedKeysFromMergedLogs(HoodieTableMetaClient dataTableMetaClient,
+                                                         String instantTime,
+                                                         EngineType engineType,
+                                                         List<String> logFilePaths,
+                                                         Option<Schema> finalWriterSchemaOpt,
+                                                         StoragePath fullFilePath) {
+    // Separate out the current log file
+    List<String> logFilePathsWithoutCurrentLogFile = logFilePaths.stream()
+        .filter(logFilePath -> !logFilePath.equals(fullFilePath.toString()))
+        .collect(toList());
+    if (logFilePathsWithoutCurrentLogFile.isEmpty()) {
+      // Only current log file is present, so we can directly get the deleted record keys from it and return the RLI records.
+      Map<String, HoodieRecord> currentLogRecords =
+          getLogRecords(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+      return currentLogRecords.entrySet().stream()
+          .filter(entry -> isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+          .map(Map.Entry::getKey)
+          .collect(Collectors.toSet());
+    }
+    // Fetch log records for all log files
+    Map<String, HoodieRecord> allLogRecords =
+        getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+    // Fetch log records for previous log files (excluding the current log file)
+    Map<String, HoodieRecord> previousLogRecords =
+        getLogRecords(logFilePathsWithoutCurrentLogFile, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+    // Fetch log records for the current log file
+    Map<String, HoodieRecord> currentLogRecords =
+        getLogRecords(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+    // Calculate valid (non-deleted) keys
+    Set<String> validKeysForPreviousLogs = previousLogRecords.entrySet().stream()
+        .filter(entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+
+    Set<String> validKeysIncludingCurrentLogs = allLogRecords.entrySet().stream()
+        .filter(entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+
+    // Calculate deleted keys in the current log file
+    Set<String> deletedKeysInCurrentLog = currentLogRecords.entrySet().stream()

Review Comment:
   I thought we could simplify this:
   
   1. parse the previous log files to generate {recordKey -> isDeleted}
   2. parse the entire file slice (including inflight) to generate {recordKey -> isDeleted}
   
   We are interested in the record keys deleted by the current log file, so we could do something like:
   
   ```
   for every valid entry in (1)
      if deleted in (2) => add to deletedRecordKeyList
   ```
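   A direct transcription of that pseudocode with plain maps (hypothetical `isDeleted` boolean flags standing in for merged Hudi records): a key counts as deleted by the current log file only if it was valid after merging the previous files and is deleted after merging the whole file slice.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public class DeletedKeys {
     /**
     * previous: recordKey -> isDeleted after merging only the previous log files.
     * merged:   recordKey -> isDeleted after merging the entire file slice.
     */
     static Set<String> deletedByCurrentLog(Map<String, Boolean> previous, Map<String, Boolean> merged) {
       return previous.entrySet().stream()
           .filter(e -> !e.getValue())                          // valid in (1)
           .filter(e -> merged.getOrDefault(e.getKey(), false)) // deleted in (2)
           .map(Map.Entry::getKey)
           .collect(Collectors.toSet());
     }
   
     public static void main(String[] args) {
       Map<String, Boolean> previous = new HashMap<>();
       previous.put("a", false); // valid before the current log file
       previous.put("b", true);  // already deleted earlier
       Map<String, Boolean> merged = new HashMap<>();
       merged.put("a", true);    // deleted once the current log file is merged in
       merged.put("b", true);
       System.out.println(deletedByCurrentLog(previous, merged)); // prints [a]
     }
   }
   ```
   
   This needs only two merge passes instead of the three in the current patch.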
   
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -801,28 +801,47 @@ public int getNumFileGroupsForPartition(MetadataPartitionType partition) {
   }
 
   @Override
-  protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordKeys, String partitionName) {
+  protected HoodiePairData<String, String> getSecondaryKeysForRecordKeys(HoodieData<String> recordKeys, String partitionName, int batchSize) {
     if (recordKeys.isEmpty()) {
-      return Collections.emptyMap();
+      return getEngineContext().emptyHoodiePairData();
     }
 
     // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys.
     List<FileSlice> partitionFileSlices =
         partitionFileSliceMap.computeIfAbsent(partitionName, k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName));
     if (partitionFileSlices.isEmpty()) {
-      return Collections.emptyMap();
+      return getEngineContext().emptyHoodiePairData();
     }
 
-    // Parallel lookup keys from each file slice
-    Map<String, String> reverseSecondaryKeyMap = new HashMap<>(recordKeys.size());
-    getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Lookup secondary keys from metadata table partition " + partitionName);
-    List<Pair<String, String>> secondaryToRecordKeyPairList = getEngineContext().flatMap(partitionFileSlices,
-        (SerializableFunction<FileSlice, Stream<Pair<String, String>>>) v1 -> reverseLookupSecondaryKeys(partitionName, recordKeys, v1)
-            .entrySet().stream()
-            .map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()).stream(), partitionFileSlices.size());
+    // Step 1: Batch record keys
+    HoodieData<List<String>> batchedRecordKeys = recordKeys.mapPartitions(iter -> {
+      List<List<String>> batches = new ArrayList<>();
+      List<String> currentBatch = new ArrayList<>();
+
+      while (iter.hasNext()) {

Review Comment:
   Why can't we do a count and then repartition(count / batchSize)?
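   That alternative costs one extra pass over the data: count the keys, derive the number of partitions as ceil(count / batchSize), and let repartition spread the keys evenly so each partition naturally becomes one lookup batch. A sketch of the sizing arithmetic (the distributed repartition call itself is engine-specific and omitted here):
   
   ```java
   public class BatchSizing {
     /** Number of partitions needed so each holds at most batchSize keys. */
     static int numPartitions(long count, int batchSize) {
       if (count == 0) {
         return 1; // keep at least one partition for an empty dataset
       }
       return (int) ((count + batchSize - 1) / batchSize); // ceiling division
     }
   
     public static void main(String[] args) {
       // 1050 keys with batches of 100 need 11 partitions.
       System.out.println(numPartitions(1050, 100)); // prints 11
     }
   }
   ```
   
   Compared with the manual batching in the patch, this trades the extra count action for even batch sizes regardless of how keys are skewed across input partitions.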
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,17 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
             if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
               return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
             } else if (FSUtils.isLogFile(fullFilePath)) {
-              // for logs, we only need to process log files containing deletes
-              if (writeStat.getNumDeletes() > 0) {
-                Set<String> deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-                    finalWriterSchemaOpt, maxBufferSize, instantTime, false, true);
-                return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-              }
-              // ignore log file data blocks.
-              return new ArrayList<HoodieRecord>().iterator();
+              checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+              List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()

Review Comment:
   We need to fix L806 to L814.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
