codope commented on a change in pull request #5070:
URL: https://github.com/apache/hudi/pull/5070#discussion_r830436453
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -838,55 +857,52 @@ public static HoodieTableFileSystemView
getFileSystemView(HoodieTableMetaClient
public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
HoodieEngineContext engineContext,
MetadataRecordsGenerationParams recordsGenerationParams) {
- try {
- List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
- .flatMap(entry -> entry.stream()).collect(Collectors.toList());
- return
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext,
allWriteStats, recordsGenerationParams);
- } catch (Exception e) {
- throw new HoodieException("Failed to generate column stats records for
metadata table ", e);
- }
- }
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
- /**
- * Create column stats from write status.
- *
- * @param engineContext - Engine context
- * @param allWriteStats - Write status to convert
- * @param recordsGenerationParams - Parameters for columns stats record
generation
- */
- public static HoodieData<HoodieRecord>
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
-
List<HoodieWriteStat> allWriteStats,
-
MetadataRecordsGenerationParams recordsGenerationParams) {
if (allWriteStats.isEmpty()) {
return engineContext.emptyHoodieData();
}
- final List<String> columnsToIndex =
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(),
recordsGenerationParams.isAllColumnStatsIndexEnabled());
- final int parallelism = Math.max(Math.min(allWriteStats.size(),
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- HoodieData<HoodieWriteStat> allWriteStatsRDD =
engineContext.parallelize(allWriteStats, parallelism);
- return allWriteStatsRDD.flatMap(writeStat ->
translateWriteStatToColumnStats(writeStat,
recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator());
+
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .map(writerSchemaStr -> new
Schema.Parser().parse(writerSchemaStr));
+
+ HoodieTableMetaClient dataTableMetaClient =
recordsGenerationParams.getDataMetaClient();
+
+ List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+ dataTableMetaClient.getTableConfig(), writerSchema);
+
+ if (columnsToIndex.isEmpty()) {
+ // In case there are no columns to index, bail
+ return engineContext.emptyHoodieData();
+ }
+
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat ->
+ translateWriteStatToColumnStats(writeStat, dataTableMetaClient,
columnsToIndex).iterator());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for
metadata table", e);
+ }
}
/**
* Get the latest columns for the table for column stats indexing.
- *
- * @param datasetMetaClient - Data table meta client
- * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing
enabled for all columns
*/
- private static List<String> getColumnsToIndex(HoodieTableMetaClient
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
- if (!isMetaIndexColumnStatsForAllColumns
- ||
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
< 1) {
- return
Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(","));
+ private static List<String>
getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
+ HoodieTableConfig tableConfig,
+ Option<Schema> writerSchema) {
+ if (!recordsGenParams.isAllColumnStatsIndexEnabled()) {
+ // TODO why are we only indexing primary key? revisit fallback
Review comment:
This fallback to indexing only the primary key will change, as we'll be adding support for indexing multiple columns:
https://github.com/apache/hudi/pull/4693/files#diff-11e9ef6bd53ef1001b669a1dc68dde2aba9b33c9eb72cc1e4198750336d79772
No change is needed from your side; I'll take care of it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]