lokeshj1703 commented on code in PR #12127:
URL: https://github.com/apache/hudi/pull/12127#discussion_r1807302514


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -174,24 +185,40 @@ protected HoodieData<HoodieRecord> 
getFunctionalIndexRecords(List<Pair<String, F
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
 
-    // Group FileSlices by partition
-    Map<String, List<FileSlice>> partitionToFileSlicesMap = 
partitionFileSlicePairs.stream()
-        .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
+    // Read records and append functional index metadata to every row
+    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathPairs, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
+          String partition = entry.getKey();
+          Pair<String, Long> filePathSizePair = entry.getValue();
+          String filePath = filePathSizePair.getKey();
+          long fileSize = filePathSizePair.getValue();
+          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema,
+              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+          List<Row> rowsWithIndexMetadata = 
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, 
partition, filePath, fileSize);
+          return rowsWithIndexMetadata.iterator();
+        });
+
+    // Generate dataset with functional index metadata
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema);
+    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
+        
structType.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
 DataTypes.StringType, false, Metadata.empty()))

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to