jonvex commented on code in PR #12001:
URL: https://github.com/apache/hudi/pull/12001#discussion_r1777307915


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -179,40 +176,27 @@ protected HoodieData<HoodieRecord> 
getFunctionalIndexRecords(List<Pair<String, F
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
     String basePath = metaClient.getBasePath().toString();
-    for (Pair<String, FileSlice> pair : partitionFileSlicePairs) {
-      String partition = pair.getKey();
-      FileSlice fileSlice = pair.getValue();
+
+    // Group FileSlices by partition
+    Map<String, List<FileSlice>> partitionToFileSlicesMap = 
partitionFileSlicePairs.stream()
+        .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
+    List<HoodieRecord> allRecords = new ArrayList<>();
+    for (Map.Entry<String, List<FileSlice>> entry : 
partitionToFileSlicesMap.entrySet()) {
+      String partition = entry.getKey();
+      List<FileSlice> fileSlices = entry.getValue();
+      List<HoodieRecord> recordsForPartition = Collections.emptyList();
       // For functional index using column_stats
       if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-        return getFunctionalIndexRecordsUsingColumnStats(
-            metaClient,
-            parallelism,
-            readerSchema,
-            fileSlice,
-            basePath,
-            partition,
-            functionalIndex,
-            columnToIndex,
-            sqlContext,
-            sparkEngineContext);
+        recordsForPartition = 
getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices, 
basePath, partition, functionalIndex, columnToIndex, sqlContext);
       }
       // For functional index using bloom_filters
       if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{

Review Comment:
   Change this to an `else if`.
   The two conditions can never both be true, and since both branches overwrite 
`recordsForPartition`, it makes even more sense to ensure only one of them 
executes.
   ```
   indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)
   indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)
   ```



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        long fileSize = baseFile.getFileSize();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext,
+                                                                             
HoodieWriteConfig metadataWriteConfig) {
     List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          baseFilePath,
-          metadataWriteConfig,
-          partition,
-          baseFile.getCommitTime());
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFilePath, 
metadataWriteConfig, partition, baseFile.getCommitTime());
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, logFilePath, 
metadataWriteConfig, partition,
+            logFile.getDeltaCommitTime());
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          logFilePath,
-          metadataWriteConfig,
-          partition,
-          logFile.getDeltaCommitTime());
-    });
-    return HoodieJavaRDD.of(bloomFilterMetadataList, sparkEngineContext, 
parallelism);
+    return bloomFilterMetadataList;
   }
 
-  private static void buildColumnRangeMetadata(
-      HoodieTableMetaClient metaClient,
-      Schema readerSchema,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-      long fileSize,
-      Path filePath) {
-    Dataset<Row> fileDf = readRecordsAsRow(
-        new StoragePath[] {convertToStoragePath(filePath)},
-        sqlContext,
-        metaClient,
-        readerSchema);
+  private static void buildColumnRangeMetadata(HoodieTableMetaClient 
metaClient,
+                                               Schema readerSchema,
+                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
+                                               String columnToIndex,
+                                               SQLContext sqlContext,
+                                               
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                               long fileSize,
+                                               Path filePath) {

Review Comment:
   Should we just change this parameter to `StoragePath`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,

Review Comment:
   This and `getFunctionalIndexRecordsUsingBloomFilter` look very similar. Is 
there a clean way to reuse code between them? There might not be one.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        long fileSize = baseFile.getFileSize();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext,
+                                                                             
HoodieWriteConfig metadataWriteConfig) {
     List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          baseFilePath,
-          metadataWriteConfig,
-          partition,
-          baseFile.getCommitTime());
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {

Review Comment:
   No bootstrap base file handling? Do we need to create a follow-up ticket for that?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        long fileSize = baseFile.getFileSize();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String basePath,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext,
+                                                                             
HoodieWriteConfig metadataWriteConfig) {
     List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          baseFilePath,
-          metadataWriteConfig,
-          partition,
-          baseFile.getCommitTime());
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String filename = baseFile.getFileName();
+        Path baseFilePath = filePath(basePath, partition, filename);
+        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFilePath, 
metadataWriteConfig, partition, baseFile.getCommitTime());
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        String fileName = logFile.getFileName();
+        Path logFilePath = filePath(basePath, partition, fileName);

Review Comment:
   Shouldn't the log file already expose a `StoragePath`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to