lokeshj1703 commented on code in PR #12127:
URL: https://github.com/apache/hudi/pull/12127#discussion_r1807283537


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -72,97 +81,101 @@
  */
 public class SparkMetadataWriterUtils {
 
-  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
-                                                                             
Schema readerSchema,
-                                                                             
List<FileSlice> fileSlices,
-                                                                             
String partition,
-                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             
String columnToIndex,
-                                                                             
SQLContext sqlContext) {
-    List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    for (FileSlice fileSlice : fileSlices) {
-      if (fileSlice.getBaseFile().isPresent()) {
-        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-        long fileSize = baseFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
baseFile.getStoragePath(), true);
-      }
-      // Handle log files
-      fileSlice.getLogFiles().forEach(logFile -> {
-        long fileSize = logFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
logFile.getPath(), false);
-      });
-    }
-    return createColumnStatsRecords(partition, columnRangeMetadataList, false, 
functionalIndex.getIndexName(), 
COLUMN_STATS.getRecordType()).collect(Collectors.toList());
+  public static Column[] getFunctionalIndexColumns() {
+    return new Column[] {
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
+    };
   }
 
-  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
-                                                                             
Schema readerSchema,
-                                                                             
List<FileSlice> fileSlices,
-                                                                             
String partition,
-                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             
String columnToIndex,
-                                                                             
SQLContext sqlContext,
-                                                                             
HoodieWriteConfig metadataWriteConfig) {
-    List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    for (FileSlice fileSlice : fileSlices) {
-      if (fileSlice.getBaseFile().isPresent()) {
-        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), 
metadataWriteConfig, partition,
-            baseFile.getCommitTime(), true);
-      }
-      // Handle log files
-      fileSlice.getLogFiles().forEach(
-          logFile -> buildBloomFilterMetadata(metaClient, readerSchema, 
functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, 
logFile.getPath(), metadataWriteConfig, partition,
-              logFile.getDeltaCommitTime(), false));
-    }
-    return bloomFilterMetadataList;
+  public static String[] getFunctionalIndexColumnNames() {
+    return new String[] {
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
+    };
   }
 
-  private static void buildColumnRangeMetadata(HoodieTableMetaClient 
metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-                                               long fileSize,
-                                               StoragePath filePath,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = 
computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), 
fileSize);
-    columnRangeMetadataList.add(columnRangeMetadata);
+  @NotNull
+  public static List<Row> getRowsWithFunctionalIndexMetadata(List<Row> 
rowsForFilePath, String partition, String filePath, long fileSize) {
+    return rowsForFilePath.stream().map(row -> {
+      Seq<Object> indexMetadata = 
JavaScalaConverters.convertJavaListToScalaSeq(Arrays.asList(partition, 
filePath, fileSize));
+      Row functionalIndexRow = Row.fromSeq(indexMetadata);
+      List<Row> rows = new ArrayList<>(2);
+      rows.add(row);
+      rows.add(functionalIndexRow);
+      Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaSeq(rows);
+      return Row.merge(rowSeq);
+    }).collect(Collectors.toList());
   }
 
-  private static void buildBloomFilterMetadata(HoodieTableMetaClient 
metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               List<HoodieRecord> 
bloomFilterMetadataList,
-                                               StoragePath filePath,
-                                               HoodieWriteConfig writeConfig,
-                                               String partitionName,
-                                               String instantTime,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(writeConfig);
-    fileDf.foreach(row -> {
-      byte[] key = row.getAs(columnToIndex).toString().getBytes();
-      bloomFilter.add(key);
-    });
-    ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-    bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, 
filePath.toString(), instantTime, writeConfig.getBloomFilterType(), 
bloomByteBuffer, false));
+  public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(Dataset<Row> dataset,
+                                                                               
    HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                               
    String columnToIndex) {
+    // Aggregate col stats related data for the column to index
+    Dataset<Row> columnRangeMetadataDataset = dataset
+        .select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupBy(SparkMetadataWriterUtils.getFunctionalIndexColumns())
+        
.agg(functions.count(functions.when(functions.col(columnToIndex).isNull(), 
1)).alias("nullCount"),
+            functions.min(columnToIndex).alias("minValue"),
+            functions.max(columnToIndex).alias("maxValue"),
+            functions.count(columnToIndex).alias("valueCount"));
+    // Generate column stat records using the aggregated data
+    return 
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row,
 Iterator<HoodieRecord>>)
+        row -> {
+          int baseAggregatePosition = 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames().length;
+          long nullCount = row.getLong(baseAggregatePosition);
+          Comparable minValue = (Comparable) row.get(baseAggregatePosition + 
1);
+          Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 
2);
+          long valueCount = row.getLong(baseAggregatePosition + 3);
+
+          String partitionName = row.getString(0);
+          String filePath = row.getString(1);
+          long totalFileSize = row.getLong(2);
+          // Total uncompressed size is harder to get directly. This is just 
an approximation to maintain the order.
+          long totalUncompressedSize = totalFileSize * 2;
+
+          HoodieColumnRangeMetadata<Comparable> rangeMetadata = 
HoodieColumnRangeMetadata.create(
+              filePath,
+              columnToIndex,
+              minValue,
+              maxValue,
+              nullCount,
+              valueCount,
+              totalFileSize,
+              totalUncompressedSize
+          );
+          return createColumnStatsRecords(partitionName, 
Collections.singletonList(rangeMetadata), false, functionalIndex.getIndexName(),
+              
COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();}
+    );
   }
 
-  private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext 
sqlContext,
-                                               HoodieTableMetaClient 
metaClient, Schema schema,
-                                               boolean isBaseFile) {
+  public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex,
+                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime) {
+    // Group data using functional index metadata and then create bloom filter 
on the group
+    Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), 
row.getString(1)), Encoders.kryo(Pair.class))
+        .flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>)  
((pair, iterator) -> {
+          String partition = pair.getLeft().toString();
+          String filePath = pair.getRight().toString();
+          BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);

Review Comment:
   Do you mean the column names, i.e. functionalIndexPartition and 
functionalIndexFilePath?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to