vinothchandar commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1459591053
##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -64,6 +67,51 @@ public static BaseFileUtils getInstance(HoodieFileFormat
fileFormat) {
throw new UnsupportedOperationException(fileFormat.name() + " format not
supported yet.");
}
+ /**
+ * Aggregate column range statistics across files in a partition.
+ *
+ * @param fileRanges List of column range statistics for each file in a
partition
+ */
+ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>>
fileRanges) {
+ if (fileRanges.size() == 1) {
+ // Only one parquet file, we can just return that range.
+ return fileRanges.get(0);
+ }
+ // There are multiple files. Compute min(file_mins) and max(file_maxs)
+ return fileRanges.stream()
+ .sequential()
+ .reduce(BaseFileUtils::mergeRanges).get();
+ }
+
+ private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
mergeRanges(HoodieColumnRangeMetadata<T> one,
+
HoodieColumnRangeMetadata<T> another) {
+ final T minValue;
+ final T maxValue;
+ if (one.getMinValue() != null && another.getMinValue() != null) {
+ minValue =
one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ?
one.getMinValue() : another.getMinValue();
+ } else if (one.getMinValue() == null) {
+ minValue = another.getMinValue();
+ } else {
+ minValue = one.getMinValue();
+ }
+
+ if (one.getMaxValue() != null && another.getMaxValue() != null) {
+ maxValue =
one.getMaxValue().toString().compareTo(another.getMaxValue().toString()) < 0 ?
another.getMaxValue() : one.getMaxValue();
+ } else if (one.getMaxValue() == null) {
+ maxValue = another.getMaxValue();
+ } else {
+ maxValue = one.getMaxValue();
+ }
+
+ return HoodieColumnRangeMetadata.create(
+ one.getFilePath(),
Review Comment:
The file path itself is a dummy here, right? Let's avoid code like this. Can
we pass in `null` instead?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String
partition, String filename)
return new Path(basePath, partition + Path.SEPARATOR + filename);
}
}
+
+ public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+
List<DirectoryInfo> partitionInfoList,
+
MetadataRecordsGenerationParams recordsGenerationParams) {
+ // Find the columns to index
+ HoodieTableMetaClient dataTableMetaClient =
recordsGenerationParams.getDataMetaClient();
+ final List<String> columnsToIndex = getColumnsToIndex(
+ recordsGenerationParams,
+ Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+ if (columnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+ LOG.debug(String.format("Indexing %d columns for partition stats index",
columnsToIndex.size()));
Review Comment:
slf4j supports argument substitution via `{}` placeholders, so can we drop the
`String.format` here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -641,6 +642,36 @@ public static Stream<HoodieRecord>
createColumnStatsRecords(String partitionName
});
}
+ public static Stream<HoodieRecord> createPartitionStatsRecords(String
partitionName,
Review Comment:
partitionName or partitionPath?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -64,6 +67,51 @@ public static BaseFileUtils getInstance(HoodieFileFormat
fileFormat) {
throw new UnsupportedOperationException(fileFormat.name() + " format not
supported yet.");
}
+ /**
+ * Aggregate column range statistics across files in a partition.
+ *
+ * @param fileRanges List of column range statistics for each file in a
partition
+ */
+ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>>
fileRanges) {
+ if (fileRanges.size() == 1) {
+ // Only one parquet file, we can just return that range.
+ return fileRanges.get(0);
+ }
+ // There are multiple files. Compute min(file_mins) and max(file_maxs)
+ return fileRanges.stream()
+ .sequential()
+ .reduce(BaseFileUtils::mergeRanges).get();
+ }
+
+ private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
mergeRanges(HoodieColumnRangeMetadata<T> one,
+
HoodieColumnRangeMetadata<T> another) {
+ final T minValue;
+ final T maxValue;
+ if (one.getMinValue() != null && another.getMinValue() != null) {
+ minValue =
one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ?
one.getMinValue() : another.getMinValue();
Review Comment:
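Does a string comparison help here? What if the partition columns are
long/int? Compared as strings, "10" sorts before "9", so the computed min/max
would be wrong for numeric values.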
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String
partition, String filename)
return new Path(basePath, partition + Path.SEPARATOR + filename);
}
}
+
+ public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+
List<DirectoryInfo> partitionInfoList,
+
MetadataRecordsGenerationParams recordsGenerationParams) {
Review Comment:
this seems deprecated?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -641,6 +642,36 @@ public static Stream<HoodieRecord>
createColumnStatsRecords(String partitionName
});
}
+ public static Stream<HoodieRecord> createPartitionStatsRecords(String
partitionName,
+
Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+ boolean
isDeleted) {
+ return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
+ HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionName,
columnRangeMetadata),
+ MetadataPartitionType.PARTITION_STATS.getPartitionPath());
+
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataColumnStats.newBuilder()
+ .setFileName(new
Path(columnRangeMetadata.getFilePath()).getName())
Review Comment:
remove the file name?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String
partition, String filename)
return new Path(basePath, partition + Path.SEPARATOR + filename);
}
}
+
+ public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
Review Comment:
I see that you are ultimately using
`recordsGenParams.getTargetColumnsForColumnStatsIndex();` to generate the
stats. Do you just add the partition fields into the target columns, so that
partition stats aggregation happens automatically?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String
partition, String filename)
return new Path(basePath, partition + Path.SEPARATOR + filename);
}
}
+
+ public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+
List<DirectoryInfo> partitionInfoList,
+
MetadataRecordsGenerationParams recordsGenerationParams) {
+ // Find the columns to index
+ HoodieTableMetaClient dataTableMetaClient =
recordsGenerationParams.getDataMetaClient();
+ final List<String> columnsToIndex = getColumnsToIndex(
+ recordsGenerationParams,
+ Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+ if (columnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+ LOG.debug(String.format("Indexing %d columns for partition stats index",
columnsToIndex.size()));
+ // Create records for MDT
+ int parallelism = Math.max(Math.min(partitionInfoList.size(),
recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
+ return engineContext.parallelize(partitionInfoList,
parallelism).flatMap(partitionFiles -> {
+ final String partitionName = partitionFiles.getRelativePath();
+ Stream<HoodieColumnRangeMetadata<Comparable>>
partitionStatsRangeMetadata =
partitionFiles.getFileNameToSizeMap().keySet().stream()
+ .map(fileName -> getFileStatsRangeMetadata(partitionName,
partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
+ .map(BaseFileUtils::getColumnRangeInPartition);
+ return HoodieMetadataPayload.createPartitionStatsRecords(partitionName,
partitionStatsRangeMetadata.collect(toList()), false).iterator();
+ });
+ }
+
+ private static List<HoodieColumnRangeMetadata<Comparable>>
getFileStatsRangeMetadata(String partitionPath,
+
String filePath,
+
HoodieTableMetaClient datasetMetaClient,
+
List<String> columnsToIndex,
+
boolean isDeleted) {
+ String filePartitionPath = filePath.startsWith("/") ?
filePath.substring(1) : filePath;
Review Comment:
What are the cases where we would and would not get a leading "/"? Or is this
just defensive?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]