yihua commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1584134207
##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -67,6 +70,61 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileColumnRanges List of column range statistics for each file in a partition
+   */
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) {
+    if (fileColumnRanges.size() == 1) {
Review Comment:
This seems unnecessary as the `reduce` should handle size of 1. Also, can
`fileColumnRanges` list be empty, e.g., empty parquet file?
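To illustrate the review point, here is a minimal sketch (not Hudi code) using a toy range type standing in for `HoodieColumnRangeMetadata`: `Stream#reduce` with a binary operator already returns the sole element unchanged for a one-element list, and signals an empty input via `Optional.empty()`, so the explicit `size() == 1` shortcut is redundant.

```java
import java.util.List;
import java.util.Optional;

public class RangeReduceDemo {
  // Toy stand-in for HoodieColumnRangeMetadata (illustrative only).
  record Range(int min, int max) {}

  // Merge two ranges: min of mins, max of maxs.
  static Range merge(Range a, Range b) {
    return new Range(Math.min(a.min(), b.min()), Math.max(a.max(), b.max()));
  }

  static Optional<Range> combine(List<Range> ranges) {
    // reduce returns the single element unchanged for a one-element list,
    // and Optional.empty() for an empty list -- no size() == 1 shortcut needed.
    return ranges.stream().reduce(RangeReduceDemo::merge);
  }

  public static void main(String[] args) {
    Range only = combine(List.of(new Range(3, 7))).orElseThrow();
    System.out.println(only.min() + ".." + only.max());      // 3..7

    Range merged = combine(List.of(new Range(3, 7), new Range(1, 5))).orElseThrow();
    System.out.println(merged.min() + ".." + merged.max());  // 1..5

    // An empty input (e.g. an empty file listing) surfaces as Optional.empty()
    // rather than an exception, so the caller can decide how to handle it.
    System.out.println(combine(List.of()).isPresent());      // false
  }
}
```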
##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -67,6 +70,61 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileColumnRanges List of column range statistics for each file in a partition
+   */
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) {
+    if (fileColumnRanges.size() == 1) {
+      // Only one parquet file, we can just return that range.
+      return fileColumnRanges.get(0);
+    }
+    // There are multiple files. Compute min(file_mins) and max(file_maxs)
+    return fileColumnRanges.stream()
+        .sequential()
Review Comment:
Does this have to be sequential, or does it not matter?
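For context on the question: a toy illustration (not Hudi code) of why `.sequential()` likely adds nothing here. When the accumulator is associative and commutative, as merging `[min, max]` pairs is, `reduce` yields the same result on a sequential or a parallel stream.

```java
import java.util.List;
import java.util.stream.IntStream;

public class SequentialVsParallel {
  // Associative and commutative accumulator: merge two [min, max] pairs.
  static int[] merge(int[] a, int[] b) {
    return new int[] {Math.min(a[0], b[0]), Math.max(a[1], b[1])};
  }

  public static void main(String[] args) {
    List<int[]> ranges = IntStream.range(0, 1000)
        .mapToObj(i -> new int[] {i, i + 10})
        .toList();

    int[] seq = ranges.stream().sequential()
        .reduce(SequentialVsParallel::merge).orElseThrow();
    int[] par = ranges.parallelStream()
        .reduce(SequentialVsParallel::merge).orElseThrow();

    // Same [min, max] pair either way, because reduce only guarantees a
    // deterministic result when the operator is associative -- which it is here.
    System.out.println(seq[0] == par[0] && seq[1] == par[1]);  // true
  }
}
```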
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -403,6 +400,13 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
         ValidationUtils.checkState(functionalIndexPartitionsToInit.size() == 1, "Only one functional index at a time is supported for now");
         fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
         break;
+      case PARTITION_STATS:
+        if (dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
+          LOG.warn("Skipping partition stats index initialization as target columns are not set");
+          continue;
+        }
+        fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
+        break;
Review Comment:
Not required for this PR. IMO all such switch-case logic should be included
in the `MetadataPartitionType` enum for easier extensibility.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -372,6 +379,10 @@ public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetada
           dataMetaClient, isColumnStatsIndexEnabled, columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
       partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
     }
+    if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)) {
+      final HoodieData<HoodieRecord> partitionStatsRDD = convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, metadataConfig);
+      partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS, partitionStatsRDD);
+    }
Review Comment:
A side topic: does this compose a sub-graph in the DAG without triggering execution? Ideally, all types of metadata should be computed in parallel, leveraging Spark's parallelism, instead of being computed type by type sequentially.
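A toy illustration of the lazy-composition point (plain Java streams, not Spark or `HoodieData`): intermediate operations only record the pipeline, and nothing runs until a terminal operation. Building all metadata record "sub-graphs" first and triggering them together is what lets the engine parallelize across them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyPipelineDemo {
  // Build a pipeline whose map step logs every evaluation; the names are
  // illustrative stand-ins for metadata partition types.
  static Stream<String> buildPipeline(List<String> log) {
    return Stream.of("col_stats", "partition_stats")
        .map(name -> {
          log.add(name);
          return "records_for_" + name;
        });
  }

  public static void main(String[] args) {
    List<String> evaluated = new ArrayList<>();

    // Composing the pipeline does NOT execute the mapping function yet.
    Stream<String> pipeline = buildPipeline(evaluated);
    System.out.println(evaluated.isEmpty());  // true: map has not run

    // The terminal operation triggers the whole pipeline at once.
    List<String> results = pipeline.toList();
    System.out.println(evaluated.size());     // 2
    System.out.println(results.get(0));       // records_for_col_stats
  }
}
```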
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -330,6 +330,27 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating functional index.");
+  public static final ConfigProperty<Boolean> ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
+      .key(METADATA_PREFIX + ".index.partition.stats.enable")
+      .defaultValue(true)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable aggregating stats for each column at the storage partition level. ");
Review Comment:
```suggestion
      .withDocumentation("Enable aggregating stats for each column at the storage partition level.");
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -403,6 +400,13 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
         ValidationUtils.checkState(functionalIndexPartitionsToInit.size() == 1, "Only one functional index at a time is supported for now");
         fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
         break;
+      case PARTITION_STATS:
+        if (dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
+          LOG.warn("Skipping partition stats index initialization as target columns are not set");
+          continue;
+        }
+        fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
+        break;
Review Comment:
Then, `HoodieBackedTableMetadataWriter` can also be simplified.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -104,13 +104,14 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataPayload.class);
   /**
    * Type of the record. This can be an enum in the schema but Avro1.8
-   * has a bug - https://issues.apache.org/jira/browse/AVRO-1810
+   * has a bug - <a href="https://issues.apache.org/jira/browse/AVRO-1810">...</a>
    */
- protected static final int METADATA_TYPE_PARTITION_LIST = 1;
- protected static final int METADATA_TYPE_FILE_LIST = 2;
- protected static final int METADATA_TYPE_COLUMN_STATS = 3;
- protected static final int METADATA_TYPE_BLOOM_FILTER = 4;
+ private static final int METADATA_TYPE_PARTITION_LIST = 1;
+ private static final int METADATA_TYPE_FILE_LIST = 2;
+ private static final int METADATA_TYPE_COLUMN_STATS = 3;
+ private static final int METADATA_TYPE_BLOOM_FILTER = 4;
private static final int METADATA_TYPE_RECORD_INDEX = 5;
+ private static final int METADATA_TYPE_PARTITION_STATS = 6;
Review Comment:
Right, no schema change; just code restructuring maintaining the same ID,
i.e., add `getId` in `MetadataPartitionType`. You can refactor this part in a
separate PR.
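A hypothetical sketch of the suggested refactor: fold the `METADATA_TYPE_*` integer constants into `MetadataPartitionType` behind a `getId()` accessor, keeping the exact IDs from the diff above so serialized payloads are unchanged. The enum shape here is illustrative; Hudi's real enum carries more state.

```java
public class PartitionTypeIdSketch {
  enum MetadataPartitionType {
    PARTITION_LIST(1),
    FILE_LIST(2),
    COLUMN_STATS(3),
    BLOOM_FILTER(4),
    RECORD_INDEX(5),
    PARTITION_STATS(6);

    private final int id;

    MetadataPartitionType(int id) {
      this.id = id;
    }

    // Same wire-level ID as the former METADATA_TYPE_* constant, so this is
    // pure code restructuring with no schema or compatibility change.
    int getId() {
      return id;
    }
  }

  public static void main(String[] args) {
    System.out.println(MetadataPartitionType.PARTITION_STATS.getId());  // 6
  }
}
```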
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]