codope commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1618694958
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord>
readRecordKeysFromFileSlices(HoodieEngine
});
}
- public static Schema
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+ public static Schema
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
Schema tableSchema = schemaResolver.getTableAvroSchema();
return addMetadataFields(getSchemaForFields(tableSchema,
indexDefinition.getSourceFields()));
}
+ public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+ int
recordIndexMaxParallelism,
+ String
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+
HoodieIndexDefinition indexDefinition) {
+ if (partitionFiles.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFiles.size() + " partitions");
+ final int parallelism = Math.min(partitionFiles.size(),
recordIndexMaxParallelism);
+ final String basePath = metaClient.getBasePathV2().toString();
+ return engineContext.parallelize(partitionFiles,
parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final Pair<String, List<String>> baseAndLogFiles =
partitionAndBaseFile.getValue();
+ List<String> logFilePaths = new ArrayList<>();
+ baseAndLogFiles.getValue().forEach(logFile -> {
+ logFilePaths.add(basePath + StoragePath.SEPARATOR + partition +
StoragePath.SEPARATOR + logFile);
+ });
+ String filePath = baseAndLogFiles.getKey();
+
+ Option<StoragePath> dataFilePath = Option.empty();
+ Schema tableSchema;
+
+ if (!filePath.isEmpty()) {
+ dataFilePath = Option.of(filePath(basePath, "", filePath));
+ tableSchema =
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
dataFilePath.get());
+ } else {
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(metaClient);
Review Comment:
Good point! I have moved it out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]