codope commented on code in PR #12001:
URL: https://github.com/apache/hudi/pull/12001#discussion_r1777522927


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        long fileSize = baseFile.getFileSize();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext,
+                                                                             
HoodieWriteConfig metadataWriteConfig) {
     List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          baseFilePath,
-          metadataWriteConfig,
-          partition,
-          baseFile.getCommitTime());
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {

Review Comment:
   Currently, a functional index cannot be created for bootstrap tables. Created 
HUDI-8265 to track this limitation.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        long fileSize = baseFile.getFileSize();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext,
+                                                                             
HoodieWriteConfig metadataWriteConfig) {
     List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          baseFilePath,
-          metadataWriteConfig,
-          partition,
-          baseFile.getCommitTime());
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFilePath, 
metadataWriteConfig, partition, baseFile.getCommitTime());
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, logFilePath, 
metadataWriteConfig, partition,
+            logFile.getDeltaCommitTime());
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          logFilePath,
-          metadataWriteConfig,
-          partition,
-          logFile.getDeltaCommitTime());
-    });
-    return HoodieJavaRDD.of(bloomFilterMetadataList, sparkEngineContext, 
parallelism);
+    return bloomFilterMetadataList;
   }
 
-  private static void buildColumnRangeMetadata(
-      HoodieTableMetaClient metaClient,
-      Schema readerSchema,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-      long fileSize,
-      Path filePath) {
-    Dataset<Row> fileDf = readRecordsAsRow(
-        new StoragePath[] {convertToStoragePath(filePath)},
-        sqlContext,
-        metaClient,
-        readerSchema);
+  private static void buildColumnRangeMetadata(HoodieTableMetaClient 
metaClient,
+                                               Schema readerSchema,
+                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
+                                               String columnToIndex,
+                                               SQLContext sqlContext,
+                                               
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                               long fileSize,
+                                               Path filePath) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to