lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1905336749
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +269,128 @@ private static List<Row> toRows(List<HoodieRecord> records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Computes expression index records for the given data files.
+ *
+ * <p>The file list is distributed with {@code parallelism} and each file is read into rows.
+ * Every row is tagged with its partition, its file path relative to the table base path, and
+ * the file size. The index expression is applied to the first source field, and the resulting
+ * dataset is handed to the column-stats or bloom-filter record builder, depending on
+ * {@code indexDefinition.getIndexType()}.
+ *
+ * @param partitionFilePathAndSizeTriplet pairs of partition -> (file path, file size) to index
+ * @param indexDefinition index definition supplying name, function, source fields, options and type
+ * @param metaClient table meta client; its base path is used to relativize file paths
+ * @param parallelism parallelism used when distributing the file list
+ * @param readerSchema Avro schema used to read the files and to build the row struct type
+ * @param instantTime instant time passed to bloom filter record generation
+ * @param fetchCachedColumnMetadata passed through to column stats record generation
+ * @param engineContext engine context; it is cast to {@link HoodieSparkEngineContext}
+ * @param dataWriteConfig write config passed to readRecordsAsRows when reading the files
+ * @param metadataWriteConfig write config passed to bloom filter record generation
+ * @return the computed index records; empty when the index definition has no source fields
+ * @throws UnsupportedOperationException if the index type is neither column stats nor bloom filters
+ */
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime, boolean fetchCachedColumnMetadata,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ // NOTE(review): this lambda captures sqlContext, metaClient and the write configs and runs
+ // inside a Spark flatMap; confirm readRecordsAsRows is safe to call where the closure executes.
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ // Base vs. non-base file is decided from the file name alone (text after the last '/').
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ // The three non-nullable columns added below are assumed to match, in order, the values that
+ // getRowsWithExpressionIndexMetadata appends to each row (partition, relative path, size) - TODO confirm.
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ // withColumn reuses the source column's name, so the transformed value replaces it in place.
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ // The index type is matched case-insensitively against the supported partition names.
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, fetchCachedColumnMetadata);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition,
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]