codope commented on code in PR #12001:
URL: https://github.com/apache/hudi/pull/12001#discussion_r1776665013


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        long fileSize = baseFile.getFileSize();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,

Review Comment:
   same here.. collect bloom filter metadata for all file slices in a partition.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1847,9 +1847,15 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromFileSlices(HoodieEngine
   }
 
   public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
-    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-    Schema tableSchema = schemaResolver.getTableAvroSchema();
-    return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
+    Schema tableSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
+    List<String> partitionFields = 
metaClient.getTableConfig().getPartitionFields()
+        .map(Arrays::asList)
+        .orElse(Collections.emptyList());
+    List<String> sourceFields = indexDefinition.getSourceFields();
+    List<String> mergedFields = new ArrayList<>(partitionFields.size() + 
sourceFields.size());
+    mergedFields.addAll(partitionFields);
+    mergedFields.addAll(sourceFields);

Review Comment:
   this is a bug fix which appeared after building functional index for 
non-parititon column. While loading file slices, the relation expects partition 
field as well. So, we are projecting metadat fields + partition fields + fields 
on which func index is enabled.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,

Review Comment:
   Please ignore indentation changes. I had to bring the style in line with the 
convention we follow. The main change here is a refactoring that now instead of 
collecting stats for single file slice, it does so for all file slices in a 
given partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to