yihua commented on code in PR #12127:
URL: https://github.com/apache/hudi/pull/12127#discussion_r1807469156


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -570,6 +584,9 @@ private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) {
   }
 
   private Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType) {
+    if (dataMetaClient.getIndexMetadata().isEmpty()) {

Review Comment:
   ```suggestion
       if (dataMetaClient.getFunctionalAndSecondaryIndexMetadata().isEmpty()) {
   ```
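
   For context, a rough sketch of how the suggested guard could sit in this method, assuming it short-circuits when no functional/secondary index metadata exists (the early return and `Collections.emptySet()` are illustrative assumptions; the diff does not show the method body):

   ```java
   private Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType) {
     // Assumption: nothing to initialize when the data table defines no
     // functional/secondary index metadata.
     if (dataMetaClient.getFunctionalAndSecondaryIndexMetadata().isEmpty()) {
       return Collections.emptySet();
     }
     // ... existing logic deriving the partitions to initialize for partitionType ...
   }
   ```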



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -174,24 +185,40 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
 
-    // Group FileSlices by partition
-    Map<String, List<FileSlice>> partitionToFileSlicesMap = partitionFileSlicePairs.stream()
-        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
+    // Read records and append functional index metadata to every row
+    HoodieData<Row> rowData = sparkEngineContext.parallelize(partitionFilePathPairs, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, Iterator<Row>>) entry -> {
+          String partition = entry.getKey();
+          Pair<String, Long> filePathSizePair = entry.getValue();
+          String filePath = filePathSizePair.getKey();
+          long fileSize = filePathSizePair.getValue();
+          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+              FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+          List<Row> rowsWithIndexMetadata = SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, partition, filePath, fileSize);
+          return rowsWithIndexMetadata.iterator();

Review Comment:
   Let's create a follow-up that uses `HoodieData`, extracts the common logic (reading stats, files, etc.) into `HoodieBackedTableMetadataWriter`, and leaves only Spark-specific logic (e.g., Spark transformations) in this class.
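
   As a rough illustration of that split (class and method names below are hypothetical, not existing Hudi APIs): the engine-agnostic base class would own the fan-out over `(partition, (filePath, fileSize))` pairs via `HoodieData`, and the Spark subclass would implement only the row-reading hook:

   ```java
   import java.util.Iterator;
   import java.util.List;
   import org.apache.hudi.common.data.HoodieData;
   import org.apache.hudi.common.engine.HoodieEngineContext;
   import org.apache.hudi.common.util.collection.Pair;

   // Hypothetical sketch of the follow-up refactor; names are illustrative only.
   abstract class MetadataIndexRowSource<R> {
     // Engine-agnostic flow: fan out over (partition, (filePath, fileSize)) pairs
     // and delegate the actual reading to the engine-specific hook below.
     HoodieData<R> buildIndexRows(HoodieEngineContext context,
                                  List<Pair<String, Pair<String, Long>>> partitionFilePathPairs,
                                  int parallelism) {
       return context.parallelize(partitionFilePathPairs, parallelism)
           .flatMap(entry -> readRowsWithIndexMetadata(
               entry.getKey(), entry.getValue().getKey(), entry.getValue().getValue()));
     }

     // A Spark subclass would implement just this, e.g. reading Rows via SQLContext
     // and appending the functional index metadata columns.
     protected abstract Iterator<R> readRowsWithIndexMetadata(String partition, String filePath, long fileSize);
   }
   ```

   That would keep `SparkHoodieBackedTableMetadataWriter` limited to Spark transformations, as suggested above.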


