codope commented on a change in pull request #4761:
URL: https://github.com/apache/hudi/pull/4761#discussion_r803315237
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -599,4 +600,34 @@ public static Object getRecordColumnValues(HoodieRecord<?
extends HoodieRecordPa
SerializableSchema schema,
boolean consistentLogicalTimestampEnabled) {
return getRecordColumnValues(record, columns, schema.get(),
consistentLogicalTimestampEnabled);
}
+
+ /**
+ * Accumulate column range statistics for the requested record.
+ *
+ * @param record - Record to get the column range statistics for
+ * @param schema - Schema for the record
+ * @param filePath - File that record belongs to
+ */
+ public static void accumulateColumnRanges(IndexedRecord record, Schema
schema, String filePath,
+ Map<String,
HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
+ if (!(record instanceof GenericRecord)) {
+ throw new HoodieIOException("Record is not a generic type to get column
range metadata!");
+ }
+
+ schema.getFields().forEach(field -> {
+ final String fieldVal = getNestedFieldValAsString((GenericRecord)
record, field.name(), true, true);
+ final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+ final HoodieColumnRangeMetadata<Comparable> fieldRange = new
HoodieColumnRangeMetadata<>(
+ filePath,
+ field.name(),
+ fieldVal,
+ fieldVal,
+ fieldVal == null ? 1 : 0,
Review comment:
nit: would it be better to declare `1` and `0` as meaningful named constants to help
readers?
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -599,4 +600,34 @@ public static Object getRecordColumnValues(HoodieRecord<?
extends HoodieRecordPa
SerializableSchema schema,
boolean consistentLogicalTimestampEnabled) {
return getRecordColumnValues(record, columns, schema.get(),
consistentLogicalTimestampEnabled);
}
+
+ /**
+ * Accumulate column range statistics for the requested record.
+ *
+ * @param record - Record to get the column range statistics for
+ * @param schema - Schema for the record
+ * @param filePath - File that record belongs to
+ */
+ public static void accumulateColumnRanges(IndexedRecord record, Schema
schema, String filePath,
Review comment:
Shall we move this method out of HoodieAvroUtils? I don't think Avro
utils should be concerned with the construction of column range metadata. Moreover,
we can define this method where the write config is available so that
`consistentLogicalTimestampEnabled` is not hardcoded in the call to
`HoodieAvroUtils#getNestedFieldValAsString`. I am okay with keeping this method
private in `HoodieAppendHandle`, as that's the only place it's being used
currently.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -33,6 +38,24 @@
private final long totalSize;
private final long totalUncompressedSize;
+ public static final BiFunction<HoodieColumnRangeMetadata<Comparable>,
HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>>
COLUMN_RANGE_MERGE_FUNCTION =
+ (oldColumnRange, newColumnRange) -> {
+
ValidationUtils.checkArgument(oldColumnRange.getColumnName().equals(newColumnRange.getColumnName()));
+
ValidationUtils.checkArgument(oldColumnRange.getFilePath().equals(newColumnRange.getFilePath()));
+ return new HoodieColumnRangeMetadata<>(
+ newColumnRange.getFilePath(),
+ newColumnRange.getColumnName(),
+ (Comparable) Arrays.asList(oldColumnRange.getMinValue(),
newColumnRange.getMinValue())
Review comment:
nit: remove the redundant cast to `Comparable`?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -874,45 +869,53 @@ public static HoodieTableFileSystemView
getFileSystemView(HoodieTableMetaClient
}
}
- private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient) {
- return getLatestColumns(datasetMetaClient, false);
+ private static List<String> getColumnsToIndex(HoodieTableMetaClient
datasetMetaClient) {
+ return getColumnsToIndex(datasetMetaClient, false);
}
public static Stream<HoodieRecord>
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
HoodieTableMetaClient datasetMetaClient,
-
List<String> latestColumns) {
- return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(),
datasetMetaClient, latestColumns, false);
+
List<String> columnsToIndex) {
+ Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> columnRangeMap
= Option.empty();
+ if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat)
writeStat).getRecordsStats().isPresent()) {
+ columnRangeMap = Option.of(((HoodieDeltaWriteStat)
writeStat).getRecordsStats().get().getStats());
+ }
+ return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(),
datasetMetaClient, columnsToIndex,
+ columnRangeMap, false);
}
private static Stream<HoodieRecord> getColumnStats(final String
partitionPath, final String filePathWithPartition,
HoodieTableMetaClient
datasetMetaClient,
- List<String> columns,
boolean isDeleted) {
+ List<String>
columnsToIndex,
+ Option<Map<String,
HoodieColumnRangeMetadata<Comparable>>> columnRangeMap,
+ boolean isDeleted) {
final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionPath;
final int offset = partition.equals(NON_PARTITIONED_NAME) ?
(filePathWithPartition.startsWith("/") ? 1 : 0)
: partition.length() + 1;
final String fileName = filePathWithPartition.substring(offset);
- if (!FSUtils.isBaseFile(new Path(fileName))) {
- return Stream.empty();
- }
if
(filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
new ArrayList<>();
final Path fullFilePath = new Path(datasetMetaClient.getBasePath(),
filePathWithPartition);
if (!isDeleted) {
try {
columnRangeMetadataList = new
ParquetUtils().readRangeFromParquetMetadata(
- datasetMetaClient.getHadoopConf(), fullFilePath, columns);
+ datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
} catch (Exception e) {
LOG.error("Failed to read column stats for " + fullFilePath, e);
}
} else {
columnRangeMetadataList =
- columns.stream().map(entry -> new
HoodieColumnRangeMetadata<Comparable>(fileName,
+ columnsToIndex.stream().map(entry -> new
HoodieColumnRangeMetadata<Comparable>(fileName,
entry, null, null, 0, 0, 0, 0))
.collect(Collectors.toList());
}
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadataList, isDeleted);
+ } else if (columnRangeMap.isPresent()) {
Review comment:
Should we also check that the stat map is non-empty?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -118,6 +121,26 @@
private HoodieMetadataBloomFilter bloomFilterMetadata = null;
private HoodieMetadataColumnStats columnStatMetadata = null;
+ public static final BiFunction<HoodieMetadataColumnStats,
HoodieMetadataColumnStats, HoodieMetadataColumnStats>
COLUMN_STATS_MERGE_FUNCTION =
+ (oldColumnStats, newColumnStats) -> {
+
ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+ if (newColumnStats.getIsDeleted()) {
+ return newColumnStats;
+ }
+ return new HoodieMetadataColumnStats(
+ newColumnStats.getFileName(),
+ Arrays.asList(oldColumnStats.getMinValue(),
newColumnStats.getMinValue())
+
.stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null),
+ Arrays.asList(oldColumnStats.getMinValue(),
newColumnStats.getMinValue())
+
.stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null),
+ oldColumnStats.getNullCount() + newColumnStats.getNullCount(),
Review comment:
Is my understanding correct that, since this is the append handle and we
don't expect duplicates, we can simply add these stats together?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -118,6 +121,26 @@
private HoodieMetadataBloomFilter bloomFilterMetadata = null;
private HoodieMetadataColumnStats columnStatMetadata = null;
+ public static final BiFunction<HoodieMetadataColumnStats,
HoodieMetadataColumnStats, HoodieMetadataColumnStats>
COLUMN_STATS_MERGE_FUNCTION =
+ (oldColumnStats, newColumnStats) -> {
+
ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+ if (newColumnStats.getIsDeleted()) {
+ return newColumnStats;
+ }
+ return new HoodieMetadataColumnStats(
+ newColumnStats.getFileName(),
Review comment:
So this field is called `fileName` in HoodieMetadataColumnStats but
`filePath` in HoodieColumnRangeMetadata. If possible, can we keep the names
consistent?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]