codope commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1905261283


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1091,13 +1102,23 @@ public void update(HoodieCommitMetadata commitMetadata, 
String instantTime) {
               enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
               dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getWritesFileIdEncoding());
 
+      Option<HoodiePairData<String, 
List<List<HoodieColumnRangeMetadata<Comparable>>>>> 
partitionRangeMetadataPartitionPairOpt = Option.empty();
+      if 
(enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)) {
+        
checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
+            "Column stats partition must be enabled to generate partition 
stats. Please enable: " + 
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
+        partitionRangeMetadataPartitionPairOpt = 
Option.of(convertMetadataToPartitionStatsColumnRangeMetadata(commitMetadata, 
engineContext, dataMetaClient, dataWriteConfig.getMetadataConfig()));

Review Comment:
   So, this is only called if 
`enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)`. In 
that case, can we reuse the output of 
`convertMetadataToPartitionStatsColumnRangeMetadata`, provided we ensure that the 
records for partition stats are computed before those for the expression index?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +269,128 @@ private static List<Row> toRows(List<HoodieRecord> 
records, Schema schema, Hoodi
         .collect(Collectors.toList());
     return avroRecords;
   }
+
+  public static ExpressionIndexComputationMetadata 
getExprIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+                                                                       
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String 
instantTime, boolean fetchCachedColumnMetadata,
+                                                                       
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, 
HoodieWriteConfig metadataWriteConfig) {
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
+    if (indexDefinition.getSourceFields().isEmpty()) {
+      // In case there are no columns to index, bail
+      return new 
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+    }
+
+    // NOTE: We are assuming that the index expression is operating on a 
single column
+    //       HUDI-6994 will address this.
+    String columnToIndex = indexDefinition.getSourceFields().get(0);
+    SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+    // Read records and append expression index metadata to every row
+    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
+          String partition = entry.getKey();
+          Pair<String, Long> filePathSizePair = entry.getValue();
+          String filePath = filePathSizePair.getKey();
+          String relativeFilePath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(filePath));
+          long fileSize = filePathSizePair.getValue();
+          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+          List<Row> rowsWithIndexMetadata = 
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, 
relativeFilePath, fileSize);
+          return rowsWithIndexMetadata.iterator();
+        });
+
+    // Generate dataset with expression index metadata
+    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
 DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
+    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
+
+    // Apply expression index and generate the column to index
+    HoodieExpressionIndex<Column, Column> expressionIndex =
+        new HoodieSparkExpressionIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
+    Column indexedColumn = 
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+    rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+    // Generate expression index records
+    if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+      return getExpressionIndexRecordsUsingColumnStats(rowDataset, 
expressionIndex, columnToIndex, fetchCachedColumnMetadata);
+    } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
+      return new 
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
 columnToIndex, metadataWriteConfig, instantTime, 
indexDefinition.getIndexName()));
+    } else {
+      throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
+    }
+  }
+
+  public static HoodiePairData<String, 
List<List<HoodieColumnRangeMetadata<Comparable>>>> 
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition,

Review Comment:
   Please add javadoc for this method.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +157,45 @@ public void deletePartitions(String instantTime, 
List<MetadataPartitionType> par
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
+  @Override
+  protected HoodieData<HoodieRecord> 
getExpressionIndexUpdates(Option<HoodiePairData<String, 
List<List<HoodieColumnRangeMetadata<Comparable>>>>> 
partitionRangeMetadataPairOpt,

Review Comment:
   I don't see `partitionRangeMetadataPairOpt` getting used.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +269,128 @@ private static List<Row> toRows(List<HoodieRecord> 
records, Schema schema, Hoodi
         .collect(Collectors.toList());
     return avroRecords;
   }
+
+  public static ExpressionIndexComputationMetadata 
getExprIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,

Review Comment:
   Please add javadoc for this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to