codope commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1618694958
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord> readRecordKeysFromFileSlices(HoodieEngine
     });
   }
-  public static Schema getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+  public static Schema getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
     return addMetadataFields(getSchemaForFields(tableSchema, indexDefinition.getSourceFields()));
   }
+  public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                                        List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                        int recordIndexMaxParallelism,
+                                                                        String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                        HoodieIndexDefinition indexDefinition) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<StoragePath> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
Review Comment:
Good point! I have moved it out, but it is still instantiated lazily, only in
the case where the file slice has no base file. In most cases there will be
some base file, so `TableSchemaResolver` won't be instantiated. The schema
resolver is only needed when inserts can go to log files and compaction has
not yet taken place, which is not very common, I believe.
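The lazy-instantiation idea described above can be sketched independently of Hudi. This is a minimal, hypothetical illustration (names like `countInstantiations` and the string stand-in for `TableSchemaResolver` are invented for the example, not part of the PR): a memoizing `Supplier` defers the expensive construction until the first file slice without a base file is encountered, and reuses it afterwards.

```java
import java.util.function.Supplier;

public class LazyResolverSketch {

  // Memoizing supplier: the expensive delegate runs at most once,
  // and only if get() is ever actually called.
  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private T value;

      @Override
      public synchronized T get() {
        if (value == null) {
          value = delegate.get();
        }
        return value;
      }
    };
  }

  // Counts how many times the stand-in "schema resolver" is built while
  // scanning file slices; an empty path models a slice with no base file.
  static int countInstantiations(String[] baseFilePaths) {
    final int[] built = {0};
    Supplier<String> schemaResolver = memoize(() -> {
      built[0]++; // stands in for `new TableSchemaResolver(metaClient)`
      return "table-avro-schema";
    });
    for (String filePath : baseFilePaths) {
      if (filePath.isEmpty()) {
        schemaResolver.get(); // fall back to the lazily built resolver
      }
      // else: the schema is read from the base file itself, no resolver needed
    }
    return built[0];
  }

  public static void main(String[] args) {
    // Two slices lack a base file, yet the resolver is built only once.
    System.out.println(countInstantiations(new String[]{"f1.parquet", "", "", "f2.parquet"}));
  }
}
```

If every slice has a base file, `countInstantiations` returns 0 and the resolver is never constructed, which is the common case the comment describes.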
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]