vinothchandar commented on a change in pull request #4060:
URL: https://github.com/apache/hudi/pull/4060#discussion_r757200411



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =
+        metadata.getBlocks()
+            .stream()
+            .sequential()
+            .flatMap(blockMetaData ->
+                blockMetaData.getColumns()
+                    .stream()
+                    .filter(f -> cols.contains(f.getPath().toDotString()))
+                    .map(columnChunkMetaData ->
+                        new HoodieColumnRangeMetadata<Comparable>(
+                            parquetFilePath.getName(),
+                            columnChunkMetaData.getPath().toDotString(),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMin()),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMax()),
+                            columnChunkMetaData.getStatistics().getNumNulls(),
+                            
columnChunkMetaData.getPrimitiveType().stringifier()))
+            )
+            
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+    // Combine those into file-level statistics
+    // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
+    // expression type correctly)
+    Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values()

Review comment:
       why the variable? 
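
For context on the NOTE being questioned, here is a minimal, self-contained sketch of the same pattern using simplified stand-in types (the class and method names are illustrative, not the PR's API). The point is only the shape of the workaround: binding the mapped stream to an explicitly typed local before `collect`, which is what the diff says javac 1.8 needs here given the raw `Comparable` bound.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamVarSketch {

  // Stand-in for getColumnRangeInFile: reduces a block-level list to a single value.
  static <T extends Comparable<T>> T getRangeInFile(List<T> blockRanges) {
    return blockRanges.stream().reduce((a, b) -> a.compareTo(b) < 0 ? a : b).get();
  }

  public static void main(String[] args) {
    Map<String, List<String>> columnToStats = new HashMap<>();
    columnToStats.put("col_a", Arrays.asList("3", "1", "2"));

    // The workaround from the diff: bind the mapped stream to an explicitly typed
    // local so the element type is pinned before collect().
    Stream<String> fileLevelStream = columnToStats.values().stream()
        .map(StreamVarSketch::getRangeInFile);

    List<String> fileLevel = fileLevelStream.collect(Collectors.toList());
    System.out.println(fileLevel); // prints [1]
  }
}
```

An alternative worth trying is an explicit type witness on the map call (`.<HoodieColumnRangeMetadata<Comparable>>map(this::getColumnRangeInFile)`), which would drop the variable; whether javac 1.8 accepts that against the raw `Comparable` bound would need checking.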

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =

Review comment:
       We need to align on this style of formatting at some point. It increases the lines of code by a lot, and with modern wide monitors it does not make good use of screen width.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =
+        metadata.getBlocks()
+            .stream()
+            .sequential()
+            .flatMap(blockMetaData ->
+                blockMetaData.getColumns()
+                    .stream()
+                    .filter(f -> cols.contains(f.getPath().toDotString()))
+                    .map(columnChunkMetaData ->
+                        new HoodieColumnRangeMetadata<Comparable>(
+                            parquetFilePath.getName(),
+                            columnChunkMetaData.getPath().toDotString(),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMin()),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMax()),
+                            columnChunkMetaData.getStatistics().getNumNulls(),
+                            
columnChunkMetaData.getPrimitiveType().stringifier()))
+            )
+            
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+    // Combine those into file-level statistics
+    // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
+    // expression type correctly)
+    Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values()
+        .stream()
+        .map(this::getColumnRangeInFile);
+
+    return stream.collect(Collectors.toList());
   }
 
-  private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final 
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInFile(
+      @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+  ) {
     if (blockRanges.size() == 1) {
       // only one block in parquet file. we can just return that range.
       return blockRanges.get(0);
-    } else {
-      // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
-      return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, 
b2)).get();
     }
+
+    // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+    return blockRanges.stream()
+        .sequential()
+        .reduce(this::combineRanges).get();
   }
 
-  private HoodieColumnRangeMetadata<Comparable> 
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-                                                  
HoodieColumnRangeMetadata<Comparable> range2) {
-    final Comparable minValue;
-    final Comparable maxValue;
-    final String minValueAsString;
-    final String maxValueAsString;
-    if (range1.getMinValue() != null && range2.getMinValue() != null) {
-      if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
-        minValue = range1.getMinValue();
-        minValueAsString = range1.getMinValueAsString();
-      } else {
-        minValue = range2.getMinValue();
-        minValueAsString = range2.getMinValueAsString();
-      }
-    } else if (range1.getMinValue() == null) {
-      minValue = range2.getMinValue();
-      minValueAsString = range2.getMinValueAsString();
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+      HoodieColumnRangeMetadata<T> one,
+      HoodieColumnRangeMetadata<T> another
+  ) {
+    final T minValue;
+    final T maxValue;
+    if (one.getMinValue() != null && another.getMinValue() != null) {
+      minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? 
one.getMinValue() : another.getMinValue();
+    } else if (one.getMinValue() == null) {
+      minValue = another.getMinValue();
     } else {
-      minValue = range1.getMinValue();
-      minValueAsString = range1.getMinValueAsString();
+      minValue = one.getMinValue();
     }
 
-    if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
-      if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) {
-        maxValue = range2.getMaxValue();
-        maxValueAsString = range2.getMaxValueAsString();
-      } else {
-        maxValue = range1.getMaxValue();
-        maxValueAsString = range1.getMaxValueAsString();
-      }
-    } else if (range1.getMaxValue() == null) {
-      maxValue = range2.getMaxValue();
-      maxValueAsString = range2.getMaxValueAsString();
+    if (one.getMaxValue() != null && another.getMaxValue() != null) {
+      maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? 
another.getMaxValue() : one.getMaxValue();
+    } else if (one.getMaxValue() == null) {
+      maxValue = another.getMaxValue();
     } else  {
-      maxValue = range1.getMaxValue();
-      maxValueAsString = range1.getMaxValueAsString();
+      maxValue = one.getMaxValue();
+    }
+
+    return new HoodieColumnRangeMetadata<T>(
+        one.getFilePath(),
+        one.getColumnName(), minValue, maxValue, one.getNumNulls() + 
another.getNumNulls(), one.getStringifier());
+  }
+
+  private static Comparable<?> convertToNativeJavaType(PrimitiveType 
primitiveType, Comparable val) {
+    if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
+      DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata();
+      return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale());
+    } else if (primitiveType.getOriginalType() == OriginalType.DATE) {
+      // NOTE: This is a workaround to address race-condition in using
+      //       {@code SimpleDataFormat} concurrently (w/in {@code 
DateStringifier})
+      // TODO cleanup after Parquet upgrade to 1.12

Review comment:
       File a follow-up JIRA?
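
If a follow-up JIRA is filed, the eventual cleanup could be as simple as the sketch below, assuming the DATE statistics surface as epoch-day integers (which is what Parquet's DATE logical type stores); `java.time` types are immutable and thread-safe, so the workaround goes away. The helper name is illustrative only.

```java
import java.time.LocalDate;

public class DateStatsSketch {

  // Parquet's DATE logical type is an INT32 count of days since the Unix epoch.
  // java.time is immutable and thread-safe, so no shared-SimpleDateFormat
  // synchronization is needed once the DateStringifier workaround is removed.
  static String stringifyDateStat(int daysSinceEpoch) {
    return LocalDate.ofEpochDay(daysSinceEpoch).toString(); // ISO-8601, e.g. "2021-11-26"
  }

  public static void main(String[] args) {
    System.out.println(stringifyDateStat(18957)); // 2021-11-26
  }
}
```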

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
##########
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.zorder;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.optimize.ZOrderingUtil;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Row$;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
+import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.collection.JavaConversions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.util.DataTypeUtils.areCompatible;
+
+public class ZOrderingIndexHelper {
+
+  private static final Logger LOG = 
LogManager.getLogger(ZOrderingIndexHelper.class);
+
+  private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";

Review comment:
       This got moved and changed in the same commit, which means it's very hard for the reviewer to understand the delta changes.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =
+        metadata.getBlocks()
+            .stream()
+            .sequential()
+            .flatMap(blockMetaData ->
+                blockMetaData.getColumns()
+                    .stream()
+                    .filter(f -> cols.contains(f.getPath().toDotString()))
+                    .map(columnChunkMetaData ->
+                        new HoodieColumnRangeMetadata<Comparable>(
+                            parquetFilePath.getName(),
+                            columnChunkMetaData.getPath().toDotString(),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMin()),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMax()),
+                            columnChunkMetaData.getStatistics().getNumNulls(),
+                            
columnChunkMetaData.getPrimitiveType().stringifier()))
+            )
+            
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+    // Combine those into file-level statistics
+    // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
+    // expression type correctly)
+    Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values()
+        .stream()
+        .map(this::getColumnRangeInFile);
+
+    return stream.collect(Collectors.toList());
   }
 
-  private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final 
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInFile(
+      @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+  ) {
     if (blockRanges.size() == 1) {
       // only one block in parquet file. we can just return that range.
       return blockRanges.get(0);
-    } else {
-      // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
-      return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, 
b2)).get();
     }
+
+    // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+    return blockRanges.stream()

Review comment:
       Another example: this line is probably better left alone. Changing it just adds more review overhead.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -163,29 +169,61 @@ public HoodieWriteMetadata 
insertOverwrite(HoodieEngineContext context, String i
   }
 
   @Override
-  public void updateStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
-    // deal with z-order/hilbert statistic info
-    if (isOptimizeOperation) {
-      updateOptimizeOperationStatistics(context, stats, instantTime);
-    }
+  public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, 
@Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) throws 
Exception {
+    // Updates Z-ordering Index
+    updateZIndex(context, stats, instantTime);
   }
 
-  private void updateOptimizeOperationStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime) {
-    String cols = config.getClusteringSortColumns();
+  private void updateZIndex(
+      @Nonnull HoodieEngineContext context,
+      @Nonnull List<HoodieWriteStat> updatedFilesStats,
+      @Nonnull String instantTime
+  ) throws Exception {
+    String sortColsList = config.getClusteringSortColumns();
     String basePath = metaClient.getBasePath();
     String indexPath = metaClient.getZindexPath();
-    List<String> validateCommits = metaClient.getCommitsTimeline()
-        .filterCompletedInstants().getInstants().map(f -> 
f.getTimestamp()).collect(Collectors.toList());
-    List<String> touchFiles = stats.stream().map(s -> new Path(basePath, 
s.getPath()).toString()).collect(Collectors.toList());
-    if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
-      LOG.warn("save nothing to index table");
+
+    List<String> completedCommits =
+        metaClient.getCommitsTimeline()
+            .filterCompletedInstants()
+            .getInstants()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList());
+
+    List<String> touchedFiles =
+        updatedFilesStats.stream()
+            .map(s -> new Path(basePath, s.getPath()).toString())
+            .collect(Collectors.toList());
+
+    if (touchedFiles.isEmpty() || StringUtils.isNullOrEmpty(sortColsList) || 
StringUtils.isNullOrEmpty(indexPath)) {
       return;
     }
+
+    LOG.info(String.format("Updating Z-index table (%s)", indexPath));
+
+    List<String> sortCols = Arrays.stream(sortColsList.split(","))
+        .map(String::trim)
+        .collect(Collectors.toList());
+
     HoodieSparkEngineContext sparkEngineContext = 
(HoodieSparkEngineContext)context;
-    ZCurveOptimizeHelper.saveStatisticsInfo(sparkEngineContext
-        
.getSqlContext().sparkSession().read().load(JavaConversions.asScalaBuffer(touchFiles)),
-        cols, indexPath, instantTime, validateCommits);
-    LOG.info(String.format("save statistic info sucessfully at commitTime: 
%s", instantTime));
+
+    // Fetch table schema to appropriately construct Z-index schema
+    Schema tableWriteSchema =
+        HoodieAvroUtils.createHoodieWriteSchema(
+            new 
TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()

Review comment:
       Indexing the metadata fields is actually useful as well.
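
As a rough illustration of that (the helper below is hypothetical, not part of this PR), extending the indexed column set with Hudi's standard metadata fields would, for instance, let `_hoodie_commit_time` ranges participate in file pruning:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexColumnsSketch {

  // Hudi's standard metadata columns; indexing _hoodie_commit_time, for example,
  // would allow commit-time range pruning on top of the user-chosen sort columns.
  private static final List<String> HOODIE_META_COLUMNS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  // Hypothetical helper: extend the columns handed to the index builder.
  static List<String> withMetadataFields(List<String> sortCols) {
    List<String> cols = new ArrayList<>(sortCols);
    cols.addAll(HOODIE_META_COLUMNS);
    return cols;
  }

  public static void main(String[] args) {
    System.out.println(withMetadataFields(Arrays.asList("city", "ts")));
  }
}
```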

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -163,29 +169,61 @@ public HoodieWriteMetadata 
insertOverwrite(HoodieEngineContext context, String i
   }
 
   @Override
-  public void updateStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
-    // deal with z-order/hilbert statistic info
-    if (isOptimizeOperation) {
-      updateOptimizeOperationStatistics(context, stats, instantTime);
-    }
+  public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, 
@Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) throws 
Exception {
+    // Updates Z-ordering Index
+    updateZIndex(context, stats, instantTime);
   }
 
-  private void updateOptimizeOperationStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime) {
-    String cols = config.getClusteringSortColumns();
+  private void updateZIndex(

Review comment:
       Wondering if this naming should be tied to zindex. It would probably be good to make even the `/zindex` index path name more generic, given we also have Hilbert curves.
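
To make the suggestion concrete, a layout-agnostic name could look like the sketch below (the folder name and helper are hypothetical, not what the PR or Hudi ships):

```java
public class IndexNamingSketch {

  // A layout-agnostic name: the index stores per-column ranges and works the same
  // whether files were clustered with a Z-order curve, a Hilbert curve, or a linear sort.
  public static final String COLUMN_STATS_INDEX_FOLDER_NAME = ".colstatsindex";

  static String columnStatsIndexPath(String basePath) {
    return basePath + "/.hoodie/" + COLUMN_STATS_INDEX_FOLDER_NAME;
  }

  public static void main(String[] args) {
    System.out.println(columnStatsIndexPath("s3://bucket/warehouse/my_table"));
  }
}
```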

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =

Review comment:
       I'd prefer we kept the formatting the same, especially when changing existing code. It just makes review so much harder.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
-   * ParquetRead.readFooter is not a thread safe method.
-   *
-   * @param conf hadoop conf.
-   * @param parquetFilePath file to be read.
-   * @param cols cols which need to collect statistics.
-   * @return a HoodieColumnRangeMetadata instance.
    */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
-      Configuration conf,
-      Path parquetFilePath,
-      List<String> cols) {
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readRangeFromParquetMetadata(
+      @Nonnull Configuration conf,
+      @Nonnull Path parquetFilePath,
+      @Nonnull List<String> cols
+  ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-    // collect stats from all parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> 
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
-        String minAsString;
-        String maxAsString;
-        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == 
OriginalType.DATE) {
-          synchronized (lock) {
-            minAsString = columnChunkMetaData.getStatistics().minAsString();
-            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-          }
-        } else {
-          minAsString = columnChunkMetaData.getStatistics().minAsString();
-          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
-        }
-        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), 
columnChunkMetaData.getPath().toDotString(),
-            columnChunkMetaData.getStatistics().genericGetMin(),
-            columnChunkMetaData.getStatistics().genericGetMax(),
-            columnChunkMetaData.getStatistics().getNumNulls(),
-            minAsString, maxAsString);
-      });
-    }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
-    // we only intend to keep file level statistics.
-    return new ArrayList<>(columnToStatsListMap.values().stream()
-        .map(blocks -> getColumnRangeInFile(blocks))
-        .collect(Collectors.toList()));
+    // Collect stats from all individual Parquet blocks
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnToStatsListMap =
+        metadata.getBlocks()
+            .stream()
+            .sequential()
+            .flatMap(blockMetaData ->
+                blockMetaData.getColumns()
+                    .stream()
+                    .filter(f -> cols.contains(f.getPath().toDotString()))
+                    .map(columnChunkMetaData ->
+                        new HoodieColumnRangeMetadata<Comparable>(
+                            parquetFilePath.getName(),
+                            columnChunkMetaData.getPath().toDotString(),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMin()),
+                            convertToNativeJavaType(
+                                columnChunkMetaData.getPrimitiveType(),
+                                
columnChunkMetaData.getStatistics().genericGetMax()),
+                            columnChunkMetaData.getStatistics().getNumNulls(),
+                            
columnChunkMetaData.getPrimitiveType().stringifier()))
+            )
+            
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+    // Combine those into file-level statistics
+    // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
+    // expression type correctly)
+    Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values()
+        .stream()
+        .map(this::getColumnRangeInFile);
+
+    return stream.collect(Collectors.toList());
   }
 
-  private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final 
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInFile(
+      @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+  ) {
     if (blockRanges.size() == 1) {
       // only one block in parquet file. we can just return that range.
       return blockRanges.get(0);
-    } else {
-      // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
-      return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, 
b2)).get();
     }
+
+    // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+    return blockRanges.stream()
+        .sequential()
+        .reduce(this::combineRanges).get();
   }
 
-  private HoodieColumnRangeMetadata<Comparable> 
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-                                                  
HoodieColumnRangeMetadata<Comparable> range2) {
-    final Comparable minValue;
-    final Comparable maxValue;
-    final String minValueAsString;
-    final String maxValueAsString;
-    if (range1.getMinValue() != null && range2.getMinValue() != null) {
-      if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
-        minValue = range1.getMinValue();
-        minValueAsString = range1.getMinValueAsString();
-      } else {
-        minValue = range2.getMinValue();
-        minValueAsString = range2.getMinValueAsString();
-      }
-    } else if (range1.getMinValue() == null) {
-      minValue = range2.getMinValue();
-      minValueAsString = range2.getMinValueAsString();
+  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+      HoodieColumnRangeMetadata<T> one,

Review comment:
       why the renaming?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -389,15 +389,16 @@ private void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
     writeTableMetadata(table, metadata, new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
     finalizeWrite(table, clusteringCommitTime, writeStats);
     try {
-      // try to save statistics info to hudi
-      if (config.isDataSkippingEnabled() && 
config.isLayoutOptimizationEnabled() && 
!config.getClusteringSortColumns().isEmpty()) {
-        table.updateStatistics(context, writeStats, clusteringCommitTime, 
true);
+      // Update outstanding metadata indexes
+      if (config.isLayoutOptimizationEnabled()

Review comment:
       For now, I think this is okay to be decoupled? I see what you are saying, though. I will try adding a new config to just create data-skipping indexes decoupled from space-filling curves or linear sorting (time is the only issue).
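
A sketch of what such a decoupling flag might look like (the keys and helper below are hypothetical, not existing Hudi configs):

```java
import java.util.Properties;

public class DataSkippingConfigSketch {

  // Hypothetical keys sketching the decoupling discussed above (not existing Hudi configs).
  static final String BUILD_COLUMN_STATS_INDEX_KEY = "hoodie.index.column.stats.build.enable";
  static final String LAYOUT_OPTIMIZE_ENABLE_KEY = "hoodie.layout.optimize.enable";

  // Update the data-skipping index when it is explicitly enabled, or (as today) when a
  // space-filling-curve / linear-sort layout optimization ran for this commit.
  static boolean shouldUpdateColumnStatsIndex(Properties props) {
    boolean buildIndex = Boolean.parseBoolean(props.getProperty(BUILD_COLUMN_STATS_INDEX_KEY, "false"));
    boolean layoutOptimized = Boolean.parseBoolean(props.getProperty(LAYOUT_OPTIMIZE_ENABLE_KEY, "false"));
    return buildIndex || layoutOptimized;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(BUILD_COLUMN_STATS_INDEX_KEY, "true");
    System.out.println(shouldUpdateColumnStatsIndex(props)); // true, even without layout optimization
  }
}
```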




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

