nsivabalan commented on code in PR #12127:
URL: https://github.com/apache/hudi/pull/12127#discussion_r1807127144


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -194,66 +207,21 @@ private static List<HoodieRecord> 
getBaseFileRecords(HoodieBaseFile baseFile, Ho
     }
   }
 
-  private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema 
schema, SQLContext sqlContext, boolean isBaseFile) {
-    List<GenericRecord> avroRecords = records.stream()
+  private static List<Row> toRows(List<HoodieRecord> records, Schema schema, 
SQLContext sqlContext, String path) {
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    Function1<GenericRecord, Row> converterToRow = 
AvroConversionUtils.createConverterToRow(schema, structType);
+    List<Row> avroRecords = records.stream()
         .map(r -> {
-          if (isBaseFile) {
-            return (GenericRecord) r.getData();
-          }
-          HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
           try {
-            return (GenericRecord) payload.getInsertValue(schema).get();
+            return (GenericRecord) (r.getData() instanceof GenericRecord ? 
r.getData()
+                : ((HoodieRecordPayload) r.getData()).getInsertValue(schema, 
new Properties()).get());
           } catch (IOException e) {
-            throw new HoodieIOException("Failed to extract Avro payload", e);
+            throw new HoodieIOException("Could not fetch record payload");
           }
         })
+        .map(converterToRow::apply)
+        // .map(row -> RowFactory.create(path, row))

Review Comment:
   can we remove L 223. (the commented out line)



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -194,66 +207,21 @@ private static List<HoodieRecord> 
getBaseFileRecords(HoodieBaseFile baseFile, Ho
     }
   }
 
-  private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema 
schema, SQLContext sqlContext, boolean isBaseFile) {
-    List<GenericRecord> avroRecords = records.stream()
+  private static List<Row> toRows(List<HoodieRecord> records, Schema schema, 
SQLContext sqlContext, String path) {
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    Function1<GenericRecord, Row> converterToRow = 
AvroConversionUtils.createConverterToRow(schema, structType);
+    List<Row> avroRecords = records.stream()
         .map(r -> {
-          if (isBaseFile) {
-            return (GenericRecord) r.getData();
-          }
-          HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
           try {
-            return (GenericRecord) payload.getInsertValue(schema).get();
+            return (GenericRecord) (r.getData() instanceof GenericRecord ? 
r.getData()
+                : ((HoodieRecordPayload) r.getData()).getInsertValue(schema, 
new Properties()).get());

Review Comment:
   not sure I get your question. We pass it in from driver only 
(hoodieWriteConfig.getProps()) 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -174,24 +185,40 @@ protected HoodieData<HoodieRecord> 
getFunctionalIndexRecords(List<Pair<String, F
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
 
-    // Group FileSlices by partition
-    Map<String, List<FileSlice>> partitionToFileSlicesMap = 
partitionFileSlicePairs.stream()
-        .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
+    // Read records and append functional index metadata to every row
+    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathPairs, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
+          String partition = entry.getKey();
+          Pair<String, Long> filePathSizePair = entry.getValue();
+          String filePath = filePathSizePair.getKey();
+          long fileSize = filePathSizePair.getValue();
+          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema,
+              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+          List<Row> rowsWithIndexMetadata = 
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, 
partition, filePath, fileSize);
+          return rowsWithIndexMetadata.iterator();
+        });
+
+    // Generate dataset with functional index metadata
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema);
+    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
+        
structType.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
 DataTypes.StringType, false, Metadata.empty()))

Review Comment:
   can you fix the schema at L 202 only. i..e adding these 3 additional fields, 
so that its more clear



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -72,97 +81,101 @@
  */
 public class SparkMetadataWriterUtils {
 
-  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
-                                                                             
Schema readerSchema,
-                                                                             
List<FileSlice> fileSlices,
-                                                                             
String partition,
-                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             
String columnToIndex,
-                                                                             
SQLContext sqlContext) {
-    List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    for (FileSlice fileSlice : fileSlices) {
-      if (fileSlice.getBaseFile().isPresent()) {
-        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-        long fileSize = baseFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
baseFile.getStoragePath(), true);
-      }
-      // Handle log files
-      fileSlice.getLogFiles().forEach(logFile -> {
-        long fileSize = logFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
logFile.getPath(), false);
-      });
-    }
-    return createColumnStatsRecords(partition, columnRangeMetadataList, false, 
functionalIndex.getIndexName(), 
COLUMN_STATS.getRecordType()).collect(Collectors.toList());
+  public static Column[] getFunctionalIndexColumns() {
+    return new Column[] {
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
+    };
   }
 
-  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
-                                                                             
Schema readerSchema,
-                                                                             
List<FileSlice> fileSlices,
-                                                                             
String partition,
-                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             
String columnToIndex,
-                                                                             
SQLContext sqlContext,
-                                                                             
HoodieWriteConfig metadataWriteConfig) {
-    List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    for (FileSlice fileSlice : fileSlices) {
-      if (fileSlice.getBaseFile().isPresent()) {
-        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), 
metadataWriteConfig, partition,
-            baseFile.getCommitTime(), true);
-      }
-      // Handle log files
-      fileSlice.getLogFiles().forEach(
-          logFile -> buildBloomFilterMetadata(metaClient, readerSchema, 
functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, 
logFile.getPath(), metadataWriteConfig, partition,
-              logFile.getDeltaCommitTime(), false));
-    }
-    return bloomFilterMetadataList;
+  public static String[] getFunctionalIndexColumnNames() {
+    return new String[] {
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
+    };
   }
 
-  private static void buildColumnRangeMetadata(HoodieTableMetaClient 
metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-                                               long fileSize,
-                                               StoragePath filePath,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = 
computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), 
fileSize);
-    columnRangeMetadataList.add(columnRangeMetadata);
+  @NotNull
+  public static List<Row> getRowsWithFunctionalIndexMetadata(List<Row> 
rowsForFilePath, String partition, String filePath, long fileSize) {
+    return rowsForFilePath.stream().map(row -> {
+      Seq<Object> indexMetadata = 
JavaScalaConverters.convertJavaListToScalaSeq(Arrays.asList(partition, 
filePath, fileSize));
+      Row functionalIndexRow = Row.fromSeq(indexMetadata);
+      List<Row> rows = new ArrayList<>(2);
+      rows.add(row);
+      rows.add(functionalIndexRow);
+      Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaSeq(rows);
+      return Row.merge(rowSeq);
+    }).collect(Collectors.toList());
   }
 
-  private static void buildBloomFilterMetadata(HoodieTableMetaClient 
metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               List<HoodieRecord> 
bloomFilterMetadataList,
-                                               StoragePath filePath,
-                                               HoodieWriteConfig writeConfig,
-                                               String partitionName,
-                                               String instantTime,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(writeConfig);
-    fileDf.foreach(row -> {
-      byte[] key = row.getAs(columnToIndex).toString().getBytes();
-      bloomFilter.add(key);
-    });
-    ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-    bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, 
filePath.toString(), instantTime, writeConfig.getBloomFilterType(), 
bloomByteBuffer, false));
+  public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(Dataset<Row> dataset,
+                                                                               
    HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                               
    String columnToIndex) {
+    // Aggregate col stats related data for the column to index
+    Dataset<Row> columnRangeMetadataDataset = dataset
+        .select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupBy(SparkMetadataWriterUtils.getFunctionalIndexColumns())
+        
.agg(functions.count(functions.when(functions.col(columnToIndex).isNull(), 
1)).alias("nullCount"),
+            functions.min(columnToIndex).alias("minValue"),
+            functions.max(columnToIndex).alias("maxValue"),
+            functions.count(columnToIndex).alias("valueCount"));
+    // Generate column stat records using the aggregated data
+    return 
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row,
 Iterator<HoodieRecord>>)
+        row -> {
+          int baseAggregatePosition = 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames().length;
+          long nullCount = row.getLong(baseAggregatePosition);
+          Comparable minValue = (Comparable) row.get(baseAggregatePosition + 
1);
+          Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 
2);
+          long valueCount = row.getLong(baseAggregatePosition + 3);
+
+          String partitionName = row.getString(0);
+          String filePath = row.getString(1);
+          long totalFileSize = row.getLong(2);
+          // Total uncompressed size is harder to get directly. This is just 
an approximation to maintain the order.
+          long totalUncompressedSize = totalFileSize * 2;
+
+          HoodieColumnRangeMetadata<Comparable> rangeMetadata = 
HoodieColumnRangeMetadata.create(
+              filePath,
+              columnToIndex,
+              minValue,
+              maxValue,
+              nullCount,
+              valueCount,
+              totalFileSize,
+              totalUncompressedSize
+          );
+          return createColumnStatsRecords(partitionName, 
Collections.singletonList(rangeMetadata), false, functionalIndex.getIndexName(),
+              
COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();}
+    );
   }
 
-  private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext 
sqlContext,
-                                               HoodieTableMetaClient 
metaClient, Schema schema,
-                                               boolean isBaseFile) {
+  public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex,
+                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime) {
+    // Group data using functional index metadata and then create bloom filter 
on the group
+    Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), 
row.getString(1)), Encoders.kryo(Pair.class))

Review Comment:
   why are we grouping by just 2 fields?
   for col stats, I see we are grouping by HOODIE_FUNCTIONAL_INDEX_PARTITION, 
HOODIE_FUNCTIONAL_INDEX_FILE_PATH and HOODIE_FUNCTIONAL_INDEX_FILE_SIZE. 
   
   Shouldnt' we be doing the same. 
   also, instead of groupByKey, can we do groupBy so thats its clear as to 
which cols are used to do group by. 
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -72,97 +81,101 @@
  */
 public class SparkMetadataWriterUtils {
 
-  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
-                                                                             
Schema readerSchema,
-                                                                             
List<FileSlice> fileSlices,
-                                                                             
String partition,
-                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             
String columnToIndex,
-                                                                             
SQLContext sqlContext) {
-    List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    for (FileSlice fileSlice : fileSlices) {
-      if (fileSlice.getBaseFile().isPresent()) {
-        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-        long fileSize = baseFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
baseFile.getStoragePath(), true);
-      }
-      // Handle log files
-      fileSlice.getLogFiles().forEach(logFile -> {
-        long fileSize = logFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
logFile.getPath(), false);
-      });
-    }
-    return createColumnStatsRecords(partition, columnRangeMetadataList, false, 
functionalIndex.getIndexName(), 
COLUMN_STATS.getRecordType()).collect(Collectors.toList());
+  public static Column[] getFunctionalIndexColumns() {
+    return new Column[] {
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
+    };
   }
 
-  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
-                                                                             
Schema readerSchema,
-                                                                             
List<FileSlice> fileSlices,
-                                                                             
String partition,
-                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             
String columnToIndex,
-                                                                             
SQLContext sqlContext,
-                                                                             
HoodieWriteConfig metadataWriteConfig) {
-    List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    for (FileSlice fileSlice : fileSlices) {
-      if (fileSlice.getBaseFile().isPresent()) {
-        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), 
metadataWriteConfig, partition,
-            baseFile.getCommitTime(), true);
-      }
-      // Handle log files
-      fileSlice.getLogFiles().forEach(
-          logFile -> buildBloomFilterMetadata(metaClient, readerSchema, 
functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, 
logFile.getPath(), metadataWriteConfig, partition,
-              logFile.getDeltaCommitTime(), false));
-    }
-    return bloomFilterMetadataList;
+  public static String[] getFunctionalIndexColumnNames() {
+    return new String[] {
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
+    };
   }
 
-  private static void buildColumnRangeMetadata(HoodieTableMetaClient 
metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-                                               long fileSize,
-                                               StoragePath filePath,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = 
computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), 
fileSize);
-    columnRangeMetadataList.add(columnRangeMetadata);
+  @NotNull
+  public static List<Row> getRowsWithFunctionalIndexMetadata(List<Row> 
rowsForFilePath, String partition, String filePath, long fileSize) {
+    return rowsForFilePath.stream().map(row -> {
+      Seq<Object> indexMetadata = 
JavaScalaConverters.convertJavaListToScalaSeq(Arrays.asList(partition, 
filePath, fileSize));
+      Row functionalIndexRow = Row.fromSeq(indexMetadata);
+      List<Row> rows = new ArrayList<>(2);
+      rows.add(row);
+      rows.add(functionalIndexRow);
+      Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaSeq(rows);
+      return Row.merge(rowSeq);
+    }).collect(Collectors.toList());
   }
 
-  private static void buildBloomFilterMetadata(HoodieTableMetaClient 
metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               List<HoodieRecord> 
bloomFilterMetadataList,
-                                               StoragePath filePath,
-                                               HoodieWriteConfig writeConfig,
-                                               String partitionName,
-                                               String instantTime,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(writeConfig);
-    fileDf.foreach(row -> {
-      byte[] key = row.getAs(columnToIndex).toString().getBytes();
-      bloomFilter.add(key);
-    });
-    ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-    bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, 
filePath.toString(), instantTime, writeConfig.getBloomFilterType(), 
bloomByteBuffer, false));
+  public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(Dataset<Row> dataset,
+                                                                               
    HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                               
    String columnToIndex) {
+    // Aggregate col stats related data for the column to index
+    Dataset<Row> columnRangeMetadataDataset = dataset
+        .select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupBy(SparkMetadataWriterUtils.getFunctionalIndexColumns())
+        
.agg(functions.count(functions.when(functions.col(columnToIndex).isNull(), 
1)).alias("nullCount"),
+            functions.min(columnToIndex).alias("minValue"),
+            functions.max(columnToIndex).alias("maxValue"),
+            functions.count(columnToIndex).alias("valueCount"));
+    // Generate column stat records using the aggregated data
+    return 
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row,
 Iterator<HoodieRecord>>)
+        row -> {
+          int baseAggregatePosition = 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames().length;
+          long nullCount = row.getLong(baseAggregatePosition);
+          Comparable minValue = (Comparable) row.get(baseAggregatePosition + 
1);
+          Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 
2);
+          long valueCount = row.getLong(baseAggregatePosition + 3);
+
+          String partitionName = row.getString(0);
+          String filePath = row.getString(1);
+          long totalFileSize = row.getLong(2);
+          // Total uncompressed size is harder to get directly. This is just 
an approximation to maintain the order.
+          long totalUncompressedSize = totalFileSize * 2;
+
+          HoodieColumnRangeMetadata<Comparable> rangeMetadata = 
HoodieColumnRangeMetadata.create(
+              filePath,
+              columnToIndex,
+              minValue,
+              maxValue,
+              nullCount,
+              valueCount,
+              totalFileSize,
+              totalUncompressedSize
+          );
+          return createColumnStatsRecords(partitionName, 
Collections.singletonList(rangeMetadata), false, functionalIndex.getIndexName(),
+              
COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();}
+    );
   }
 
-  private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext 
sqlContext,
-                                               HoodieTableMetaClient 
metaClient, Schema schema,
-                                               boolean isBaseFile) {
+  public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex,
+                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime) {
+    // Group data using functional index metadata and then create bloom filter 
on the group
+    Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), 
row.getString(1)), Encoders.kryo(Pair.class))
+        .flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>)  
((pair, iterator) -> {
+          String partition = pair.getLeft().toString();
+          String filePath = pair.getRight().toString();
+          BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);

Review Comment:
   oh, the first entry refers to partition and 2nd refers to filePath is it?
   may be we are good. but lets use col name if possible 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to