codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1904263862


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2206,44 +2306,161 @@ public static boolean 
validateDataTypeForSecondaryIndex(List<String> sourceField
     });
   }
 
-  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
-                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
-                                                                        int 
secondaryIndexMaxParallelism,
-                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
-                                                                        
HoodieIndexDefinition indexDefinition) {
-    if (partitionFiles.isEmpty()) {
-      return engineContext.emptyHoodieData();
+  /**
+   * Converts the write stats to secondary index records.
+   *
+   * @param allWriteStats   list of write stats
+   * @param instantTime     instant time
+   * @param indexDefinition secondary index definition
+   * @param metadataConfig  metadata config
+   * @param fsView          file system view as of instant time
+   * @param dataMetaClient  data table meta client
+   * @param engineContext   engine context
+   * @param engineType      engine type (e.g. SPARK, FLINK or JAVA)
+   * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the 
metadata table for the given secondary index partition
+   */
+  public static HoodieData<HoodieRecord> 
convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats,
+                                                                               
   String instantTime,
+                                                                               
   HoodieIndexDefinition indexDefinition,
+                                                                               
   HoodieMetadataConfig metadataConfig,
+                                                                               
   HoodieMetadataFileSystemView fsView,
+                                                                               
   HoodieTableMetaClient dataMetaClient,
+                                                                               
   HoodieEngineContext engineContext,
+                                                                               
   EngineType engineType) {
+    // Secondary index cannot support logs having inserts with current 
offering. So, lets validate that.
+    if (allWriteStats.stream().anyMatch(writeStat -> {
+      String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+    })) {
+      throw new HoodieIOException("Secondary index cannot support logs having 
inserts with current offering. Please disable secondary index.");
     }
-    final int parallelism = Math.min(partitionFiles.size(), 
secondaryIndexMaxParallelism);
-    final StoragePath basePath = metaClient.getBasePath();
+
     Schema tableSchema;
     try {
-      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      tableSchema = new 
TableSchemaResolver(dataMetaClient).getTableAvroSchema();
     } catch (Exception e) {
-      throw new HoodieException("Failed to get latest schema for " + 
metaClient.getBasePath(), e);
-    }
-
-    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
-    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionWithBaseAndLogFiles -> {
-      final String partition = partitionWithBaseAndLogFiles.getKey();
-      final Pair<String, List<String>> baseAndLogFiles = 
partitionWithBaseAndLogFiles.getValue();
-      List<String> logFilePaths = new ArrayList<>();
-      baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
-      String baseFilePath = baseAndLogFiles.getKey();
-      Option<StoragePath> dataFilePath = baseFilePath.isEmpty() ? 
Option.empty() : Option.of(FSUtils.constructAbsolutePath(basePath, 
baseFilePath));
-      Schema readerSchema;
-      if (dataFilePath.isPresent()) {
-        readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-            
.getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
-            .readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      throw new HoodieException("Failed to get latest schema for " + 
dataMetaClient.getBasePath(), e);
+    }
+    Map<String, List<HoodieWriteStat>> writeStatsByFileId = 
allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+    int parallelism = Math.max(Math.min(writeStatsByFileId.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+
+    return engineContext.parallelize(new 
ArrayList<>(writeStatsByFileId.entrySet()), 
parallelism).flatMap(writeStatsByFileIdEntry -> {
+      String fileId = writeStatsByFileIdEntry.getKey();
+      List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
+      String partition = writeStats.get(0).getPartitionPath();
+      FileSlice previousFileSliceForFileId = 
fsView.getLatestFileSlice(partition, fileId).orElse(null);

Review Comment:
   We are still using filesystem view for now, instead of populating file slice 
in the writestats (through append handle). There could be some issue due to 
NBCC with CDC enabled. Trying to clarify it in 
https://github.com/apache/hudi/pull/12582
   Btw, here the fsView call is distributed by (partition, fileId), so we are not 
making repeated calls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to