codope commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1908191752


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -566,12 +566,25 @@ private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(
     return Pair.of(fileGroupCount, records);
   }
 
+  /**
+   * Generates expression index records
+   * @param partitionFilePathAndSizeTriplet Triplet of file path, file size 
and partition name to which file belongs
+   * @param indexDefinition Hoodie Index Definition for the expression index 
for which records need to be generated
+   * @param metaClient Hoodie Table Meta Client
+   * @param parallelism Parallelism to use for engine operations
+   * @param readerSchema Schema of reader
+   * @param storageConf Storage Config
+   * @param instantTime Instant time
+   * @param shouldGeneratePartitionStatRecords Whether partition stat records 
need to be generated along with the file level expression index
+   *                                           records. Partition stat records 
need to be generated when bootstrapping the index
+   * @return HoodieData wrapper of expression index HoodieRecords
+   */
   protected abstract HoodieData<HoodieRecord> 
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet,
                                                                         
HoodieIndexDefinition indexDefinition,
                                                                         
HoodieTableMetaClient metaClient,
                                                                         int 
parallelism, Schema readerSchema,
                                                                         
StorageConfiguration<?> storageConf,
-                                                                        String 
instantTime);
+                                                                        String 
instantTime, boolean shouldGeneratePartitionStatRecords);

Review Comment:
   don't need this boolean as `indexDefinition` is already being passed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -121,37 +157,53 @@ public static HoodieData<HoodieRecord> 
getExpressionIndexRecordsUsingColumnStats
             functions.max(columnToIndex).alias("maxValue"),
             functions.count(columnToIndex).alias("valueCount"));
     // Generate column stat records using the aggregated data
-    return 
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row,
 Iterator<HoodieRecord>>)
-        row -> {
-          int baseAggregatePosition = 
SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
-          long nullCount = row.getLong(baseAggregatePosition);
-          Comparable minValue = (Comparable) row.get(baseAggregatePosition + 
1);
-          Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 
2);
-          long valueCount = row.getLong(baseAggregatePosition + 3);
-
-          String partitionName = row.getString(0);
-          String relativeFilePath = row.getString(1);
-          long totalFileSize = row.getLong(2);
-          // Total uncompressed size is harder to get directly. This is just 
an approximation to maintain the order.
-          long totalUncompressedSize = totalFileSize * 2;
-
-          HoodieColumnRangeMetadata<Comparable> rangeMetadata = 
HoodieColumnRangeMetadata.create(
-              relativeFilePath,
-              columnToIndex,
-              minValue,
-              maxValue,
-              nullCount,
-              valueCount,
-              totalFileSize,
-              totalUncompressedSize
-          );
-          return createColumnStatsRecords(partitionName, 
Collections.singletonList(rangeMetadata), false, expressionIndex.getIndexName(),
-              
COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();
-        });
+    HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> 
rangeMetadataHoodieJavaRDD = 
HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD())
+        .flatMapToPair((SerializableFunction<Row, Iterator<? extends 
Pair<String, HoodieColumnRangeMetadata<Comparable>>>>)
+            row -> {
+              int baseAggregatePosition = 
SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
+              long nullCount = row.getLong(baseAggregatePosition);
+              Comparable minValue = (Comparable) row.get(baseAggregatePosition 
+ 1);
+              Comparable maxValue = (Comparable) row.get(baseAggregatePosition 
+ 2);
+              long valueCount = row.getLong(baseAggregatePosition + 3);
+
+              String partitionName = row.getString(0);
+              String relativeFilePath = row.getString(1);
+              long totalFileSize = row.getLong(2);
+              // Total uncompressed size is harder to get directly. This is 
just an approximation to maintain the order.
+              long totalUncompressedSize = totalFileSize * 2;
+
+              HoodieColumnRangeMetadata<Comparable> rangeMetadata = 
HoodieColumnRangeMetadata.create(
+                  relativeFilePath,
+                  columnToIndex,
+                  minValue,
+                  maxValue,
+                  nullCount,
+                  valueCount,
+                  totalFileSize,
+                  totalUncompressedSize
+              );
+              return Collections.singletonList(Pair.of(partitionName, 
rangeMetadata)).iterator();
+            });
+
+    if (partitionRecordsFunctionOpt.isPresent()) {
+      rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");

Review Comment:
   good default, but let's create a jira to make it configurable



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1147,14 +1156,8 @@ private void 
updateExpressionIndexIfPresent(HoodieCommitMetadata commitMetadata,
    * @param indexPartition partition name of the expression index
    * @param instantTime    timestamp at of the current update commit
    */
-  private HoodieData<HoodieRecord> 
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
-    HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
-    List<Pair<String, Pair<String, Long>>> partitionFilePathPairs = new 
ArrayList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add(
-        Pair.of(writeStat.getPartitionPath(), Pair.of(new 
StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), 
writeStat.getFileSizeInBytes())))));
-    int parallelism = Math.min(partitionFilePathPairs.size(), 
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
-    Schema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
-    return getExpressionIndexRecords(partitionFilePathPairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf, instantTime);
+  protected HoodieData<HoodieRecord> 
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
+    throw new UnsupportedOperationException("");

Review Comment:
   add some message



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +278,155 @@ private static List<Row> toRows(List<HoodieRecord> 
records, Schema schema, Hoodi
         .collect(Collectors.toList());
     return avroRecords;
   }
+
+  /**
+   * Generates expression index records
+   * @param partitionFilePathAndSizeTriplet Triplet of file path, file size 
and partition name to which file belongs
+   * @param indexDefinition Hoodie Index Definition for the expression index 
for which records need to be generated
+   * @param metaClient Hoodie Table Meta Client
+   * @param parallelism Parallelism to use for engine operations
+   * @param readerSchema Schema of reader
+   * @param instantTime Instant time
+   * @param engineContext HoodieEngineContext
+   * @param dataWriteConfig Write Config for the data table
+   * @param metadataWriteConfig Write config for the metadata table
+   * @param partitionRecordsFunctionOpt Function used to generate partition 
stat records for the EI. It takes the column range metadata generated for the 
provided partition files as input
+   *                                    and uses those to generate the final 
partition stats
+   * @return ExpressionIndexComputationMetadata containing both EI column stat 
records and partition stat records if partitionRecordsFunctionOpt is provided
+   */
+  @SuppressWarnings("checkstyle:LineLength")
+  public static ExpressionIndexComputationMetadata 
getExprIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+                                                                       
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String 
instantTime,
+                                                                       
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, 
HoodieWriteConfig metadataWriteConfig,
+                                                                       
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, 
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
+    if (indexDefinition.getSourceFields().isEmpty()) {
+      // In case there are no columns to index, bail
+      return new 
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+    }
+
+    // NOTE: We are assuming that the index expression is operating on a 
single column
+    //       HUDI-6994 will address this.
+    ValidationUtils.checkArgument(indexDefinition.getSourceFields().size() == 
1, "Only one source field is supported for expression index");
+    String columnToIndex = indexDefinition.getSourceFields().get(0);
+    SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+    // Read records and append expression index metadata to every row
+    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry ->
+            getExpressionIndexRecordsIterator(metaClient, readerSchema, 
dataWriteConfig, entry, sqlContext));
+
+    // Generate dataset with expression index metadata
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
 DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
+    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
+
+    // Apply expression index and generate the column to index
+    HoodieExpressionIndex<Column, Column> expressionIndex =
+        new HoodieSparkExpressionIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
+    Column indexedColumn = 
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+    rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+    // Generate expression index records
+    if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+      return getExpressionIndexRecordsUsingColumnStats(rowDataset, 
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+    } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
+      return getExpressionIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime, 
indexDefinition.getIndexName());
+    } else {
+      throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
+    }
+  }
+
+  private static Iterator<Row> 
getExpressionIndexRecordsIterator(HoodieTableMetaClient metaClient, Schema 
readerSchema, HoodieWriteConfig dataWriteConfig, Pair<String, Pair<String, 
Long>> entry,
+                                                       SQLContext sqlContext) {
+    String partition = entry.getKey();
+    Pair<String, Long> filePathSizePair = entry.getValue();
+    String filePath = filePathSizePair.getKey();
+    String relativeFilePath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(filePath));
+    long fileSize = filePathSizePair.getValue();
+    List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+        FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+    List<Row> rowsWithIndexMetadata = 
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, 
relativeFilePath, fileSize);
+    return rowsWithIndexMetadata.iterator();
+  }
+
+  /**
+   * Fetches column range metadata from the EI partition for all the partition 
files impacted by the commit. This would only take into account completed 
commits for the partitions
+   * since EI updates have not yet been committed.
+   *
+   * @param commitMetadata Hoodie commit metadata
+   * @param indexPartition Partition name for the expression index
+   * @param engineContext  Hoodie engine context
+   * @param tableMetadata
+   * @param dataMetaClient Data table meta client
+   * @param metadataConfig Hoodie metadata config
+   * @return HoodiePairData of partition name and list of column range 
metadata for the partitions
+   */
+  public static HoodiePairData<String, 
List<HoodieColumnRangeMetadata<Comparable>>> 
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata, 
String indexPartition,
+                                                                               
                                            HoodieEngineContext engineContext, 
HoodieTableMetadata tableMetadata,
+                                                                               
                                            HoodieTableMetaClient 
dataMetaClient, HoodieMetadataConfig metadataConfig) {
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new 
ArrayList<>()));
+    }
+
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, 
dataMetaClient);
+    List<String> columnsToIndex = 
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+    try {
+      Option<Schema> writerSchema =
+          
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+              .flatMap(writerSchemaStr ->
+                  isNullOrEmpty(writerSchemaStr)
+                      ? Option.empty()
+                      : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+      HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+      Schema tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema)
+          .orElseThrow(() -> new IllegalStateException(String.format("Expected 
writer schema in commit metadata %s", commitMetadata)));
+      // filter columns with only supported types
+      final List<String> validColumnsToIndex = columnsToIndex.stream()
+          .filter(col -> 
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || 
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col, tableSchema))
+          .collect(Collectors.toList());
+      if (validColumnsToIndex.isEmpty()) {
+        return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new 
ArrayList<>()));
+      }
+      LOG.debug("Indexing following columns for partition stats index: {}", 
validColumnsToIndex);
+      // Group by partitionPath and then gather write stats lists,
+      // where each inner list contains HoodieWriteStat objects that have the 
same partitionPath.
+      List<List<HoodieWriteStat>> partitionedWriteStats = new 
ArrayList<>(allWriteStats.stream()
+          .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))

Review Comment:
   What happens in case of non-partitioned table? Do we add an entry with empty 
string partition? Let's ensure we have a test for non-partitioned table.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,68 @@ public void deletePartitions(String instantTime, 
List<MetadataPartitionType> par
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
+  /**
+   * Loads the file slices touched by the commit due to given instant time and 
returns the records for the expression index.
+   * This generates partition stat record updates along with EI column stat 
update records. Partition stat record updates are generated
+   * by reloading the affected partitions column range metadata from EI and 
then merging it with partition stat record from the updated data.
+   *
+   * @param commitMetadata {@code HoodieCommitMetadata}
+   * @param indexPartition partition name of the expression index
+   * @param instantTime    timestamp at of the current update commit
+   */
+  @Override
+  protected HoodieData<HoodieRecord> 
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
+    HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+    boolean isExprIndexUsingColumnStats = 
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+    Option<Function<HoodiePairData<String, 
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> 
partitionRecordsFunctionOpt = Option.empty();
+    final HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> 
exprIndexPartitionStatUpdates;
+    if (isExprIndexUsingColumnStats) {
+      // Fetch column range metadata for affected partitions in the commit
+      exprIndexPartitionStatUpdates =
+          
SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, 
indexPartition,
+                  engineContext, getTableMetadata(), dataMetaClient, 
dataWriteConfig.getMetadataConfig())
+              .flatMapValues(List::iterator);
+      // The function below merges the column range metadata from the updated 
data with latest column range metadata of affected partition computed above
+      partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
+          
HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata),
 true, Option.of(indexDefinition.getIndexName())));
+    } else {
+      exprIndexPartitionStatUpdates = null;
+    }

Review Comment:
   this is not required



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2499,14 +2535,36 @@ private static 
List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMeta
     return readColumnRangeMetadataFrom(partitionPath, fileName, 
datasetMetaClient, columnsToIndex, maxBufferSize);
   }
 
-  public static HoodieData<HoodieRecord> 
convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
-                                                                               
 HoodieEngineContext engineContext,
-                                                                               
 HoodieTableMetaClient dataMetaClient,
-                                                                               
 HoodieMetadataConfig metadataConfig) {
+  public static HoodieData<HoodieRecord> 
convertMetadataToPartitionStatsRecords(HoodiePairData<String, 
List<HoodieColumnRangeMetadata<Comparable>>> columnRangeMetadataPartitionPair,
+                                                                               
 HoodieTableMetaClient dataMetaClient) {
+    return 
convertMetadataToPartitionStatsRecords(columnRangeMetadataPartitionPair.flatMapValues(List::iterator),
+        Option.empty(), isShouldScanColStatsForTightBound(dataMetaClient));
+  }
+
+  public static HoodieData<HoodieRecord> 
convertMetadataToPartitionStatsRecords(HoodiePairData<String, 
HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataPartitionPair,
+                                                                               
 Option<String> indexPartitionOpt, boolean isTightBound) {
+    try {
+      return columnRangeMetadataPartitionPair
+          .groupByKey()
+          .map(pair -> {
+            final String partitionName = pair.getLeft();
+            return collectAndProcessColumnMetadata(pair.getRight(), 
partitionName, isTightBound, indexPartitionOpt);
+          })
+          .flatMap(recordStream -> recordStream.iterator());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table", e);
+    }
+  }
+
+  public static HoodiePairData<String, 
List<HoodieColumnRangeMetadata<Comparable>>> 
convertMetadataToPartitionStatsColumnRangeMetadata(HoodieCommitMetadata 
commitMetadata,
+                                                                               
                                                        HoodieEngineContext 
engineContext,
+                                                                               
                                                        HoodieTableMetaClient 
dataMetaClient,
+                                                                               
                                                        HoodieTableMetadata 
tableMetadata,
+                                                                               
                                                        HoodieMetadataConfig 
metadataConfig) {
     List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
     if (allWriteStats.isEmpty()) {
-      return engineContext.emptyHoodieData();
+      return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new 
ArrayList<>()));

Review Comment:
   This is not an empty pair. Given that you're eventually flat-mapping only 
the values from the pair data, why not simply flatten and send HoodieData 
instead of pair from this method itself? I don't see the key of the pair being 
used anywhere later.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -397,7 +395,10 @@ public static Map<String, HoodieData<HoodieRecord>> 
convertMetadataToRecords(Hoo
     if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)) 
{
       
checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
           "Column stats partition must be enabled to generate partition stats. 
Please enable: " + 
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
-      final HoodieData<HoodieRecord> partitionStatsRDD = 
convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, 
metadataConfig);
+      // Generate Hoodie Pair data of partition name and list of column range 
metadata for all the files in that partition
+      HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> 
columnRangeMetadata = 
convertMetadataToPartitionStatsColumnRangeMetadata(commitMetadata, context,
+          dataMetaClient, tableMetadata, metadataConfig);
+      final HoodieData<HoodieRecord> partitionStatsRDD = 
convertMetadataToPartitionStatsRecords(columnRangeMetadata, dataMetaClient);

Review Comment:
   Can this be fused? I see that the two methods are being used only for 
partition stats index.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord> 
records, Schema schema, Hoodi
         .collect(Collectors.toList());
     return avroRecords;
   }
+
+  /**
+   * Generates expression index records
+   * @param partitionFilePathAndSizeTriplet Triplet of file path, file size 
and partition name to which file belongs
+   * @param indexDefinition Hoodie Index Definition for the expression index 
for which records need to be generated
+   * @param metaClient Hoodie Table Meta Client
+   * @param parallelism Parallelism to use for engine operations
+   * @param readerSchema Schema of reader
+   * @param instantTime Instant time
+   * @param engineContext HoodieEngineContext
+   * @param dataWriteConfig Write Config for the data table
+   * @param metadataWriteConfig Write config for the metadata table
+   * @param partitionRecordsFunctionOpt Function used to generate partition 
stat records for the EI. It takes the column range metadata generated for the 
provided partition files as input
+   *                                    and uses those to generate the final 
partition stats
+   * @return ExpressionIndexComputationMetadata containing both EI column stat 
records and partition stat records if partitionRecordsFunctionOpt is provided
+   */
+  @SuppressWarnings("checkstyle:LineLength")
+  public static ExpressionIndexComputationMetadata 
getExprIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+                                                                       
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String 
instantTime,
+                                                                       
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, 
HoodieWriteConfig metadataWriteConfig,
+                                                                       
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, 
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
+    if (indexDefinition.getSourceFields().isEmpty()) {
+      // In case there are no columns to index, bail
+      return new 
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+    }
+
+    // NOTE: We are assuming that the index expression is operating on a 
single column
+    //       HUDI-6994 will address this.
+    String columnToIndex = indexDefinition.getSourceFields().get(0);
+    SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+    // Read records and append expression index metadata to every row
+    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
+          String partition = entry.getKey();
+          Pair<String, Long> filePathSizePair = entry.getValue();
+          String filePath = filePathSizePair.getKey();
+          String relativeFilePath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(filePath));
+          long fileSize = filePathSizePair.getValue();
+          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+          List<Row> rowsWithIndexMetadata = 
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, 
relativeFilePath, fileSize);
+          return rowsWithIndexMetadata.iterator();
+        });
+
+    // Generate dataset with expression index metadata
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
 DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
+    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
+
+    // Apply expression index and generate the column to index
+    HoodieExpressionIndex<Column, Column> expressionIndex =
+        new HoodieSparkExpressionIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
+    Column indexedColumn = 
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+    rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+    // Generate expression index records
+    if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+      return getExpressionIndexRecordsUsingColumnStats(rowDataset, 
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+    } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
+      return new 
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
 columnToIndex, metadataWriteConfig, instantTime, 
indexDefinition.getIndexName()));
+    } else {
+      throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
+    }
+  }
+
+  /**
+   * Fetches column range metadata from the EI partition for all the partition 
files impacted by the commit. This would only take into account completed 
commits for the partitions
+   * since EI updates have not yet been committed.
+   *
+   * @param commitMetadata Hoodie commit metadata
+   * @param indexPartition Partition name for the expression index
+   * @param engineContext Hoodie engine context
+   * @param dataMetaClient Data table meta client
+   * @param metadataConfig Hoodie metadata config
+   *
+   * @return HoodiePairData of partition name and list of column range 
metadata for the partitions
+   */
+  public static HoodiePairData<String, 
List<List<HoodieColumnRangeMetadata<Comparable>>>> 
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata, 
String indexPartition,
+                                                                               
                                                  HoodieEngineContext 
engineContext,
+                                                                               
                                                  HoodieTableMetaClient 
dataMetaClient,
+                                                                               
                                                  HoodieMetadataConfig 
metadataConfig) {
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new 
ArrayList<>()));
+    }
+
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, 
dataMetaClient);
+    List<String> columnsToIndex = 
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+    try {
+      Option<Schema> writerSchema =
+          
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+              .flatMap(writerSchemaStr ->
+                  isNullOrEmpty(writerSchemaStr)
+                      ? Option.empty()
+                      : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+      HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+      Option<Schema> tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+      Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+      // filter columns with only supported types
+      final List<String> validColumnsToIndex = columnsToIndex.stream()
+          .filter(col -> 
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || 
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col, 
writerSchemaOpt.get().get()))
+          .collect(Collectors.toList());
+      if (validColumnsToIndex.isEmpty()) {
+        return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new 
ArrayList<>()));
+      }
+      LOG.debug("Indexing following columns for partition stats index: {}", 
validColumnsToIndex);
+      // Group by partitionPath and then gather write stats lists,
+      // where each inner list contains HoodieWriteStat objects that have the 
same partitionPath.
+      List<List<HoodieWriteStat>> partitionedWriteStats = 
allWriteStats.stream()
+          .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+          .values()
+          .stream()
+          .collect(Collectors.toList());
+
+      int parallelism = Math.max(Math.min(partitionedWriteStats.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      HoodieTableMetadata tableMetadata = 
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), 
metadataConfig, dataMetaClient.getBasePath().toString());
+      return engineContext.parallelize(partitionedWriteStats, 
parallelism).mapToPair(partitionedWriteStat -> {
+        final String partitionName = 
partitionedWriteStat.get(0).getPartitionPath();
+        checkState(tableMetadata != null, "tableMetadata should not be null 
when scanning metadata table");
+        // Collect Column Metadata for Each File part of active file system 
view of latest snapshot
+        // Get all file names, including log files, in a set from the file 
slices
+        Set<String> fileNames = 
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
 Option.empty(), partitionName).stream()
+            .flatMap(fileSlice -> Stream.concat(
+                
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+                fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+        // Fetch EI column stat records for above files
+        List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+            
tableMetadata.getRecordsByKeyPrefixes(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
 partitionName), indexPartition, false)
+                // schema and properties are ignored in getInsertValue, so 
simply pass as null
+                .map(record -> record.getData().getInsertValue(null, null))
+                .filter(Option::isPresent)
+                .map(data -> ((HoodieMetadataRecord) 
data.get()).getColumnStatsMetadata())
+                .filter(stats -> fileNames.contains(stats.getFileName()))
+                .map(HoodieColumnRangeMetadata::fromColumnStats)
+                .collectAsList();

Review Comment:
   Ok, i would also suggest to add the algorithm here in simple plain english 
in the Javadoc. WOuld be much easier to follow in future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to