nsivabalan commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1901434133


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -855,6 +884,129 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
     }
   }
 
+  /**
+   * Get the deleted keys from the merged log files. The logic is as below. Suppose:

Review Comment:
   Let's update the javadocs. It looks like we are also returning revived keys, not just deleted keys.
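   For reference, the set algebra the javadoc describes (plus the revived keys this method also returns) can be sketched standalone. This is a hypothetical illustration over plain `Set<String>` values, not Hudi code; the class and method names are invented for the example:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical standalone sketch of the set algebra described in the javadoc,
// extended with the revived keys the method also returns. NOT Hudi code.
public class RevivedDeletedKeysSketch {

  // A = keys valid in the previously merged log files
  // B = keys valid after merging all log files, including the current one
  // Revived = B - A: keys that were not valid before but are valid now
  public static Set<String> revivedKeys(Set<String> a, Set<String> b) {
    Set<String> revived = new HashSet<>(b);
    revived.removeAll(a);
    return revived;
  }

  // C = keys deleted in the current log file
  // Deleted = C - (B - A): keys the current log file deletes, excluding any
  // keys that end up revived once all log files are merged
  public static Set<String> deletedKeys(Set<String> a, Set<String> b, Set<String> c) {
    Set<String> deleted = new HashSet<>(c);
    deleted.removeAll(revivedKeys(a, b));
    return deleted;
  }
}
```

   E.g. with A = {k1, k2}, the current file deleting k2 and reviving k3 gives B = {k1, k3}, C = {k2}, so revived = {k3} and deleted = {k2}.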



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -855,6 +884,129 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
     }
   }
 
+  /**
+   * Get the deleted keys from the merged log files. The logic is as below. Suppose:
+   * <li>A = Set of keys that are valid (not deleted) in the previous log files merged</li>
+   * <li>B = Set of keys that are valid in all log files including current log file merged</li>
+   * <li>C = Set of keys that are deleted in the current log file</li>
+   * <li>Then, D = Set of deleted keys = C - (B - A)</li>
+   *
+   * @param dataTableMetaClient  data table meta client
+   * @param instantTime          timestamp of the commit
+   * @param engineType           engine type (SPARK, FLINK, JAVA)
+   * @param logFilePaths         list of log file paths including current and previous file slices
+   * @param finalWriterSchemaOpt records schema
+   * @param currentLogFilePaths  list of log file paths for the current instant
+   * @return pair of revived and deleted keys
+   */
+  @VisibleForTesting
+  public static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeysFromMergedLogs(HoodieTableMetaClient dataTableMetaClient,
+                                                                                      String instantTime,
+                                                                                      EngineType engineType,
+                                                                                      List<String> logFilePaths,
+                                                                                      Option<Schema> finalWriterSchemaOpt,
+                                                                                      List<String> currentLogFilePaths) {
+    // Separate out the current log files
+    List<String> logFilePathsWithoutCurrentLogFiles = logFilePaths.stream()
+        .filter(logFilePath -> !currentLogFilePaths.contains(logFilePath))
+        .collect(toList());
+    if (logFilePathsWithoutCurrentLogFiles.isEmpty()) {
+      // Only current log file is present, so we can directly get the deleted record keys from it and return the RLI records.
+      Map<String, HoodieRecord> currentLogRecords =
+          getLogRecords(currentLogFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+      Set<String> deletedKeys = currentLogRecords.entrySet().stream()
+          .filter(entry -> isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+          .map(Map.Entry::getKey)
+          .collect(Collectors.toSet());
+      return Pair.of(Collections.emptySet(), deletedKeys);
+    }
+    // Fetch log records for all log files

Review Comment:
   Can we move the else block into a private method? Let's try to avoid very large methods.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2228,9 +2379,17 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngi
     engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions");
     return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionWithBaseAndLogFiles -> {
       final String partition = partitionWithBaseAndLogFiles.getKey();
+      // get the log files for the partition and group them by fileId
+      Map<String, List<HoodieLogFile>> logFilesByFileId = getPartitionLatestFileSlicesIncludingInflight(metaClient, Option.of(fsView), partition).stream()
+          .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().collect(toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
       final Pair<String, List<String>> baseAndLogFiles = partitionWithBaseAndLogFiles.getValue();
-      List<String> logFilePaths = new ArrayList<>();
-      baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
+      Set<String> logFilePaths = new HashSet<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        // add all log files for the fileId
+        logFilesByFileId.get(FSUtils.getFileIdFromFileName(logFile)).stream()
+            .map(HoodieLogFile::getPath).map(StoragePath::toString).forEach(logFilePaths::add);
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile);

Review Comment:
   Are we not adding the log file twice, once at L2389 and again at L2391?
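   To illustrate why a double add would be masked rather than visible here: `logFilePaths` is a `HashSet`, so if the current log file already came in via the fileId's log file list, the second add is a silent no-op. A minimal standalone sketch (hypothetical names, not Hudi code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal illustration of how a HashSet masks a duplicate add. NOT Hudi code;
// the class, method, and path strings are invented for the example.
public class LogPathDedupSketch {

  public static Set<String> collectPaths(List<String> allLogFilesForFileId, String currentLogFilePath) {
    Set<String> logFilePaths = new HashSet<>(allLogFilesForFileId);
    // If currentLogFilePath already came in via the fileId's log file list,
    // this second add silently does nothing on a Set.
    logFilePaths.add(currentLogFilePath);
    return logFilePaths;
  }
}
```

   The result is correct either way, but the redundant add is still worth removing for clarity.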



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1181,16 +1182,13 @@ private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata c
     long targetPartitionSize = 100 * 1024 * 1024;
     int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex + targetPartitionSize - 1) / targetPartitionSize);
 
-    return readSecondaryKeysFromBaseFiles(
-        engineContext,
-        partitionFilePairs,
-        parallelism,
-        this.getClass().getSimpleName(),
-        dataMetaClient,
-        getEngineType(),
-        indexDefinition)
-        .union(deletedRecords)
-        .distinctWithKey(HoodieRecord::getKey, parallelism);
+    // Load file system view for only the affected partitions on the driver.
+    // By loading on the driver one time, we avoid loading the same metadata multiple times on the executors.
+    HoodieMetadataFileSystemView fsView = getMetadataView();
+    fsView.loadPartitions(partitionFilePairs.stream().map(Pair::getKey).collect(Collectors.toList()));
+    HoodieData<HoodieRecord> insertRecords =
+        readSecondaryKeysFromBaseFiles(engineContext, partitionFilePairs, parallelism, this.getClass().getSimpleName(), dataMetaClient, getEngineType(), indexDefinition, fsView);
+    return insertRecords.union(deleteRecords).distinctWithKey(HoodieRecord::getKey, parallelism);

Review Comment:
   distinctWithKey has some issues, right? Can you take over the patch/fix from Lin, get it landed, and rebase this on top of that patch?
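   For context, distinct-by-key semantics can be pinned down in plain Java by collecting into a map with an explicit merge function, which makes the duplicate-resolution choice deterministic instead of unspecified. This is a hypothetical sketch, not Hudi's `distinctWithKey` implementation; the class and method names are invented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java analog of de-duplicating records by key. The merge
// function below deterministically keeps the later record for each key, the
// kind of tie-breaking a naive distinct-by-key leaves unspecified. NOT Hudi code.
public class DistinctByKeySketch {

  public static <T, K> List<T> distinctByKey(List<T> records, Function<T, K> keyFn) {
    Map<K, T> byKey = records.stream()
        .collect(Collectors.toMap(keyFn, Function.identity(), (first, second) -> second));
    return new ArrayList<>(byKey.values());
  }
}
```

   Here insert records followed by delete records for the same key would resolve to the delete record, since the later one wins.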



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -774,65 +775,93 @@ public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(Hoodi
 
   @VisibleForTesting
   public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
-                                                                      HoodieCommitMetadata commitMetadata,
-                                                                      HoodieMetadataConfig metadataConfig,
-                                                                      HoodieTableMetaClient dataTableMetaClient,
-                                                                      int writesFileIdEncoding,
-                                                                      String instantTime) {
-
+                                                                             HoodieCommitMetadata commitMetadata,
+                                                                             HoodieMetadataConfig metadataConfig,
+                                                                             HoodieTableMetaClient dataTableMetaClient,
+                                                                             int writesFileIdEncoding,
+                                                                             String instantTime,
+                                                                             EngineType engineType) {
     List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
-
+    // Return early if there are no write stats, or if the operation is a compaction.
     if (allWriteStats.isEmpty() || commitMetadata.getOperationType() == WriteOperationType.COMPACT) {
       return engineContext.emptyHoodieData();
     }
+    // RLI cannot support logs having inserts with current offering. So, lets validate that.
+    if (allWriteStats.stream().anyMatch(writeStat -> {
+      String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
+      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+    })) {
+      throw new HoodieIOException("RLI cannot support logs having inserts with current offering. Would recommend disabling Record Level Index");
+    }
 
     try {
-      int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
+      Map<String, List<HoodieWriteStat>> writeStatsByFileId = allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+      int parallelism = Math.max(Math.min(writeStatsByFileId.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
       String basePath = dataTableMetaClient.getBasePath().toString();
       HoodieFileFormat baseFileFormat = dataTableMetaClient.getTableConfig().getBaseFileFormat();
-      // RLI cannot support logs having inserts with current offering. So, lets validate that.
-      if (allWriteStats.stream().anyMatch(writeStat -> {
-        String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
-        return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
-      })) {
-        throw new HoodieIOException("RLI cannot support logs having inserts with current offering. Would recommend disabling Record Level Index");
-      }
-
-      // we might need to set some additional variables if we need to process log files.
-      // for RLI and MOR table, we only care about log files if they contain any deletes. If not, all entries in logs are considered as updates, for which
-      // we do not need to generate new RLI record.
-      boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> {
-        String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
-        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
-      });
-
-      Option<Schema> writerSchemaOpt = Option.empty();
-      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
-        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
-      }
-      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
       StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf();
+      Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
       Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
-      HoodieData<HoodieRecord> recordIndexRecords = engineContext.parallelize(allWriteStats, parallelism)
-          .flatMap(writeStat -> {
-            HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
-            StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
-            // handle base files
-            if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
-              return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
-            } else if (FSUtils.isLogFile(fullFilePath)) {
-              // for logs, we only need to process log files containing deletes
-              if (writeStat.getNumDeletes() > 0) {
-                Set<String> deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-                    finalWriterSchemaOpt, maxBufferSize, instantTime, false, true);
-                return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-              }
-              // ignore log file data blocks.
-              return new ArrayList<HoodieRecord>().iterator();
-            } else {
-              throw new HoodieIOException("Unsupported file type " + fullFilePath.toString() + " while generating MDT records");
+      HoodieData<HoodieRecord> recordIndexRecords = engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism)
+          .flatMap(writeStatsByFileIdEntry -> {
+            String fileId = writeStatsByFileIdEntry.getKey();
+            List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
+            // Partition the write stats into base file and log file write stats
+            List<HoodieWriteStat> baseFileWriteStats = writeStats.stream()
+                .filter(writeStat -> writeStat.getPath().endsWith(baseFileFormat.getFileExtension()))
+                .collect(Collectors.toList());
+            List<HoodieWriteStat> logFileWriteStats = writeStats.stream()
+                .filter(writeStat -> FSUtils.isLogFile(writeStat.getPath()))
+                .collect(Collectors.toList());
+            // Ensure that only one of base file or log file write stats exists
+            checkState(baseFileWriteStats.isEmpty() || logFileWriteStats.isEmpty(),
+                "A single fileId cannot have both base file and log file write stats in the same commit. FileId: " + fileId);
+            // Process base file write stats
+            if (!baseFileWriteStats.isEmpty()) {
+              return baseFileWriteStats.stream()
+                  .flatMap(writeStat -> {
+                    HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
+                    return CollectionUtils.toStream(BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage));
+                  })
+                  .iterator();
+            }
+            // Process log file write stats
+            if (!logFileWriteStats.isEmpty()) {
+              String partitionPath = logFileWriteStats.get(0).getPartitionPath();
+              List<String> currentLogFilePaths = logFileWriteStats.stream()
+                  .map(writeStat -> new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()).toString())
+                  .collect(Collectors.toList());
+              List<String> allLogFilePaths = logFileWriteStats.stream()
+                  .flatMap(writeStat -> {
+                    checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+                    List<String> currentLogFiles = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
+                        .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
+                        .collect(Collectors.toList());
+                    return currentLogFiles.stream();
+                  })
+                  .collect(Collectors.toList());
+              // Extract revived and deleted keys
+              Pair<Set<String>, Set<String>> revivedAndDeletedKeys =
+                  getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, engineType, allLogFilePaths, finalWriterSchemaOpt, currentLogFilePaths);
+              Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
+              Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
+              // Process revived keys to create updates
+              List<HoodieRecord> revivedRecords = revivedKeys.stream()
+                  .map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId, instantTime, writesFileIdEncoding))
+                  .collect(Collectors.toList());
+              // Process deleted keys to create deletes
+              List<HoodieRecord> deletedRecords = deletedKeys.stream()
+                  .map(HoodieMetadataPayload::createRecordIndexDelete)
+                  .collect(Collectors.toList());
+              // Combine all records into one list
+              List<HoodieRecord> allRecords = new ArrayList<>();
+              allRecords.addAll(revivedRecords);
+              allRecords.addAll(deletedRecords);
+              return allRecords.iterator();
             }
+            // No base file or log file write stats found
+            return Collections.emptyIterator();

Review Comment:
   Can we at least log a warning in this case?
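   A minimal sketch of the suggested fallback: warn before returning the empty iterator when a fileId's write stats match neither a base file nor a log file. `java.util.logging` is used here purely for illustration (the real code would use the project's logging facade), and the class and method names are invented:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.logging.Logger;

// Hypothetical sketch of warning before emitting no RLI records for an
// unmatched fileId. NOT Hudi code; names and logger choice are illustrative.
public class UnmatchedWriteStatSketch {

  private static final Logger LOG = Logger.getLogger(UnmatchedWriteStatSketch.class.getName());

  public static Iterator<String> recordsForUnmatchedFileId(String fileId) {
    // Surface the unexpected state instead of silently returning nothing
    LOG.warning("No base file or log file write stats found for fileId " + fileId + "; emitting no RLI records");
    return Collections.emptyIterator();
  }
}
```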



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1160,17 +1159,19 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
 
   private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
     List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = getPartitionFilePairs(commitMetadata);
+    if (partitionFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
     // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
     // the secondary index partition for each of these keys.
-    List<String> keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(),
-        dataMetaClient, instantTime);
+    HoodieData<String> keysToRemove = getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);

Review Comment:
   I thought, based on our discussion, that we were going to fix this as well in the same patch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
