alexeykudinkin commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r822126414



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();

Review comment:
       @codope that's what I was referring to in my comments about the increased complexity around `RecordsStats`. Why not just have `stat.getRecordsStats().get()` instead?
   
   As written, a reader has to work out what this additional `getStats()` call is for and why it's needed, whereas without it the call-site is crystal clear and doesn't require scanning through `getRecordsStats` to understand what's going on.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();
+      final String filePath = stat.getPath();
+      // initialize map of column name to map of stats name to stats value
+      Map<String, Map<String, Object>> columnToStats = new HashMap<>();
+      writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
+      // collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
+      recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));

Review comment:
       Can we, instead of placing iteration and aggregation into separate methods, consolidate them into `aggregateColumnStats` so that its signature actually is:
   
   ```
   Map<String, Map<...>> aggregateColumnStats(records, writeSchema, ...)
   ```
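For illustration, a self-contained sketch of that shape, with plain collection types standing in for `IndexedRecord`/`Schema` (all names here are hypothetical; the point is just that the method owns both iteration and aggregation and returns the map):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsolidatedAggregateDemo {
  // Hypothetical consolidated shape: the method iterates the records itself
  // and returns the aggregated stats instead of mutating a map passed in.
  static Map<String, Integer> aggregateColumnStats(List<Map<String, String>> records,
                                                   List<String> schemaFields) {
    Map<String, Integer> columnToSize = new HashMap<>();
    for (Map<String, String> record : records) {
      for (String field : schemaFields) {
        String val = record.get(field);
        // accumulate a toy "total size" stat per column
        columnToSize.merge(field, val == null ? 0 : val.length(), Integer::sum);
      }
    }
    return columnToSize;
  }

  public static void main(String[] args) {
    Map<String, Integer> stats =
        aggregateColumnStats(List.of(Map.of("a", "xy"), Map.of("a", "z")), List.of("a"));
    System.out.println(stats.get("a")); // 3
  }
}
```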

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;

Review comment:
       Let's limit the scope of this component to just _parameters_ for Index Generation. Otherwise it has the potential to become a dependency magnet, where random dependencies get added here to avoid threading them through.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value, which gets updated in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {

Review comment:
       We can't compare values as strings; that's incorrect ("12" < "2").
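A standalone snippet (not from the PR) illustrating the pitfall:

```java
public class StringCompareDemo {
  public static void main(String[] args) {
    String a = "12";
    String b = "2";
    // Lexicographically, '1' < '2', so "12" sorts before "2" ...
    System.out.println(a.compareTo(b) < 0);                    // true
    // ... even though numerically 12 > 2.
    System.out.println(Long.parseLong(a) < Long.parseLong(b)); // false
  }
}
```

So min/max tracked via `String.compareTo` is wrong for any non-string column; the comparison has to happen on typed values.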

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,14 +332,16 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert clean metadata to bloom filter index records.
    *
-   * @param cleanMetadata - Clean action metadata
-   * @param engineContext - Engine context
-   * @param instantTime   - Clean action instant time
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param instantTime             - Clean action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of bloom filter index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,

Review comment:
       nit: There's a general convention that "context" objects are passed as the first arg.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;

Review comment:
       BTW, I see this is `Serializable` -- how are we serializing the `metaClient`?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,14 +332,16 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert clean metadata to bloom filter index records.
    *
-   * @param cleanMetadata - Clean action metadata
-   * @param engineContext - Engine context
-   * @param instantTime   - Clean action instant time
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param instantTime             - Clean action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of bloom filter index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,

Review comment:
       Just FYI, no need to fix this 

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final 
MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the 
column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged 
in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String 
filePath,

Review comment:
       Can we unify both of these methods into one?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -867,41 +889,56 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
+    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+    if (newColumnStats.getIsDeleted()) {
+      return newColumnStats;
+    }
+    return HoodieMetadataColumnStats.newBuilder()
+        .setFileName(newColumnStats.getFileName())
+        .setMinValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
+        .setMaxValue(Stream.of(oldColumnStats.getMaxValue(), newColumnStats.getMaxValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
+        .setValueCount(oldColumnStats.getValueCount() + newColumnStats.getValueCount())
+        .setNullCount(oldColumnStats.getNullCount() + newColumnStats.getNullCount())
+        .setTotalSize(oldColumnStats.getTotalSize() + newColumnStats.getTotalSize())
+        .setTotalUncompressedSize(oldColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
+        .setIsDeleted(newColumnStats.getIsDeleted())
+        .build();
   }
 
   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
-
+                                                                     List<String> columnsToIndex) {
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
+      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
+      return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
   }
 
   private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
                                                      HoodieTableMetaClient datasetMetaClient,
-                                                     List<String> columns, boolean isDeleted) {
-    final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionPath;
+                                                     List<String> columnsToIndex,
+                                                     boolean isDeleted) {
+    final String partition = getPartition(partitionPath);
     final int offset = partition.equals(NON_PARTITIONED_NAME) ? (filePathWithPartition.startsWith("/") ? 1 : 0)
         : partition.length() + 1;
     final String fileName = filePathWithPartition.substring(offset);
-    if (!FSUtils.isBaseFile(new Path(fileName))) {
-      return Stream.empty();
-    }
 
     if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
      final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition);
      if (!isDeleted) {

Review comment:
       Handling of deleted files is invariant to the file format, right?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value, which gets updated in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
+          columnStats.put(MIN, fieldVal);
+        }
+        // set the max value of the field
+        if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) {
+          columnStats.put(MAX, fieldVal);

Review comment:
       We don't need a Map for that, right? Let's instead create a mutable object holding all the statistics that we're collecting:
   
   ```
   class FileColumnStats {
     Object min, max;
     long count, totalSize;
     // ...
   }
   ```
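A slightly fuller sketch of such an accumulator (a hypothetical illustration; field and method names are not from the codebase):

```java
import java.io.Serializable;

// Illustrative per-column mutable accumulator replacing Map<String, Object>;
// all names here are hypothetical.
public class FileColumnStats implements Serializable {
  String min;   // smallest non-null value seen so far (string form for brevity)
  String max;   // largest non-null value seen so far
  long nullCount;
  long valueCount;
  long totalSize;
  long totalUncompressedSize;

  void accept(String fieldVal) {
    valueCount++;
    int size = fieldVal == null ? 0 : fieldVal.length();
    totalSize += size;
    totalUncompressedSize += size;
    if (fieldVal == null || fieldVal.isEmpty()) {
      nullCount++;
      return;
    }
    // NOTE: real code should compare typed values, not strings
    // (see the "12" < "2" remark elsewhere in this review).
    if (min == null || fieldVal.compareTo(min) < 0) {
      min = fieldVal;
    }
    if (max == null || fieldVal.compareTo(max) > 0) {
      max = fieldVal;
    }
  }
}
```

Each field would then map to one such instance that is updated in place per record, with no boxing through `Map<String, Object>`.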

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value, which gets updated in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());

Review comment:
       Please avoid such `HashMap` allocations, since this just churns objects.
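For reference (a generic snippet, not the PR's code): `getOrDefault` evaluates its default argument eagerly, so a throwaway `HashMap` is allocated on every call, and on a miss the returned map is never stored back; `Map.computeIfAbsent` avoids both issues:

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentDemo {
  public static void main(String[] args) {
    Map<String, Map<String, Object>> columnToStats = new HashMap<>();

    // getOrDefault allocates a fresh HashMap on every call even when the
    // key exists, and the fallback map is never stored in columnToStats.
    Map<String, Object> viaDefault = columnToStats.getOrDefault("col", new HashMap<>());
    System.out.println(columnToStats.containsKey("col")); // false: nothing stored

    // computeIfAbsent allocates only on a miss and stores the result.
    Map<String, Object> stats = columnToStats.computeIfAbsent("col", k -> new HashMap<>());
    System.out.println(stats == columnToStats.get("col")); // true
  }
}
```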

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
##########
@@ -69,4 +73,24 @@ public void addLogFiles(String logFile) {
   public List<String> getLogFiles() {
     return logFiles;
   }
+
+  public void setRecordsStats(RecordsStats<? extends Map> stats) {
+    recordsStats = Option.of(stats);
+  }
+
+  public Option<RecordsStats<? extends Map>> getRecordsStats() {
+    return recordsStats;
+  }
+
+  public static class RecordsStats<T> implements Serializable {

Review comment:
       @codope I'm concerned about this as an abstraction that isn't bringing much value while increasing complexity: it adds cognitive load for anybody interacting with it, who now has to understand what it does.
   
   In general, I'd suggest following the principle of _keeping things as simple as possible, but no simpler than needed to solve the problem_. It helps on many fronts:
   
   1. Makes the code easier to comprehend
   2. Makes component evolution easier (the simpler things are, the easier they are to evolve)
   3. Makes components age better: if things change and we need to refactor -- the simpler the system is, the easier the refactoring will be
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -651,6 +641,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }
   }
 
+  private MetadataRecordsGenerationParams getRecordsGenerationParams() {
+    return new MetadataRecordsGenerationParams(

Review comment:
       BTW, why do we even need this component if we can just get all of this 
from the Writer Config?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value, which gets updated in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);

Review comment:
       Why do we need to `parseLong` every time?
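A sketch of the alternative (illustrative key name, not the PR's constants): keeping the counters as typed `Long` values removes the per-record `parseLong`/`toString` round-trip:

```java
import java.util.HashMap;
import java.util.Map;

public class TypedCounterDemo {
  public static void main(String[] args) {
    // Values stay as Long, so no parsing or stringifying per record.
    Map<String, Long> columnStats = new HashMap<>();
    String totalSize = "TOTAL_SIZE"; // illustrative key
    long fieldSize = 42L;
    columnStats.merge(totalSize, fieldSize, Long::sum);
    columnStats.merge(totalSize, fieldSize, Long::sum);
    System.out.println(columnStats.get(totalSize)); // 84
  }
}
```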

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -867,41 +889,56 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
+    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+    if (newColumnStats.getIsDeleted()) {

Review comment:
       We need to handle the inverse case as well -- when the existing record is a deleted one -- otherwise we will merge incorrectly.
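A toy sketch of symmetric handling (hypothetical stand-in types, not `HoodieMetadataColumnStats`; the exact semantics of merging over a deleted record would need to match the index's expectations):

```java
public class MergeDeletedDemo {
  // Minimal stand-in for a column-stats record.
  static class Stats {
    final boolean isDeleted;
    final long valueCount;
    Stats(boolean isDeleted, long valueCount) {
      this.isDeleted = isDeleted;
      this.valueCount = valueCount;
    }
  }

  // Hypothetical merge: check the deleted flag on BOTH sides before summing.
  static Stats merge(Stats oldStats, Stats newStats) {
    if (newStats.isDeleted) {
      return newStats;
    }
    if (oldStats.isDeleted) {
      // Existing record marked the file deleted: take the new stats
      // as-is instead of adding them to stale counts.
      return newStats;
    }
    return new Stats(false, oldStats.valueCount + newStats.valueCount);
  }

  public static void main(String[] args) {
    Stats merged = merge(new Stats(true, 10), new Stats(false, 5));
    System.out.println(merged.valueCount); // 5, not 15
  }
}
```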

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value, which gets updated in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {

Review comment:
       We can leverage Parquet's comparators for that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

