lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1908718391


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -121,37 +157,53 @@ public static HoodieData<HoodieRecord> getExpressionIndexRecordsUsingColumnStats
             functions.max(columnToIndex).alias("maxValue"),
             functions.count(columnToIndex).alias("valueCount"));
     // Generate column stat records using the aggregated data
-    return HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row, Iterator<HoodieRecord>>)
-        row -> {
-          int baseAggregatePosition = SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
-          long nullCount = row.getLong(baseAggregatePosition);
-          Comparable minValue = (Comparable) row.get(baseAggregatePosition + 1);
-          Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 2);
-          long valueCount = row.getLong(baseAggregatePosition + 3);
-
-          String partitionName = row.getString(0);
-          String relativeFilePath = row.getString(1);
-          long totalFileSize = row.getLong(2);
-          // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
-          long totalUncompressedSize = totalFileSize * 2;
-
-          HoodieColumnRangeMetadata<Comparable> rangeMetadata = HoodieColumnRangeMetadata.create(
-              relativeFilePath,
-              columnToIndex,
-              minValue,
-              maxValue,
-              nullCount,
-              valueCount,
-              totalFileSize,
-              totalUncompressedSize
-          );
-          return createColumnStatsRecords(partitionName, Collections.singletonList(rangeMetadata), false, expressionIndex.getIndexName(),
-              COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();
-        });
+    HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> rangeMetadataHoodieJavaRDD = HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD())
+        .flatMapToPair((SerializableFunction<Row, Iterator<? extends Pair<String, HoodieColumnRangeMetadata<Comparable>>>>)
+            row -> {
+              int baseAggregatePosition = SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
+              long nullCount = row.getLong(baseAggregatePosition);
+              Comparable minValue = (Comparable) row.get(baseAggregatePosition + 1);
+              Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 2);
+              long valueCount = row.getLong(baseAggregatePosition + 3);
+
+              String partitionName = row.getString(0);
+              String relativeFilePath = row.getString(1);
+              long totalFileSize = row.getLong(2);
+              // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
+              long totalUncompressedSize = totalFileSize * 2;
+
+              HoodieColumnRangeMetadata<Comparable> rangeMetadata = HoodieColumnRangeMetadata.create(
+                  relativeFilePath,
+                  columnToIndex,
+                  minValue,
+                  maxValue,
+                  nullCount,
+                  valueCount,
+                  totalFileSize,
+                  totalUncompressedSize
+              );
+              return Collections.singletonList(Pair.of(partitionName, rangeMetadata)).iterator();
+            });
+
+    if (partitionRecordsFunctionOpt.isPresent()) {
+      rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to