yihua commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1569988592


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -651,6 +652,36 @@ public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName
     });
   }
 
+  public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionPath,
+                                                                 Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                                                 boolean isDeleted) {
+    return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
+      HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath, columnRangeMetadata),
+          MetadataPartitionType.PARTITION_STATS.getPartitionPath());
+
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(),
+          HoodieMetadataColumnStats.newBuilder()
+              .setFileName(null)
+              .setColumnName(columnRangeMetadata.getColumnName())
+              .setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
+              .setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
+              .setNullCount(columnRangeMetadata.getNullCount())
+              .setValueCount(columnRangeMetadata.getValueCount())
+              .setTotalSize(columnRangeMetadata.getTotalSize())
+              .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
+              .setIsDeleted(isDeleted)
+              .build());
+
+      return new HoodieAvroRecord<>(key, payload);
+    });
+  }
+
+  public static String getPartitionStatsIndexKey(String partitionPath, HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {

Review Comment:
   nit: pass the column name directly as the method argument to avoid confusion?
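
   A minimal sketch of the suggested signature, assuming the key depends only on the partition path and the column name. The class name and the `#`-separated key layout are hypothetical placeholders for illustration and are not the encoding used in this PR:

```java
// Hypothetical holder class, used only to make the sketch self-contained.
final class PartitionStatsKeySketch {

  // Takes the column name directly instead of the whole column range metadata object,
  // so the signature shows exactly which inputs the key is built from.
  // NOTE: the "#" separator and plain concatenation are assumptions, not the PR's key format.
  static String getPartitionStatsIndexKey(String partitionPath, String columnName) {
    return partitionPath + "#" + columnName;
  }
}
```

   The call site in `createPartitionStatsRecords` would then pass `columnRangeMetadata.getColumnName()` as the second argument.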



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -351,6 +356,12 @@ case class HoodieFileIndex(spark: SparkSession,
       Option.empty
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
+    } else if (partitionStatsIndex.isIndexAvailable && queryFilters.nonEmpty) {
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      val shouldReadInMemory = partitionStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
+      partitionStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>

Review Comment:
   Could we avoid transposing here, since it adds overhead, and filter directly on the index entries instead?
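
   A rough sketch of the idea, assuming each index entry carries one column's min/max for one partition. `ColumnStatsEntry` and `candidatePartitionsGreaterThan` are hypothetical, simplified stand-ins (numeric values only, a single `col > bound` predicate), not existing Hudi APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: prune partitions by scanning index entries row by row,
// without first pivoting them into one wide row per partition.
final class PartitionStatsPruningSketch {

  // Simplified stand-in for one partition-stats entry (one column of one partition).
  static final class ColumnStatsEntry {
    final String partitionPath;
    final String columnName;
    final long minValue;
    final long maxValue;

    ColumnStatsEntry(String partitionPath, String columnName, long minValue, long maxValue) {
      this.partitionPath = partitionPath;
      this.columnName = columnName;
      this.minValue = minValue;
      this.maxValue = maxValue;
    }
  }

  // Returns partitions that may contain rows satisfying `columnName > lowerBound`.
  // A partition is kept when the column's max value exceeds the bound.
  static List<String> candidatePartitionsGreaterThan(List<ColumnStatsEntry> entries,
                                                     String columnName,
                                                     long lowerBound) {
    List<String> candidates = new ArrayList<>();
    for (ColumnStatsEntry entry : entries) {
      if (entry.columnName.equals(columnName) && entry.maxValue > lowerBound) {
        candidates.add(entry.partitionPath);
      }
    }
    return candidates;
  }
}
```

   Partitions that have no entry for the referenced column cannot be pruned safely, so the caller would still need to retain them.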


