codope commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1618694958


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord> readRecordKeysFromFileSlices(HoodieEngine
     });
   }
 
-  public static Schema getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+  public static Schema getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
     return addMetadataFields(getSchemaForFields(tableSchema, indexDefinition.getSourceFields()));
   }
 
+  public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                                        List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                        int recordIndexMaxParallelism,
+                                                                        String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                        HoodieIndexDefinition indexDefinition) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<StoragePath> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);

Review Comment:
   Good point! I have moved it out, but it is still instantiated lazily, only
when the file slice has no base file. In most cases there will be a base file,
so `TableSchemaResolver` won't be instantiated. The schema resolver is needed
only when inserts go to log files and compaction has not yet taken place, which
I believe is not very common.
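
To illustrate the lazy-instantiation idea, here is a minimal, self-contained sketch. It is not the PR's code: `Lazy`, `ExpensiveResolver`, `schemaFor` and the returned strings are hypothetical stand-ins, with `ExpensiveResolver` playing the role of `TableSchemaResolver`. The resolver is constructed only on the first call that has no base file.

```java
import java.util.Optional;
import java.util.function.Supplier;

class LazySchemaSketch {
  // Counts constructions so the laziness can be observed (illustration only).
  public static int resolverCreations = 0;

  // Hypothetical stand-in for an expensive-to-build schema resolver.
  public static final class ExpensiveResolver {
    public ExpensiveResolver() {
      resolverCreations++;
    }

    public String resolveSchema() {
      return "schema-from-table";
    }
  }

  // Hypothetical memoizing supplier: runs the initializer at most once.
  public static final class Lazy<T> implements Supplier<T> {
    private final Supplier<T> initializer;
    private T value;
    private boolean initialized;

    public Lazy(Supplier<T> initializer) {
      this.initializer = initializer;
    }

    @Override
    public synchronized T get() {
      if (!initialized) {
        value = initializer.get();
        initialized = true;
      }
      return value;
    }
  }

  // Common case: read the schema from the base file.
  // Rare case (no base file): fall back to the lazily built resolver.
  public static String schemaFor(Optional<String> baseFile, Supplier<ExpensiveResolver> resolver) {
    if (baseFile.isPresent()) {
      return "schema-from-file:" + baseFile.get();
    }
    return resolver.get().resolveSchema();
  }

  public static void main(String[] args) {
    Lazy<ExpensiveResolver> resolver = new Lazy<>(ExpensiveResolver::new);
    System.out.println(schemaFor(Optional.of("f1.parquet"), resolver));
    System.out.println("resolvers created: " + resolverCreations);
    System.out.println(schemaFor(Optional.empty(), resolver));
    System.out.println("resolvers created: " + resolverCreations);
  }
}
```

With this shape, file slices that have a base file never pay for the resolver, and slices without one share a single instance.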



