alexeykudinkin commented on a change in pull request #4060:
URL: https://github.com/apache/hudi/pull/4060#discussion_r754542316



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -284,55 +288,102 @@ public Boolean apply(String recordKey) {
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath, 
List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData ->
-          new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-              columnChunkMetaData.getStatistics().genericGetMin(),
-              columnChunkMetaData.getStatistics().genericGetMax(),
-              columnChunkMetaData.getStatistics().getNumNulls(),
-              columnChunkMetaData.getPrimitiveType().stringifier()));
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =
+        metadata.getBlocks()
+            .stream()
+            .sequential()
+            .flatMap(blockMetaData ->
+                blockMetaData.getColumns()
+                    .stream()
+                    .filter(f -> cols.contains(f.getPath().toDotString()))
+                    .map(columnChunkMetaData ->
+                        new HoodieColumnRangeMetadata<Comparable>(
+                            parquetFilePath.getName(),
+                            columnChunkMetaData.getPath().toDotString(),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMin()),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMax()),
+                            columnChunkMetaData.getStatistics().getNumNulls(),
+                            
columnChunkMetaData.getPrimitiveType().stringifier()))
+            )
+            
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+    // Combine those into file-level statistics
+    // NOTE: Inlining this var makes javac (1.8) upset (due to its inability 
to infer
+    // expression type correctly)
+    Stream<HoodieColumnRangeMetadata<Comparable>> stream = 
columnToStatsListMap.values()
+        .stream()
+        .map(this::getColumnRangeInFile);
+
+    return stream.collect(Collectors.toList());
   }
 
-  private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final 
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInFile(
+      @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+  ) {
     if (blockRanges.size() == 1) {
       // only one block in parquet file. we can just return that range.
       return blockRanges.get(0);
-    } else {
-      // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
-      return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, 
b2)).get();
     }
+
+    // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+    return blockRanges.stream()
+        .sequential()
+        .reduce(this::combineRanges).get();
   }
 
-  private HoodieColumnRangeMetadata<Comparable> 
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-                                                  
HoodieColumnRangeMetadata<Comparable> range2) {
-    final Comparable minValue;
-    final Comparable maxValue;
-    if (range1.getMinValue() != null && range2.getMinValue() != null) {
-      minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? 
range1.getMinValue() : range2.getMinValue();
-    } else if (range1.getMinValue() == null) {
-      minValue = range2.getMinValue();
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+      HoodieColumnRangeMetadata<T> one,
+      HoodieColumnRangeMetadata<T> another
+  ) {
+    final T minValue;
+    final T maxValue;
+    if (one.getMinValue() != null && another.getMinValue() != null) {
+      minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? 
one.getMinValue() : another.getMinValue();
+    } else if (one.getMinValue() == null) {
+      minValue = another.getMinValue();
     } else {
-      minValue = range1.getMinValue();
+      minValue = one.getMinValue();
     }
 
-    if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
-      maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? 
range2.getMaxValue() : range1.getMaxValue();
-    } else if (range1.getMaxValue() == null) {
-      maxValue = range2.getMaxValue();
+    if (one.getMaxValue() != null && another.getMaxValue() != null) {
+      maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? 
another.getMaxValue() : one.getMaxValue();
+    } else if (one.getMaxValue() == null) {
+      maxValue = another.getMaxValue();
     } else  {
-      maxValue = range1.getMaxValue();
+      maxValue = one.getMaxValue();
+    }
+
+    return new HoodieColumnRangeMetadata<T>(
+        one.getFilePath(),
+        one.getColumnName(), minValue, maxValue, one.getNumNulls() + 
another.getNumNulls(), one.getStringifier());
+  }
+
+  private static Comparable<?> convertToNativeJavaType(PrimitiveType 
primitiveType, Comparable val) {
+    if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {

Review comment:
       Please check the comment below -- in the current version of Parquet that 
Hudi is using, concurrent access to `minAsString` leads to non-deterministic 
results (b/c it internally uses `SimpleDateFormat`, which is not thread-safe).
   
    Until we update Parquet to the version bearing the fix, we will have to 
synchronize externally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to