vinothchandar commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1498494219


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1915,4 +1924,162 @@ private static Path filePath(String basePath, String partition, String filename)
       return new Path(basePath, partition + StoragePath.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
+    // Find the columns to index
+    HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        recordsGenerationParams,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size()));
+    // Create records for MDT
+    int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> {
+      final String partitionName = partitionFiles.getRelativePath();
+      Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream()
+          .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
+          .map(BaseFileUtils::getColumnRangeInPartition);

Review Comment:
   IIUC, aren't you passing all column ranges for a given file into this? Wouldn't that merge ranges across different columns within a given file?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -67,6 +70,50 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
 
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileRanges List of column range statistics for each file in a partition
+   */
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) {
+    if (fileRanges.size() == 1) {
+      // Only one parquet file, we can just return that range.
+      return fileRanges.get(0);
+    }
+    // There are multiple files. Compute min(file_mins) and max(file_maxs)
+    return fileRanges.stream()
+        .sequential()
+        .reduce(BaseFileUtils::mergeRanges).get();

Review Comment:
   we should be merging across files on a given column
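
   The fix the comment asks for can be sketched generically: group the per-file ranges by column name first, then reduce each group, so ranges of different columns are never merged together. This is only an illustrative sketch using a simplified stand-in class (the real `HoodieColumnRangeMetadata` carries more fields such as null/value counts), not the actual Hudi code:

```java
import java.util.*;
import java.util.stream.*;

public class ColumnRangeMergeSketch {
  // Simplified stand-in for HoodieColumnRangeMetadata: column name, min, max.
  static final class Range {
    final String column;
    final int min;
    final int max;
    Range(String column, int min, int max) {
      this.column = column;
      this.min = min;
      this.max = max;
    }
  }

  // Merge two ranges of the SAME column: min of mins, max of maxs.
  static Range merge(Range a, Range b) {
    return new Range(a.column, Math.min(a.min, b.min), Math.max(a.max, b.max));
  }

  // Group per-file ranges by column before reducing, so a reduce never
  // combines ranges that belong to different columns.
  static Map<String, Range> mergePerColumn(List<Range> fileRanges) {
    return fileRanges.stream()
        .collect(Collectors.groupingBy(r -> r.column,
            Collectors.collectingAndThen(
                Collectors.reducing(ColumnRangeMergeSketch::merge),
                Optional::get)));
  }

  public static void main(String[] args) {
    List<Range> ranges = Arrays.asList(
        new Range("a", 1, 5),   // file1, column a
        new Range("b", 10, 20), // file1, column b
        new Range("a", -3, 2),  // file2, column a
        new Range("b", 15, 30)  // file2, column b
    );
    Map<String, Range> merged = mergePerColumn(ranges);
    System.out.println(merged.get("a").min + " " + merged.get("a").max); // -3 5
    System.out.println(merged.get("b").min + " " + merged.get("b").max); // 10 30
  }
}
```

   A flat `reduce` over the same four ranges would instead fold column `a` and column `b` stats into one bogus range, which is the bug the comment points at.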



##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -67,6 +70,50 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
 
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileRanges List of column range statistics for each file in a partition
+   */
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) {
+    if (fileRanges.size() == 1) {

Review Comment:
   I think this is a list of column ranges, not of multiple files? i.e. the list has an entry for each column of a single file, no?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1915,4 +1924,162 @@ private static Path filePath(String basePath, String partition, String filename)
       return new Path(basePath, partition + StoragePath.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
+    // Find the columns to index
+    HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        recordsGenerationParams,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size()));
+    // Create records for MDT
+    int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> {

Review Comment:
   rename: partitionFiles -> partitionInfo



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1915,4 +1924,162 @@ private static Path filePath(String basePath, String partition, String filename)
       return new Path(basePath, partition + StoragePath.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
+    // Find the columns to index
+    HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        recordsGenerationParams,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size()));
+    // Create records for MDT
+    int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> {
+      final String partitionName = partitionFiles.getRelativePath();
+      Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream()
+          .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))

Review Comment:
   Are we reading all the column stats again to generate partition stats? I think we should avoid this extra work and piggyback on the column stats index values?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
