nsivabalan commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1904675170


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -774,65 +776,93 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
 
   @VisibleForTesting
   public static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
-                                                                      
HoodieCommitMetadata commitMetadata,
-                                                                      
HoodieMetadataConfig metadataConfig,
-                                                                      
HoodieTableMetaClient dataTableMetaClient,
-                                                                      int 
writesFileIdEncoding,
-                                                                      String 
instantTime) {
-
+                                                                             
HoodieCommitMetadata commitMetadata,
+                                                                             
HoodieMetadataConfig metadataConfig,
+                                                                             
HoodieTableMetaClient dataTableMetaClient,
+                                                                             
int writesFileIdEncoding,
+                                                                             
String instantTime,
+                                                                             
EngineType engineType) {
     List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
-
+    // Return early if there are no write stats, or if the operation is a 
compaction.
     if (allWriteStats.isEmpty() || commitMetadata.getOperationType() == 
WriteOperationType.COMPACT) {
       return engineContext.emptyHoodieData();
     }
+    // RLI cannot support logs having inserts with current offering. So, lets 
validate that.
+    if (allWriteStats.stream().anyMatch(writeStat -> {
+      String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+    })) {
+      throw new HoodieIOException("RLI cannot support logs having inserts with 
current offering. Would recommend disabling Record Level Index");
+    }
 
     try {
-      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      Map<String, List<HoodieWriteStat>> writeStatsByFileId = 
allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+      int parallelism = Math.max(Math.min(writeStatsByFileId.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
       String basePath = dataTableMetaClient.getBasePath().toString();
       HoodieFileFormat baseFileFormat = 
dataTableMetaClient.getTableConfig().getBaseFileFormat();
-      // RLI cannot support logs having inserts with current offering. So, 
lets validate that.
-      if (allWriteStats.stream().anyMatch(writeStat -> {
-        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
-        return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
-      })) {
-        throw new HoodieIOException("RLI cannot support logs having inserts 
with current offering. Would recommend disabling Record Level Index");
-      }
-
-      // we might need to set some additional variables if we need to process 
log files.
-      // for RLI and MOR table, we only care about log files if they contain 
any deletes. If not, all entries in logs are considered as updates, for which
-      // we do not need to generate new RLI record.
-      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
-        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
-        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
-      });
-
-      Option<Schema> writerSchemaOpt = Option.empty();
-      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
-        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
-      }
-      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
       StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> writerSchemaOpt = 
tryResolveSchemaForTable(dataTableMetaClient);
       Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
-      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
-          .flatMap(writeStat -> {
-            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
-            StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
-            // handle base files
-            if 
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
-              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
-            } else if (FSUtils.isLogFile(fullFilePath)) {
-              // for logs, we only need to process log files containing deletes
-              if (writeStat.getNumDeletes() > 0) {
-                Set<String> deletedRecordKeys = 
getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-                    finalWriterSchemaOpt, maxBufferSize, instantTime, false, 
true);
-                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-              }
-              // ignore log file data blocks.
-              return new ArrayList<HoodieRecord>().iterator();
-            } else {
-              throw new HoodieIOException("Unsupported file type " + 
fullFilePath.toString() + " while generating MDT records");
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), 
parallelism)
+          .flatMap(writeStatsByFileIdEntry -> {
+            String fileId = writeStatsByFileIdEntry.getKey();
+            List<HoodieWriteStat> writeStats = 
writeStatsByFileIdEntry.getValue();
+            // Partition the write stats into base file and log file write 
stats
+            List<HoodieWriteStat> baseFileWriteStats = writeStats.stream()
+                .filter(writeStat -> 
writeStat.getPath().endsWith(baseFileFormat.getFileExtension()))
+                .collect(Collectors.toList());
+            List<HoodieWriteStat> logFileWriteStats = writeStats.stream()
+                .filter(writeStat -> FSUtils.isLogFile(new 
StoragePath(writeStats.get(0).getPath())))
+                .collect(Collectors.toList());
+            // Ensure that only one of base file or log file write stats exists
+            checkState(baseFileWriteStats.isEmpty() || 
logFileWriteStats.isEmpty(),
+                "A single fileId cannot have both base file and log file write 
stats in the same commit. FileId: " + fileId);
+            // Process base file write stats
+            if (!baseFileWriteStats.isEmpty()) {
+              return baseFileWriteStats.stream()
+                  .flatMap(writeStat -> {
+                    HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+                    return 
CollectionUtils.toStream(BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage));
+                  })
+                  .iterator();
             }
+            // Process log file write stats
+            if (!logFileWriteStats.isEmpty()) {
+              String partitionPath = 
logFileWriteStats.get(0).getPartitionPath();
+              List<String> currentLogFilePaths = logFileWriteStats.stream()
+                  .map(writeStat -> new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()).toString())
+                  .collect(Collectors.toList());
+              List<String> allLogFilePaths = logFileWriteStats.stream()
+                  .flatMap(writeStat -> {
+                    checkState(writeStat instanceof HoodieDeltaWriteStat, "Log 
file should be associated with a delta write stat");
+                    List<String> currentLogFiles = ((HoodieDeltaWriteStat) 
writeStat).getLogFiles().stream()
+                        .map(logFile -> new StoragePath(new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), 
logFile).toString())
+                        .collect(Collectors.toList());
+                    return currentLogFiles.stream();
+                  })
+                  .collect(Collectors.toList());
+              // Extract revived and deleted keys
+              Pair<Set<String>, Set<String>> revivedAndDeletedKeys =
+                  getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, 
instantTime, engineType, allLogFilePaths, finalWriterSchemaOpt, 
currentLogFilePaths);

Review Comment:
   the method can just return List<HoodieRecord> directly. 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2206,44 +2306,161 @@ public static boolean 
validateDataTypeForSecondaryIndex(List<String> sourceField
     });
   }
 
-  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
-                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
-                                                                        int 
secondaryIndexMaxParallelism,
-                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
-                                                                        
HoodieIndexDefinition indexDefinition) {
-    if (partitionFiles.isEmpty()) {
-      return engineContext.emptyHoodieData();
+  /**
+   * Converts the write stats to secondary index records.
+   *
+   * @param allWriteStats   list of write stats
+   * @param instantTime     instant time
+   * @param indexDefinition secondary index definition
+   * @param metadataConfig  metadata config
+   * @param fsView          file system view as of instant time
+   * @param dataMetaClient  data table meta client
+   * @param engineContext   engine context
+   * @param engineType      engine type (e.g. SPARK, FLINK or JAVA)
+   * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the 
metadata table for the given secondary index partition
+   */
+  public static HoodieData<HoodieRecord> 
convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats,
+                                                                               
   String instantTime,
+                                                                               
   HoodieIndexDefinition indexDefinition,
+                                                                               
   HoodieMetadataConfig metadataConfig,
+                                                                               
   HoodieMetadataFileSystemView fsView,
+                                                                               
   HoodieTableMetaClient dataMetaClient,
+                                                                               
   HoodieEngineContext engineContext,
+                                                                               
   EngineType engineType) {
+    // Secondary index cannot support logs having inserts with current 
offering. So, lets validate that.
+    if (allWriteStats.stream().anyMatch(writeStat -> {
+      String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+    })) {
+      throw new HoodieIOException("Secondary index cannot support logs having 
inserts with current offering. Please disable secondary index.");
     }
-    final int parallelism = Math.min(partitionFiles.size(), 
secondaryIndexMaxParallelism);
-    final StoragePath basePath = metaClient.getBasePath();
+
     Schema tableSchema;
     try {
-      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      tableSchema = new 
TableSchemaResolver(dataMetaClient).getTableAvroSchema();
     } catch (Exception e) {
-      throw new HoodieException("Failed to get latest schema for " + 
metaClient.getBasePath(), e);
-    }
-
-    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
-    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionWithBaseAndLogFiles -> {
-      final String partition = partitionWithBaseAndLogFiles.getKey();
-      final Pair<String, List<String>> baseAndLogFiles = 
partitionWithBaseAndLogFiles.getValue();
-      List<String> logFilePaths = new ArrayList<>();
-      baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
-      String baseFilePath = baseAndLogFiles.getKey();
-      Option<StoragePath> dataFilePath = baseFilePath.isEmpty() ? 
Option.empty() : Option.of(FSUtils.constructAbsolutePath(basePath, 
baseFilePath));
-      Schema readerSchema;
-      if (dataFilePath.isPresent()) {
-        readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-            
.getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
-            .readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      throw new HoodieException("Failed to get latest schema for " + 
dataMetaClient.getBasePath(), e);
+    }
+    Map<String, List<HoodieWriteStat>> writeStatsByFileId = 
allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+    int parallelism = Math.max(Math.min(writeStatsByFileId.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+
+    return engineContext.parallelize(new 
ArrayList<>(writeStatsByFileId.entrySet()), 
parallelism).flatMap(writeStatsByFileIdEntry -> {
+      String fileId = writeStatsByFileIdEntry.getKey();
+      List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
+      String partition = writeStats.get(0).getPartitionPath();
+      FileSlice previousFileSliceForFileId = 
fsView.getLatestFileSlice(partition, fileId).orElse(null);
+      Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
+      if (previousFileSliceForFileId == null) {
+        // new file slice, so empty mapping for previous slice
+        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
       } else {
-        readerSchema = tableSchema;
+        StoragePath previousBaseFile = 
previousFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
+        List<String> logFiles =
+            
previousFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+        recordKeyToSecondaryKeyForPreviousFileSlice =
+            getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFiles, 
tableSchema, partition, Option.ofNullable(previousBaseFile), indexDefinition, 
instantTime);
       }
-      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
-          
metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(""));
+      List<FileSlice> latestIncludingInflightFileSlices = 
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), 
partition);
+      FileSlice currentFileSliceForFileId = 
latestIncludingInflightFileSlices.stream().filter(fs -> 
fs.getFileId().equals(fileId)).findFirst()
+          .orElseThrow(() -> new HoodieException("Could not find any file 
slice for fileId " + fileId));
+      StoragePath currentBaseFile = 
currentFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
+      List<String> logFilesIncludingInflight =
+          
currentFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice =
+          getRecordKeyToSecondaryKey(dataMetaClient, engineType, 
logFilesIncludingInflight, tableSchema, partition, 
Option.ofNullable(currentBaseFile), indexDefinition, instantTime);
+      // Need to find what secondary index record should be deleted, and what 
should be inserted.
+      // For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it 
is not present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should 
be inserted.
+      // For each entry in recordKeyToSecondaryKeyForPreviousFileSlice, if it 
is not present in recordKeyToSecondaryKeyForCurrentFileSlice, then it should be 
deleted.
+      // For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it 
is present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should be 
updated
+      List<HoodieRecord> records = new ArrayList<>();
+      recordKeyToSecondaryKeyForCurrentFileSlice.forEach((recordKey, 
secondaryKey) -> {

Review Comment:
   I see we are doing 3 for-loops here. Can we reduce it to 2?
   Among the bullets you have listed, bullets 1 and 3 can be combined into one. 
   
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2206,44 +2306,161 @@ public static boolean 
validateDataTypeForSecondaryIndex(List<String> sourceField
     });
   }
 
-  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
-                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
-                                                                        int 
secondaryIndexMaxParallelism,
-                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
-                                                                        
HoodieIndexDefinition indexDefinition) {
-    if (partitionFiles.isEmpty()) {
-      return engineContext.emptyHoodieData();
+  /**
+   * Converts the write stats to secondary index records.
+   *
+   * @param allWriteStats   list of write stats
+   * @param instantTime     instant time
+   * @param indexDefinition secondary index definition
+   * @param metadataConfig  metadata config
+   * @param fsView          file system view as of instant time
+   * @param dataMetaClient  data table meta client
+   * @param engineContext   engine context
+   * @param engineType      engine type (e.g. SPARK, FLINK or JAVA)
+   * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the 
metadata table for the given secondary index partition
+   */
+  public static HoodieData<HoodieRecord> 
convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats,
+                                                                               
   String instantTime,
+                                                                               
   HoodieIndexDefinition indexDefinition,
+                                                                               
   HoodieMetadataConfig metadataConfig,
+                                                                               
   HoodieMetadataFileSystemView fsView,
+                                                                               
   HoodieTableMetaClient dataMetaClient,
+                                                                               
   HoodieEngineContext engineContext,
+                                                                               
   EngineType engineType) {
+    // Secondary index cannot support logs having inserts with current 
offering. So, lets validate that.
+    if (allWriteStats.stream().anyMatch(writeStat -> {
+      String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+    })) {
+      throw new HoodieIOException("Secondary index cannot support logs having 
inserts with current offering. Please disable secondary index.");
     }
-    final int parallelism = Math.min(partitionFiles.size(), 
secondaryIndexMaxParallelism);
-    final StoragePath basePath = metaClient.getBasePath();
+
     Schema tableSchema;
     try {
-      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      tableSchema = new 
TableSchemaResolver(dataMetaClient).getTableAvroSchema();

Review Comment:
   what happens here if it's a fresh table and we are trying to apply the first 
commit to the MDT? 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -800,52 +797,6 @@ public int 
getNumFileGroupsForPartition(MetadataPartitionType partition) {
     return partitionFileSliceMap.get(partition.getPartitionPath()).size();
   }
 
-  @Override
-  protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> 
recordKeys, String partitionName) {

Review Comment:
   good that we were able to get rid of code snippets with ad hoc data reading



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to