manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r796238028



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String 
recordKey, int numFileGrou
     return fileSliceStream.sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+                                                                       
HoodieEngineContext engineContext,
+                                                                       
HoodieTableMetaClient dataMetaClient,
+                                                                       boolean 
isMetaIndexColumnStatsForAllColumns,
+                                                                       String 
instantTime) {
+
+    try {
+      List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+      return 
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, 
dataMetaClient, allWriteStats,
+          isMetaIndexColumnStatsForAllColumns);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table ", e);
+    }
+  }
+
+  /**
+   * Create column stats from write status.
+   *
+   * @param engineContext                       - Enging context
+   * @param datasetMetaClient                   - Dataset meta client
+   * @param allWriteStats                       - Write status to convert
+   * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for 
indexing
+   */
+  public static List<HoodieRecord> 
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                   
HoodieTableMetaClient datasetMetaClient,
+                                                                   
List<HoodieWriteStat> allWriteStats,
+                                                                   boolean 
isMetaIndexColumnStatsForAllColumns) throws Exception {
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<HoodieWriteStat> prunedWriteStats = 
allWriteStats.stream().filter(writeStat -> {
+      return !(writeStat instanceof HoodieDeltaWriteStat);
+    }).collect(Collectors.toList());
+    if (prunedWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    return engineContext.flatMap(prunedWriteStats,
+        writeStat -> translateWriteStatToColumnStats(writeStat, 
datasetMetaClient,
+            getLatestColumns(datasetMetaClient, 
isMetaIndexColumnStatsForAllColumns)),
+        prunedWriteStats.size());
+  }
+
+  /**
+   * Get the latest columns for the table for column stats indexing.
+   *
+   * @param datasetMetaClient                   - Data table meta client
+   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
+   */
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+    if (!isMetaIndexColumnStatsForAllColumns
+        || 
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 < 1) {
+      return 
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+    }
+
+    TableSchemaResolver schemaResolver = new 
TableSchemaResolver(datasetMetaClient);
+    // consider nested fields as well. if column stats is enabled only for a 
subset of columns,
+    // directly use them instead of all columns from the latest table schema
+    try {
+      return schemaResolver.getTableAvroSchema().getFields().stream()
+          .map(entry -> entry.name()).collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest columns for " + 
datasetMetaClient.getBasePath());
+    }
+  }
+
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient) {
+    return getLatestColumns(datasetMetaClient, false);
+  }
+
+  public static Stream<HoodieRecord> 
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
+                                                                     
HoodieTableMetaClient datasetMetaClient,
+                                                                     
List<String> latestColumns) {
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), 
datasetMetaClient, latestColumns, false);
+
+  }
+
+  public static Stream<HoodieRecord> getColumnStats(final String 
partitionPath, final String filePathWithPartition,

Review comment:
       fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to