alexeykudinkin commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r822126414
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
updateWriteStatus(stat, result);
}
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();
Review comment:
@codope that's what I was referring to in my comments about the increased complexity introduced by `RecordsStats`. Why not just have `stat.getRecordsStats().get()` instead?
Now, when reading this code, the reader actually needs to understand what this additional `getStats()` call is about and why it's needed, whereas without it the call-site is crystal clear and doesn't require scanning through `getRecordsStats` to understand what's going on.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
updateWriteStatus(stat, result);
}
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();
+      final String filePath = stat.getPath();
+      // initialize map of column name to map of stats name to stats value
+      Map<String, Map<String, Object>> columnToStats = new HashMap<>();
+      writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
+      // collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
+      recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));
Review comment:
Can we, instead of placing iteration and aggregation into separate
methods, consolidate them in `aggregateColumnStats` so that its signature
actually is:
```
Map<String, Map<...>> aggregateColumnStats(records, writeSchema, ...)
```
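A minimal sketch of that consolidated shape, with plain maps standing in for the Avro records and write schema so it stays self-contained (the class and field names here are illustrative, not Hudi's actual API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnStatsAggregator {

  // Consolidates iteration and per-record aggregation into one call that
  // *returns* the stats map, instead of mutating a map threaded in by the caller.
  public static Map<String, Map<String, Object>> aggregateColumnStats(
      List<Map<String, String>> records, List<String> fieldNames) {
    Map<String, Map<String, Object>> columnToStats = new HashMap<>();
    for (String field : fieldNames) {
      columnToStats.put(field, new HashMap<>());
    }
    for (Map<String, String> record : records) {
      for (String field : fieldNames) {
        Map<String, Object> stats = columnToStats.get(field);
        String val = record.get(field);
        long size = val == null ? 0 : val.length();
        // keep running totals as boxed Longs
        stats.merge("totalSize", size, (a, b) -> (Long) a + (Long) b);
        if (val != null) {
          stats.merge("valueCount", 1L, (a, b) -> (Long) a + (Long) b);
        }
      }
    }
    return columnToStats;
  }
}
```

The call-site then becomes a single assignment with no pre-initialized accumulator map.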
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+ private final HoodieTableMetaClient dataMetaClient;
Review comment:
Let's limit the scope of this component to just _parameters_ for Index Generation. Otherwise it has the potential to become a dependency magnet, where random dependencies get added here to avoid threading them through.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field - column for which statistics will be computed
+   * @param filePath - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record - current record
+   * @param schema - write schema
+   * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
Review comment:
We can't compare values as strings; that's incorrect (lexicographically, "12" < "2")
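To make the pitfall concrete, a small self-contained example (illustrative names, not Hudi code) showing that lexicographic ordering disagrees with numeric ordering:

```java
public class TypedComparison {

  // String comparison orders character by character: '1' < '2', so "12" < "2".
  public static boolean lexicographicLess(String a, String b) {
    return a.compareTo(b) < 0;
  }

  // Comparing the parsed (typed) values gives the correct ordering: 12 > 2.
  public static boolean numericLess(String a, String b) {
    return Long.parseLong(a) < Long.parseLong(b);
  }
}
```

The fix is to carry values in their actual types and compare via each type's natural order, only stringifying (if at all) at serialization time.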
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,14 +332,16 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert clean metadata to bloom filter index records.
    *
-   * @param cleanMetadata - Clean action metadata
-   * @param engineContext - Engine context
-   * @param instantTime   - Clean action instant time
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param instantTime             - Clean action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of bloom filter index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
Review comment:
nit: There's a general convention that "context" objects are passed as the first arg
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+ private final HoodieTableMetaClient dataMetaClient;
Review comment:
BTW, I see it's `Serializable`; how are we serializing the `metaClient`?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,14 +332,16 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert clean metadata to bloom filter index records.
    *
-   * @param cleanMetadata - Clean action metadata
-   * @param engineContext - Engine context
-   * @param instantTime   - Clean action instant time
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param instantTime             - Clean action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of bloom filter index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
Review comment:
Just FYI, no need to fix this
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field - column for which statistics will be computed
+   * @param filePath - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
Review comment:
Can we unify both of these methods into one?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -867,41 +889,56 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
+    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+    if (newColumnStats.getIsDeleted()) {
+      return newColumnStats;
+    }
+    return HoodieMetadataColumnStats.newBuilder()
+        .setFileName(newColumnStats.getFileName())
+        .setMinValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
+        .setMaxValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
+        .setValueCount(oldColumnStats.getValueCount() + newColumnStats.getValueCount())
+        .setNullCount(oldColumnStats.getNullCount() + newColumnStats.getNullCount())
+        .setTotalSize(oldColumnStats.getTotalSize() + newColumnStats.getTotalSize())
+        .setTotalUncompressedSize(oldColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
+        .setIsDeleted(newColumnStats.getIsDeleted())
+        .build();
   }
   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
-
+                                                                     List<String> columnsToIndex) {
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
+      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
+      return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
   }
   private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
                                                      HoodieTableMetaClient datasetMetaClient,
-                                                     List<String> columns, boolean isDeleted) {
-    final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionPath;
+                                                     List<String> columnsToIndex,
+                                                     boolean isDeleted) {
+    final String partition = getPartition(partitionPath);
     final int offset = partition.equals(NON_PARTITIONED_NAME) ? (filePathWithPartition.startsWith("/") ? 1 : 0)
         : partition.length() + 1;
     final String fileName = filePathWithPartition.substring(offset);
-    if (!FSUtils.isBaseFile(new Path(fileName))) {
-      return Stream.empty();
-    }
     if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
       final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition);
       if (!isDeleted) {
Review comment:
Handling of deleted files is invariant of the file format, right?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field - column for which statistics will be computed
+   * @param filePath - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record - current record
+   * @param schema - write schema
+   * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
+          columnStats.put(MIN, fieldVal);
+        }
+        // set the max value of the field
+        if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) {
+          columnStats.put(MAX, fieldVal);
Review comment:
We don't need a Map for that, right? Let's instead create a mutable object
with all the statistics that we're collecting:
```
class FileColumnStats {
Object min, max;
long count, totalSize;
// ...
}
```
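A fuller sketch of that mutable holder; the extra fields and the `accept` method are illustrative additions, not part of the PR:

```java
public class FileColumnStats {
  // min/max held as typed values so comparison uses each type's natural order
  public Comparable min;
  public Comparable max;
  public long valueCount;
  public long nullCount;
  public long totalSize;
  public long totalUncompressedSize;

  // Folds one field value into the running stats; typed fields mean no
  // per-record string parsing or map lookups.
  @SuppressWarnings("unchecked")
  public void accept(Comparable value, long size) {
    totalSize += size;
    totalUncompressedSize += size;
    if (value == null) {
      nullCount++;
      return;
    }
    valueCount++;
    if (min == null || value.compareTo(min) < 0) {
      min = value;
    }
    if (max == null || value.compareTo(max) > 0) {
      max = value;
    }
  }
}
```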
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field - column for which statistics will be computed
+   * @param filePath - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record - current record
+   * @param schema - write schema
+   * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
Review comment:
Please avoid such `HashMap` allocations, since this is just churning objects
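For illustration, `computeIfAbsent` only allocates on a miss, and (unlike `getOrDefault`, whose default is never stored in the map) it also registers the freshly created map so subsequent mutations aren't lost. A minimal sketch with illustrative names:

```java
import java.util.HashMap;
import java.util.Map;

public class LazyMapAllocation {

  // Returns the per-column stats map, creating and *registering* it only when absent.
  // getOrDefault(key, new HashMap<>()) would allocate a throwaway map on every call
  // with a present key, and silently drop the new map when the key is absent.
  public static Map<String, Object> statsFor(
      Map<String, Map<String, Object>> columnToStats, String column) {
    return columnToStats.computeIfAbsent(column, k -> new HashMap<>());
  }
}
```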
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
##########
@@ -69,4 +73,24 @@ public void addLogFiles(String logFile) {
public List<String> getLogFiles() {
return logFiles;
}
+
+ public void setRecordsStats(RecordsStats<? extends Map> stats) {
+ recordsStats = Option.of(stats);
+ }
+
+ public Option<RecordsStats<? extends Map>> getRecordsStats() {
+ return recordsStats;
+ }
+
+ public static class RecordsStats<T> implements Serializable {
Review comment:
@codope I'm concerned about it as an abstraction that isn't bringing much value while increasing complexity: it adds cognitive load for anybody interacting with it to understand what it does.
In general, I'd suggest following the principle of _keeping things as simple as possible, but no simpler than needed to solve the problem_. It helps on many fronts:
1. Makes the code easier to comprehend
2. Makes component evolution easier (the simpler things are, the easier it is to evolve them)
3. Makes the component age better: if things change and we need to refactor it -- the simpler the system is, the easier the refactoring will be
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -651,6 +641,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
}
}
+ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
+ return new MetadataRecordsGenerationParams(
Review comment:
BTW, why do we even need this component if we can just get all of this
from the Writer Config?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field - column for which statistics will be computed
+   * @param filePath - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record - current record
+   * @param schema - write schema
+   * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
Review comment:
Why do we need to `parseLong` every time?
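A minimal sketch of the alternative being suggested: keep the running totals as boxed `Long`s (or a typed holder) so they can be bumped with `merge` instead of re-parsing a stringified value per record (names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class RunningTotals {

  // Adds fieldSize to the running total under key; no toString()/parseLong round-trip,
  // just a boxed-Long sum. Returns the new total.
  public static long addSize(Map<String, Long> columnStats, String key, long fieldSize) {
    return columnStats.merge(key, fieldSize, Long::sum);
  }
}
```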
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -867,41 +889,56 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
+    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+    if (newColumnStats.getIsDeleted()) {
Review comment:
We need to handle the inverse case as well -- when the existing record is a deleted one; otherwise we will merge incorrectly
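A simplified, hypothetical sketch of the symmetric handling being asked for, with a plain `Stats` class standing in for `HoodieMetadataColumnStats`:

```java
public class ColumnStatsMerge {

  public static class Stats {
    public final boolean isDeleted;
    public final long valueCount;

    public Stats(boolean isDeleted, long valueCount) {
      this.isDeleted = isDeleted;
      this.valueCount = valueCount;
    }
  }

  public static Stats merge(Stats oldStats, Stats newStats) {
    if (newStats.isDeleted) {
      // new record is a tombstone: it wins outright
      return newStats;
    }
    if (oldStats.isDeleted) {
      // inverse case: the existing record was a delete, so the new stats replace
      // it outright instead of being summed into stale values
      return newStats;
    }
    return new Stats(false, oldStats.valueCount + newStats.valueCount);
  }
}
```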
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field - column for which statistics will be computed
+   * @param filePath - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record - current record
+   * @param schema - write schema
+   * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
Review comment:
We can leverage Parquet's comparators for that
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]