lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1908718391
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -121,37 +157,53 @@ public static HoodieData<HoodieRecord>
getExpressionIndexRecordsUsingColumnStats
functions.max(columnToIndex).alias("maxValue"),
functions.count(columnToIndex).alias("valueCount"));
// Generate column stat records using the aggregated data
- return
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row,
Iterator<HoodieRecord>>)
- row -> {
- int baseAggregatePosition =
SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
- long nullCount = row.getLong(baseAggregatePosition);
- Comparable minValue = (Comparable) row.get(baseAggregatePosition +
1);
- Comparable maxValue = (Comparable) row.get(baseAggregatePosition +
2);
- long valueCount = row.getLong(baseAggregatePosition + 3);
-
- String partitionName = row.getString(0);
- String relativeFilePath = row.getString(1);
- long totalFileSize = row.getLong(2);
- // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
- long totalUncompressedSize = totalFileSize * 2;
-
- HoodieColumnRangeMetadata<Comparable> rangeMetadata =
HoodieColumnRangeMetadata.create(
- relativeFilePath,
- columnToIndex,
- minValue,
- maxValue,
- nullCount,
- valueCount,
- totalFileSize,
- totalUncompressedSize
- );
- return createColumnStatsRecords(partitionName,
Collections.singletonList(rangeMetadata), false, expressionIndex.getIndexName(),
-
COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();
- });
+ HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>
rangeMetadataHoodieJavaRDD =
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD())
+ .flatMapToPair((SerializableFunction<Row, Iterator<? extends
Pair<String, HoodieColumnRangeMetadata<Comparable>>>>)
+ row -> {
+ int baseAggregatePosition =
SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
+ long nullCount = row.getLong(baseAggregatePosition);
+ Comparable minValue = (Comparable) row.get(baseAggregatePosition
+ 1);
+ Comparable maxValue = (Comparable) row.get(baseAggregatePosition
+ 2);
+ long valueCount = row.getLong(baseAggregatePosition + 3);
+
+ String partitionName = row.getString(0);
+ String relativeFilePath = row.getString(1);
+ long totalFileSize = row.getLong(2);
+ // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
+ long totalUncompressedSize = totalFileSize * 2;
+
+ HoodieColumnRangeMetadata<Comparable> rangeMetadata =
HoodieColumnRangeMetadata.create(
+ relativeFilePath,
+ columnToIndex,
+ minValue,
+ maxValue,
+ nullCount,
+ valueCount,
+ totalFileSize,
+ totalUncompressedSize
+ );
+ return Collections.singletonList(Pair.of(partitionName,
rangeMetadata)).iterator();
+ });
+
+ if (partitionRecordsFunctionOpt.isPresent()) {
+ rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org