codope commented on a change in pull request #5070:
URL: https://github.com/apache/hudi/pull/5070#discussion_r830436453



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -838,55 +857,52 @@ public static HoodieTableFileSystemView 
getFileSystemView(HoodieTableMetaClient
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
                                                                              
HoodieEngineContext engineContext,
                                                                              
MetadataRecordsGenerationParams recordsGenerationParams) {
-    try {
-      List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
-          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
-      return 
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, 
allWriteStats, recordsGenerationParams);
-    } catch (Exception e) {
-      throw new HoodieException("Failed to generate column stats records for 
metadata table ", e);
-    }
-  }
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
 
-  /**
-   * Create column stats from write status.
-   *
-   * @param engineContext           - Engine context
-   * @param allWriteStats           - Write status to convert
-   * @param recordsGenerationParams - Parameters for columns stats record 
generation
-   */
-  public static HoodieData<HoodieRecord> 
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
-                                                                         
List<HoodieWriteStat> allWriteStats,
-                                                                         
MetadataRecordsGenerationParams recordsGenerationParams) {
     if (allWriteStats.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
-    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), 
recordsGenerationParams.isAllColumnStatsIndexEnabled());
-    final int parallelism = Math.max(Math.min(allWriteStats.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    HoodieData<HoodieWriteStat> allWriteStatsRDD = 
engineContext.parallelize(allWriteStats, parallelism);
-    return allWriteStatsRDD.flatMap(writeStat -> 
translateWriteStatToColumnStats(writeStat, 
recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator());
+
+    try {
+      Option<Schema> writerSchema =
+          
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+              .map(writerSchemaStr -> new 
Schema.Parser().parse(writerSchemaStr));
+
+      HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
+
+      List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+          dataTableMetaClient.getTableConfig(), writerSchema);
+
+      if (columnsToIndex.isEmpty()) {
+        // In case there are no columns to index, bail
+        return engineContext.emptyHoodieData();
+      }
+
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+      return engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat ->
+              translateWriteStatToColumnStats(writeStat, dataTableMetaClient, 
columnsToIndex).iterator());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table", e);
+    }
   }
 
   /**
    * Get the latest columns for the table for column stats indexing.
-   *
-   * @param datasetMetaClient                   - Data table meta client
-   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
    */
-  private static List<String> getColumnsToIndex(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
-    if (!isMetaIndexColumnStatsForAllColumns
-        || 
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 < 1) {
-      return 
Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(","));
+  private static List<String> 
getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
+                                                HoodieTableConfig tableConfig,
+                                                Option<Schema> writerSchema) {
+    if (!recordsGenParams.isAllColumnStatsIndexEnabled()) {
+      // TODO why are we only indexing primary key? revisit fallback

Review comment:
       This will change once we add support for indexing multiple columns; see
https://github.com/apache/hudi/pull/4693/files#diff-11e9ef6bd53ef1001b669a1dc68dde2aba9b33c9eb72cc1e4198750336d79772
   No change is needed on your side; I'll take care of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to