vinothchandar commented on a change in pull request #4060:
URL: https://github.com/apache/hudi/pull/4060#discussion_r757243757



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =
+        metadata.getBlocks()
+            .stream()
+            .sequential()
+            .flatMap(blockMetaData ->
+                blockMetaData.getColumns()
+                    .stream()
+                    .filter(f -> cols.contains(f.getPath().toDotString()))
+                    .map(columnChunkMetaData ->
+                        new HoodieColumnRangeMetadata<Comparable>(
+                            parquetFilePath.getName(),
+                            columnChunkMetaData.getPath().toDotString(),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMin()),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMax()),
+                            columnChunkMetaData.getStatistics().getNumNulls(),
+                            
columnChunkMetaData.getPrimitiveType().stringifier()))
+            )
+            
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+    // Combine those into file-level statistics
+    // NOTE: Inlining this var makes javac (1.8) upset (due to its inability 
to infer
+    // expression type correctly)
+    Stream<HoodieColumnRangeMetadata<Comparable>> stream = 
columnToStatsListMap.values()
+        .stream()
+        .map(this::getColumnRangeInFile);
+
+    return stream.collect(Collectors.toList());
   }
 
-  private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final 
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInFile(
+      @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+  ) {
     if (blockRanges.size() == 1) {
       // only one block in parquet file. we can just return that range.
       return blockRanges.get(0);
-    } else {
-      // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
-      return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, 
b2)).get();
     }
+
+    // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+    return blockRanges.stream()
+        .sequential()
+        .reduce(this::combineRanges).get();
   }
 
-  private HoodieColumnRangeMetadata<Comparable> 
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-                                                  
HoodieColumnRangeMetadata<Comparable> range2) {
-    final Comparable minValue;
-    final Comparable maxValue;
-    final String minValueAsString;
-    final String maxValueAsString;
-    if (range1.getMinValue() != null && range2.getMinValue() != null) {
-      if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
-        minValue = range1.getMinValue();
-        minValueAsString = range1.getMinValueAsString();
-      } else {
-        minValue = range2.getMinValue();
-        minValueAsString = range2.getMinValueAsString();
-      }
-    } else if (range1.getMinValue() == null) {
-      minValue = range2.getMinValue();
-      minValueAsString = range2.getMinValueAsString();
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+      HoodieColumnRangeMetadata<T> one,

Review comment:
       This is kind of minor, but please remember to keep larger renames in 
separate PRs. Again, subjective preferences like this need broader agreement. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to