lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1907316031


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime, 
List<MetadataPartitionType> par
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
+  /**
+   * Loads the file slices touched by the commit due to given instant time and 
returns the records for the expression index.
+   * This generates partition stat record updates along with EI column stat 
update records. Partition stat record updates are generated
+   * by reloading the affected partitions column range metadata from EI and 
then merging it with partition stat record from the updated data.
+   *
+   * @param commitMetadata {@code HoodieCommitMetadata}
+   * @param indexPartition partition name of the expression index
+   * @param instantTime    timestamp at of the current update commit
+   */
+  @Override
+  protected HoodieData<HoodieRecord> 
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
+    HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+    boolean isColumnStatEI = 
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+    Option<Function<HoodiePairData<String, 
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> 
partitionRecordsFunctionOpt = Option.empty();
+    final HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> 
exprIndexPartitionStatUpdates;
+    if (isColumnStatEI) {
+      // Fetch column range metadata for affected partitions in the commit
+      exprIndexPartitionStatUpdates =
+          
SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, 
indexPartition,
+                  engineContext, dataMetaClient, 
dataWriteConfig.getMetadataConfig())
+              .flatMapValues(list -> 
list.stream().flatMap(List::stream).iterator());
+      // The function below merges the column range metadata from the updated 
data with latest column range metadata of affected partition computed above
+      partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
+          
HoodieTableMetadataUtil.collectAndProcessEIPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata),
 true, Option.of(indexDefinition.getIndexName())));

Review Comment:
   I don't think there is a way to avoid it. The two datasets are computed in 
different ways - one by reading the EI partition to get the existing column 
stats, and the other by computing EI records on the new files.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord> 
records, Schema schema, Hoodi
         .collect(Collectors.toList());
     return avroRecords;
   }
+
+  /**
+   * Generates expression index records
+   * @param partitionFilePathAndSizeTriplet Triplet of file path, file size 
and partition name to which file belongs
+   * @param indexDefinition Hoodie Index Definition for the expression index 
for which records need to be generated
+   * @param metaClient Hoodie Table Meta Client
+   * @param parallelism Parallelism to use for engine operations
+   * @param readerSchema Schema of reader
+   * @param instantTime Instant time
+   * @param engineContext HoodieEngineContext
+   * @param dataWriteConfig Write Config for the data table
+   * @param metadataWriteConfig Write config for the metadata table
+   * @param partitionRecordsFunctionOpt Function used to generate partition 
stat records for the EI. It takes the column range metadata generated for the 
provided partition files as input
+   *                                    and uses those to generate the final 
partition stats
+   * @return ExpressionIndexComputationMetadata containing both EI column stat 
records and partition stat records if partitionRecordsFunctionOpt is provided
+   */
+  @SuppressWarnings("checkstyle:LineLength")
+  public static ExpressionIndexComputationMetadata 
getExprIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+                                                                       
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String 
instantTime,
+                                                                       
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, 
HoodieWriteConfig metadataWriteConfig,
+                                                                       
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, 
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
+    if (indexDefinition.getSourceFields().isEmpty()) {
+      // In case there are no columns to index, bail
+      return new 
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+    }
+
+    // NOTE: We are assuming that the index expression is operating on a 
single column
+    //       HUDI-6994 will address this.
+    String columnToIndex = indexDefinition.getSourceFields().get(0);

Review Comment:
   Addressed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime, 
List<MetadataPartitionType> par
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
+  /**
+   * Loads the file slices touched by the commit due to given instant time and 
returns the records for the expression index.
+   * This generates partition stat record updates along with EI column stat 
update records. Partition stat record updates are generated
+   * by reloading the affected partitions column range metadata from EI and 
then merging it with partition stat record from the updated data.
+   *
+   * @param commitMetadata {@code HoodieCommitMetadata}
+   * @param indexPartition partition name of the expression index
+   * @param instantTime    timestamp at of the current update commit
+   */
+  @Override
+  protected HoodieData<HoodieRecord> 
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
+    HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+    boolean isColumnStatEI = 
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+    Option<Function<HoodiePairData<String, 
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> 
partitionRecordsFunctionOpt = Option.empty();
+    final HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> 
exprIndexPartitionStatUpdates;
+    if (isColumnStatEI) {
+      // Fetch column range metadata for affected partitions in the commit
+      exprIndexPartitionStatUpdates =
+          
SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, 
indexPartition,
+                  engineContext, dataMetaClient, 
dataWriteConfig.getMetadataConfig())
+              .flatMapValues(list -> 
list.stream().flatMap(List::stream).iterator());
+      // The function below merges the column range metadata from the updated 
data with latest column range metadata of affected partition computed above
+      partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
+          
HoodieTableMetadataUtil.collectAndProcessEIPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata),
 true, Option.of(indexDefinition.getIndexName())));
+    } else {
+      exprIndexPartitionStatUpdates = null;
+    }
+
+    List<Pair<String, Pair<String, Long>>> partitionFilePathPairs = new 
ArrayList<>();

Review Comment:
   Addressed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord> 
records, Schema schema, Hoodi
         .collect(Collectors.toList());
     return avroRecords;
   }
+
+  /**
+   * Generates expression index records
+   * @param partitionFilePathAndSizeTriplet Triplet of file path, file size 
and partition name to which file belongs
+   * @param indexDefinition Hoodie Index Definition for the expression index 
for which records need to be generated
+   * @param metaClient Hoodie Table Meta Client
+   * @param parallelism Parallelism to use for engine operations
+   * @param readerSchema Schema of reader
+   * @param instantTime Instant time
+   * @param engineContext HoodieEngineContext
+   * @param dataWriteConfig Write Config for the data table
+   * @param metadataWriteConfig Write config for the metadata table
+   * @param partitionRecordsFunctionOpt Function used to generate partition 
stat records for the EI. It takes the column range metadata generated for the 
provided partition files as input
+   *                                    and uses those to generate the final 
partition stats
+   * @return ExpressionIndexComputationMetadata containing both EI column stat 
records and partition stat records if partitionRecordsFunctionOpt is provided
+   */
+  @SuppressWarnings("checkstyle:LineLength")
+  public static ExpressionIndexComputationMetadata 
getExprIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+                                                                       
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String 
instantTime,
+                                                                       
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, 
HoodieWriteConfig metadataWriteConfig,
+                                                                       
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, 
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
+    if (indexDefinition.getSourceFields().isEmpty()) {
+      // In case there are no columns to index, bail
+      return new 
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+    }
+
+    // NOTE: We are assuming that the index expression is operating on a 
single column
+    //       HUDI-6994 will address this.
+    String columnToIndex = indexDefinition.getSourceFields().get(0);
+    SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+    // Read records and append expression index metadata to every row
+    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
+          String partition = entry.getKey();
+          Pair<String, Long> filePathSizePair = entry.getValue();
+          String filePath = filePathSizePair.getKey();
+          String relativeFilePath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(filePath));
+          long fileSize = filePathSizePair.getValue();
+          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+          List<Row> rowsWithIndexMetadata = 
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, 
relativeFilePath, fileSize);
+          return rowsWithIndexMetadata.iterator();
+        });
+
+    // Generate dataset with expression index metadata
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
 DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
+    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
+
+    // Apply expression index and generate the column to index
+    HoodieExpressionIndex<Column, Column> expressionIndex =
+        new HoodieSparkExpressionIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
+    Column indexedColumn = 
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+    rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+    // Generate expression index records
+    if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+      return getExpressionIndexRecordsUsingColumnStats(rowDataset, 
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+    } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
+      return new 
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
 columnToIndex, metadataWriteConfig, instantTime, 
indexDefinition.getIndexName()));

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to