alexeykudinkin commented on a change in pull request #4060:
URL: https://github.com/apache/hudi/pull/4060#discussion_r757232925
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
/**
* Parse min/max statistics stored in parquet footers for all columns.
- * ParquetRead.readFooter is not a thread safe method.
- *
- * @param conf hadoop conf.
- * @param parquetFilePath file to be read.
- * @param cols cols which need to collect statistics.
- * @return a HoodieColumnRangeMetadata instance.
*/
- public Collection<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
- Configuration conf,
- Path parquetFilePath,
- List<String> cols) {
+ public List<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
+ @Nonnull Configuration conf,
+ @Nonnull Path parquetFilePath,
+ @Nonnull List<String> cols
+ ) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
- // collect stats from all parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
- return blockMetaData.getColumns().stream().filter(f ->
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
- String minAsString;
- String maxAsString;
- if (columnChunkMetaData.getPrimitiveType().getOriginalType() ==
OriginalType.DATE) {
- synchronized (lock) {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- } else {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
- columnChunkMetaData.getStatistics().genericGetMin(),
- columnChunkMetaData.getStatistics().genericGetMax(),
- columnChunkMetaData.getStatistics().getNumNulls(),
- minAsString, maxAsString);
- });
- }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
- // we only intend to keep file level statistics.
- return new ArrayList<>(columnToStatsListMap.values().stream()
- .map(blocks -> getColumnRangeInFile(blocks))
- .collect(Collectors.toList()));
+ // Collect stats from all individual Parquet blocks
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
+ metadata.getBlocks()
+ .stream()
+ .sequential()
+ .flatMap(blockMetaData ->
+ blockMetaData.getColumns()
+ .stream()
+ .filter(f -> cols.contains(f.getPath().toDotString()))
+ .map(columnChunkMetaData ->
+ new HoodieColumnRangeMetadata<Comparable>(
+ parquetFilePath.getName(),
+ columnChunkMetaData.getPath().toDotString(),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMin()),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMax()),
+ columnChunkMetaData.getStatistics().getNumNulls(),
+
columnChunkMetaData.getPrimitiveType().stringifier()))
+ )
+
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+ // Combine those into file-level statistics
+ // NOTE: Inlining this var makes javac (1.8) upset (due to its inability
to infer
+ // expression type correctly)
+ Stream<HoodieColumnRangeMetadata<Comparable>> stream =
columnToStatsListMap.values()
Review comment:
Tried to clarify it with a comment: there are some odd issues with javac not
being able to deduce the types appropriately, resulting in compilation failures
if you inline it -- so I had to either cast or introduce a var
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
/**
* Parse min/max statistics stored in parquet footers for all columns.
- * ParquetRead.readFooter is not a thread safe method.
- *
- * @param conf hadoop conf.
- * @param parquetFilePath file to be read.
- * @param cols cols which need to collect statistics.
- * @return a HoodieColumnRangeMetadata instance.
*/
- public Collection<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
- Configuration conf,
- Path parquetFilePath,
- List<String> cols) {
+ public List<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
+ @Nonnull Configuration conf,
+ @Nonnull Path parquetFilePath,
+ @Nonnull List<String> cols
+ ) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
- // collect stats from all parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
- return blockMetaData.getColumns().stream().filter(f ->
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
- String minAsString;
- String maxAsString;
- if (columnChunkMetaData.getPrimitiveType().getOriginalType() ==
OriginalType.DATE) {
- synchronized (lock) {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- } else {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
- columnChunkMetaData.getStatistics().genericGetMin(),
- columnChunkMetaData.getStatistics().genericGetMax(),
- columnChunkMetaData.getStatistics().getNumNulls(),
- minAsString, maxAsString);
- });
- }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
- // we only intend to keep file level statistics.
- return new ArrayList<>(columnToStatsListMap.values().stream()
- .map(blocks -> getColumnRangeInFile(blocks))
- .collect(Collectors.toList()));
+ // Collect stats from all individual Parquet blocks
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
Review comment:
Agreed that it bloats diffs unnecessarily, but I actually find it hard
to read when it's all threaded into a single line -- it's much harder to understand
the nesting and scoping of things. With stacking, it's crystal clear which scope
each piece is in and where it belongs. We also need to keep in mind that not everyone is
using a wide monitor (I myself use laptops)
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
/**
* Parse min/max statistics stored in parquet footers for all columns.
- * ParquetRead.readFooter is not a thread safe method.
- *
- * @param conf hadoop conf.
- * @param parquetFilePath file to be read.
- * @param cols cols which need to collect statistics.
- * @return a HoodieColumnRangeMetadata instance.
*/
- public Collection<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
- Configuration conf,
- Path parquetFilePath,
- List<String> cols) {
+ public List<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
+ @Nonnull Configuration conf,
+ @Nonnull Path parquetFilePath,
+ @Nonnull List<String> cols
+ ) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
- // collect stats from all parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
- return blockMetaData.getColumns().stream().filter(f ->
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
- String minAsString;
- String maxAsString;
- if (columnChunkMetaData.getPrimitiveType().getOriginalType() ==
OriginalType.DATE) {
- synchronized (lock) {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- } else {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
- columnChunkMetaData.getStatistics().genericGetMin(),
- columnChunkMetaData.getStatistics().genericGetMax(),
- columnChunkMetaData.getStatistics().getNumNulls(),
- minAsString, maxAsString);
- });
- }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
- // we only intend to keep file level statistics.
- return new ArrayList<>(columnToStatsListMap.values().stream()
- .map(blocks -> getColumnRangeInFile(blocks))
- .collect(Collectors.toList()));
+ // Collect stats from all individual Parquet blocks
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
+ metadata.getBlocks()
+ .stream()
+ .sequential()
+ .flatMap(blockMetaData ->
+ blockMetaData.getColumns()
+ .stream()
+ .filter(f -> cols.contains(f.getPath().toDotString()))
+ .map(columnChunkMetaData ->
+ new HoodieColumnRangeMetadata<Comparable>(
+ parquetFilePath.getName(),
+ columnChunkMetaData.getPath().toDotString(),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMin()),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMax()),
+ columnChunkMetaData.getStatistics().getNumNulls(),
+
columnChunkMetaData.getPrimitiveType().stringifier()))
+ )
+
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+ // Combine those into file-level statistics
+ // NOTE: Inlining this var makes javac (1.8) upset (due to its inability
to infer
+ // expression type correctly)
+ Stream<HoodieColumnRangeMetadata<Comparable>> stream =
columnToStatsListMap.values()
+ .stream()
+ .map(this::getColumnRangeInFile);
+
+ return stream.collect(Collectors.toList());
}
- private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInFile(
+ @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+ ) {
if (blockRanges.size() == 1) {
// only one block in parquet file. we can just return that range.
return blockRanges.get(0);
- } else {
- // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
- return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1,
b2)).get();
}
+
+ // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+ return blockRanges.stream()
+ .sequential()
+ .reduce(this::combineRanges).get();
}
- private HoodieColumnRangeMetadata<Comparable>
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-
HoodieColumnRangeMetadata<Comparable> range2) {
- final Comparable minValue;
- final Comparable maxValue;
- final String minValueAsString;
- final String maxValueAsString;
- if (range1.getMinValue() != null && range2.getMinValue() != null) {
- if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
- minValue = range1.getMinValue();
- minValueAsString = range1.getMinValueAsString();
- } else {
- minValue = range2.getMinValue();
- minValueAsString = range2.getMinValueAsString();
- }
- } else if (range1.getMinValue() == null) {
- minValue = range2.getMinValue();
- minValueAsString = range2.getMinValueAsString();
+ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+ HoodieColumnRangeMetadata<T> one,
+ HoodieColumnRangeMetadata<T> another
+ ) {
+ final T minValue;
+ final T maxValue;
+ if (one.getMinValue() != null && another.getMinValue() != null) {
+ minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ?
one.getMinValue() : another.getMinValue();
+ } else if (one.getMinValue() == null) {
+ minValue = another.getMinValue();
} else {
- minValue = range1.getMinValue();
- minValueAsString = range1.getMinValueAsString();
+ minValue = one.getMinValue();
}
- if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
- if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) {
- maxValue = range2.getMaxValue();
- maxValueAsString = range2.getMaxValueAsString();
- } else {
- maxValue = range1.getMaxValue();
- maxValueAsString = range1.getMaxValueAsString();
- }
- } else if (range1.getMaxValue() == null) {
- maxValue = range2.getMaxValue();
- maxValueAsString = range2.getMaxValueAsString();
+ if (one.getMaxValue() != null && another.getMaxValue() != null) {
+ maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ?
another.getMaxValue() : one.getMaxValue();
+ } else if (one.getMaxValue() == null) {
+ maxValue = another.getMaxValue();
} else {
- maxValue = range1.getMaxValue();
- maxValueAsString = range1.getMaxValueAsString();
+ maxValue = one.getMaxValue();
+ }
+
+ return new HoodieColumnRangeMetadata<T>(
+ one.getFilePath(),
+ one.getColumnName(), minValue, maxValue, one.getNumNulls() +
another.getNumNulls(), one.getStringifier());
+ }
+
+ private static Comparable<?> convertToNativeJavaType(PrimitiveType
primitiveType, Comparable val) {
+ if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
+ DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata();
+ return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale());
+ } else if (primitiveType.getOriginalType() == OriginalType.DATE) {
+ // NOTE: This is a workaround to address race-condition in using
+ // {@code SimpleDataFormat} concurrently (w/in {@code
DateStringifier})
+ // TODO cleanup after Parquet upgrade to 1.12
Review comment:
[HUDI-2812](https://issues.apache.org/jira/browse/HUDI-2812)
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -286,95 +287,103 @@ public Boolean apply(String recordKey) {
/**
* Parse min/max statistics stored in parquet footers for all columns.
- * ParquetRead.readFooter is not a thread safe method.
- *
- * @param conf hadoop conf.
- * @param parquetFilePath file to be read.
- * @param cols cols which need to collect statistics.
- * @return a HoodieColumnRangeMetadata instance.
*/
- public Collection<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
- Configuration conf,
- Path parquetFilePath,
- List<String> cols) {
+ public List<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
+ @Nonnull Configuration conf,
+ @Nonnull Path parquetFilePath,
+ @Nonnull List<String> cols
+ ) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
- // collect stats from all parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
- return blockMetaData.getColumns().stream().filter(f ->
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
- String minAsString;
- String maxAsString;
- if (columnChunkMetaData.getPrimitiveType().getOriginalType() ==
OriginalType.DATE) {
- synchronized (lock) {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- } else {
- minAsString = columnChunkMetaData.getStatistics().minAsString();
- maxAsString = columnChunkMetaData.getStatistics().maxAsString();
- }
- return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
- columnChunkMetaData.getStatistics().genericGetMin(),
- columnChunkMetaData.getStatistics().genericGetMax(),
- columnChunkMetaData.getStatistics().getNumNulls(),
- minAsString, maxAsString);
- });
- }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
- // we only intend to keep file level statistics.
- return new ArrayList<>(columnToStatsListMap.values().stream()
- .map(blocks -> getColumnRangeInFile(blocks))
- .collect(Collectors.toList()));
+ // Collect stats from all individual Parquet blocks
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
+ metadata.getBlocks()
+ .stream()
+ .sequential()
+ .flatMap(blockMetaData ->
+ blockMetaData.getColumns()
+ .stream()
+ .filter(f -> cols.contains(f.getPath().toDotString()))
+ .map(columnChunkMetaData ->
+ new HoodieColumnRangeMetadata<Comparable>(
+ parquetFilePath.getName(),
+ columnChunkMetaData.getPath().toDotString(),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMin()),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMax()),
+ columnChunkMetaData.getStatistics().getNumNulls(),
+
columnChunkMetaData.getPrimitiveType().stringifier()))
+ )
+
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+ // Combine those into file-level statistics
+ // NOTE: Inlining this var makes javac (1.8) upset (due to its inability
to infer
+ // expression type correctly)
+ Stream<HoodieColumnRangeMetadata<Comparable>> stream =
columnToStatsListMap.values()
+ .stream()
+ .map(this::getColumnRangeInFile);
+
+ return stream.collect(Collectors.toList());
}
- private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInFile(
+ @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+ ) {
if (blockRanges.size() == 1) {
// only one block in parquet file. we can just return that range.
return blockRanges.get(0);
- } else {
- // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
- return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1,
b2)).get();
}
+
+ // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+ return blockRanges.stream()
+ .sequential()
+ .reduce(this::combineRanges).get();
}
- private HoodieColumnRangeMetadata<Comparable>
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-
HoodieColumnRangeMetadata<Comparable> range2) {
- final Comparable minValue;
- final Comparable maxValue;
- final String minValueAsString;
- final String maxValueAsString;
- if (range1.getMinValue() != null && range2.getMinValue() != null) {
- if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
- minValue = range1.getMinValue();
- minValueAsString = range1.getMinValueAsString();
- } else {
- minValue = range2.getMinValue();
- minValueAsString = range2.getMinValueAsString();
- }
- } else if (range1.getMinValue() == null) {
- minValue = range2.getMinValue();
- minValueAsString = range2.getMinValueAsString();
+ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+ HoodieColumnRangeMetadata<T> one,
Review comment:
I have a strong preference against numerical suffixes in variable
names (hence `one`/`another` instead of `range1`/`range2`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]