codope commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1905559715
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.empty(), partitionName).stream()
Review Comment:
This is going to load the filesystem view repeatedly. Perhaps we can load it once
before launching the executors.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
Review Comment:
1. Close `HoodieTableMetadata`?
2. Instead of creating it every time per index-partition update, can we not load
it just once in `HoodieBackedTableMetadataWriter` and pass it in here?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
Review Comment:
Since it is eager anyway, you can remove the `Lazy` wrapper and use `tableSchema.orElseThrow` directly.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
+ /**
+ * Loads the file slices touched by the commit due to given instant time and
returns the records for the expression index.
+ * This generates partition stat record updates along with EI column stat
update records. Partition stat record updates are generated
+ * by reloading the affected partitions column range metadata from EI and
then merging it with partition stat record from the updated data.
+ *
+ * @param commitMetadata {@code HoodieCommitMetadata}
+ * @param indexPartition partition name of the expression index
+ * @param instantTime timestamp at of the current update commit
+ */
+ @Override
+ protected HoodieData<HoodieRecord>
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
+ HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+ boolean isColumnStatEI =
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+ Option<Function<HoodiePairData<String,
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>
partitionRecordsFunctionOpt = Option.empty();
+ final HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>
exprIndexPartitionStatUpdates;
+ if (isColumnStatEI) {
+ // Fetch column range metadata for affected partitions in the commit
+ exprIndexPartitionStatUpdates =
+
SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata,
indexPartition,
+ engineContext, dataMetaClient,
dataWriteConfig.getMetadataConfig())
+ .flatMapValues(list ->
list.stream().flatMap(List::stream).iterator());
Review Comment:
I guess if you just return `Pair<String, List<HoodieColumnRangeMetadata>>`
from `getExpressionIndexPartitionStatUpdates`, then there is no need to use
`flatMapValues`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.empty(), partitionName).stream()
+ .flatMap(fileSlice -> Stream.concat(
+
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+ fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ // Fetch EI column stat records for above files
+ List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+
tableMetadata.getRecordsByKeyPrefixes(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
partitionName), indexPartition, false)
+ // schema and properties are ignored in getInsertValue, so
simply pass as null
+ .map(record -> record.getData().getInsertValue(null, null))
+ .filter(Option::isPresent)
+ .map(data -> ((HoodieMetadataRecord)
data.get()).getColumnStatsMetadata())
+ .filter(stats -> fileNames.contains(stats.getFileName()))
+ .map(HoodieColumnRangeMetadata::fromColumnStats)
+ .collectAsList();
+ List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
new ArrayList<>();
Review Comment:
What's the outer list for? Please add a comment.
As per my understanding, each executor is simply collecting file stats for
all files in the partition, so shouldn't Pair<String,
List<HoodieColumnRangeMetadata>> be sufficient?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java:
##########
@@ -72,4 +76,27 @@ public Column apply(List<Column> orderedSourceValues) {
sparkFunction.validateOptions(options);
return sparkFunction.apply(orderedSourceValues, options);
}
+
+ public static class ExpressionIndexComputationMetadata {
Review Comment:
What's the need for this class? Is it just for composition?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
Review Comment:
let's add a `checkArgument` validation too.
##########
hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java:
##########
@@ -35,6 +35,8 @@ public interface HoodieExpressionIndex<S, T> extends
Serializable {
String HOODIE_EXPRESSION_INDEX_PARTITION =
"_hoodie_expression_index_partition";
String HOODIE_EXPRESSION_INDEX_FILE_SIZE =
"_hoodie_expression_index_file_size";
+ String HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX = "_partition_stat_";
Review Comment:
To be consistent with `__all_partitions__` naming convention
```suggestion
String HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX =
"__partition_stat__";
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
+ /**
+ * Loads the file slices touched by the commit due to given instant time and
returns the records for the expression index.
+ * This generates partition stat record updates along with EI column stat
update records. Partition stat record updates are generated
+ * by reloading the affected partitions column range metadata from EI and
then merging it with partition stat record from the updated data.
+ *
+ * @param commitMetadata {@code HoodieCommitMetadata}
+ * @param indexPartition partition name of the expression index
+ * @param instantTime timestamp of the current update commit
+ */
+ @Override
+ protected HoodieData<HoodieRecord>
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
+ HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+ boolean isColumnStatEI =
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+ Option<Function<HoodiePairData<String,
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>
partitionRecordsFunctionOpt = Option.empty();
+ final HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>
exprIndexPartitionStatUpdates;
+ if (isColumnStatEI) {
+ // Fetch column range metadata for affected partitions in the commit
+ exprIndexPartitionStatUpdates =
+
SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata,
indexPartition,
+ engineContext, dataMetaClient,
dataWriteConfig.getMetadataConfig())
+ .flatMapValues(list ->
list.stream().flatMap(List::stream).iterator());
+ // The function below merges the column range metadata from the updated
data with latest column range metadata of affected partition computed above
+ partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
+
HoodieTableMetadataUtil.collectAndProcessEIPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata),
true, Option.of(indexDefinition.getIndexName())));
+ } else {
+ exprIndexPartitionStatUpdates = null;
+ }
+
+ List<Pair<String, Pair<String, Long>>> partitionFilePathPairs = new
ArrayList<>();
Review Comment:
please document step-by-step in the code what's going on after every
transformation. It will be easier to follow while making any changes in the
future.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2423,18 +2427,53 @@ public HoodieRecord next() {
private static Stream<HoodieRecord> collectAndProcessColumnMetadata(
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata,
String partitionPath, boolean isTightBound) {
+ return collectAndProcessColumnMetadata(partitionPath, isTightBound,
Option.empty(), fileColumnMetadata.stream().flatMap(List::stream));
+ }
- // Step 1: Flatten and Group by Column Name
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap
= fileColumnMetadata.stream()
- .flatMap(List::stream)
-
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName,
Collectors.toList()));
+ private static Stream<HoodieRecord>
collectAndProcessColumnMetadata(Iterable<HoodieColumnRangeMetadata<Comparable>>
fileColumnMetadataIterable, String partitionPath,
+ boolean
isTightBound, Option<String> indexPartitionOpt) {
- // Step 2: Aggregate Column Ranges
+ List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = new
ArrayList<>();
+ fileColumnMetadataIterable.forEach(fileColumnMetadata::add);
+ // Group by Column Name
+ return collectAndProcessColumnMetadata(partitionPath, isTightBound,
indexPartitionOpt, fileColumnMetadata.stream());
+ }
+
+ private static Stream<HoodieRecord> collectAndProcessColumnMetadata(String
partitionPath, boolean isTightBound, Option<String> indexPartitionOpt,
+
Stream<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata) {
+ // Group by Column Name
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap
=
+
fileColumnMetadata.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName,
Collectors.toList()));
+
+ // Aggregate Column Ranges
Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata
= columnMetadataMap.entrySet().stream()
- .map(entry ->
FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
+ .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath,
entry.getValue()));
// Create Partition Stats Records
- return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath,
partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound);
+ return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath,
partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound,
indexPartitionOpt);
+ }
+
+ public static HoodieData<HoodieRecord>
collectAndProcessEIPartitionStatRecords(
Review Comment:
rename to `collectAndProcessExprIndexPartitionStatRecords` and add a UT
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -330,9 +330,19 @@ case class HoodieFileIndex(spark: SparkSession,
// fall back to listing all partitions
case _: HoodieException => (false,
listMatchingPartitionPaths(Seq.empty))
}
+ } else if (isExpressionIndexEnabled) {
+ val expressionIndexSupport = new ExpressionIndexSupport(spark,
schema, metadataConfig, metaClient)
Review Comment:
is there a way to tell `HoodieFileIndex` that EI was used in partition
pruning, and if so, then avoid instantiating `ExpressionIndexSupport` twice?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala:
##########
@@ -92,6 +95,47 @@ class ExpressionIndexSupport(spark: SparkSession,
}
}
+ def prunePartitions(fileIndex: HoodieFileIndex,
Review Comment:
let's call this directly in a test and validate positive and negative
scenario
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
+ /**
+ * Loads the file slices touched by the commit due to given instant time and
returns the records for the expression index.
+ * This generates partition stat record updates along with EI column stat
update records. Partition stat record updates are generated
+ * by reloading the affected partitions column range metadata from EI and
then merging it with partition stat record from the updated data.
+ *
+ * @param commitMetadata {@code HoodieCommitMetadata}
+ * @param indexPartition partition name of the expression index
+ * @param instantTime timestamp of the current update commit
+ */
+ @Override
+ protected HoodieData<HoodieRecord>
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
+ HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+ boolean isColumnStatEI =
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
Review Comment:
rename to `isExprIndexUsingColumnStats`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.empty(), partitionName).stream()
+ .flatMap(fileSlice -> Stream.concat(
+
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+ fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ // Fetch EI column stat records for above files
+ List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+
tableMetadata.getRecordsByKeyPrefixes(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
partitionName), indexPartition, false)
+ // schema and properties are ignored in getInsertValue, so
simply pass as null
+ .map(record -> record.getData().getInsertValue(null, null))
+ .filter(Option::isPresent)
+ .map(data -> ((HoodieMetadataRecord)
data.get()).getColumnStatsMetadata())
+ .filter(stats -> fileNames.contains(stats.getFileName()))
+ .map(HoodieColumnRangeMetadata::fromColumnStats)
+ .collectAsList();
Review Comment:
so this will be collected in executor memory right.. is there a way to avoid
it? I guess should be mostly ok because this is per partition per file, but
still if possible let's see if we can avoid any collection.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
Review Comment:
Avoid multiple `.stream()` and group stats in a single pass
```suggestion
// Group write stats by partition path in a single pass
Map<String, List<HoodieWriteStat>> partitionedWriteStatsMap =
allWriteStats.stream()
.collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath));
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
Review Comment:
Please add a unit test (UT) covering this method.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
Review Comment:
Please add a unit test for this method.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
Review Comment:
Extract the logic inside the `flatMap` into a separate method.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
Review Comment:
do `new ExpressionIndexComputationMetadata` inside
`getExpressionIndexRecordsUsingBloomFilter` for consistency
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -397,7 +399,9 @@ public static Map<String, HoodieData<HoodieRecord>>
convertMetadataToRecords(Hoo
if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS))
{
checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
"Column stats partition must be enabled to generate partition stats.
Please enable: " +
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
- final HoodieData<HoodieRecord> partitionStatsRDD =
convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient,
metadataConfig);
+ HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>> partitionRangeMetadata =
convertMetadataToPartitionStatsColumnRangeMetadata(commitMetadata, context,
Review Comment:
let's add a comment here about what this pair data is
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
+ /**
+ * Loads the file slices touched by the commit due to given instant time and
returns the records for the expression index.
+ * This generates partition stat record updates along with EI column stat
update records. Partition stat record updates are generated
+ * by reloading the affected partitions column range metadata from EI and
then merging it with partition stat record from the updated data.
+ *
+ * @param commitMetadata {@code HoodieCommitMetadata}
+ * @param indexPartition partition name of the expression index
+ * @param instantTime timestamp of the current update commit
+ */
+ @Override
+ protected HoodieData<HoodieRecord>
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
+ HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+ boolean isColumnStatEI =
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+ Option<Function<HoodiePairData<String,
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>
partitionRecordsFunctionOpt = Option.empty();
+ final HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>
exprIndexPartitionStatUpdates;
+ if (isColumnStatEI) {
+ // Fetch column range metadata for affected partitions in the commit
+ exprIndexPartitionStatUpdates =
+
SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata,
indexPartition,
+ engineContext, dataMetaClient,
dataWriteConfig.getMetadataConfig())
+ .flatMapValues(list ->
list.stream().flatMap(List::stream).iterator());
+ // The function below merges the column range metadata from the updated
data with latest column range metadata of affected partition computed above
+ partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
+
HoodieTableMetadataUtil.collectAndProcessEIPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata),
true, Option.of(indexDefinition.getIndexName())));
Review Comment:
We should be mindful of repeated `union` calls. A union can be expensive as it
creates a new dataset every time. Let's see if we can avoid the union here.
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -191,6 +192,11 @@ public <W> HoodiePairData<K, Pair<V, Option<W>>>
leftOuterJoin(HoodiePairData<K,
return new HoodieListPairData<>(leftOuterJoined, lazy);
}
+ @Override
+ public HoodiePairData<K, V> union(HoodiePairData<K, V> other) {
+ throw new UnsupportedOperationException("Operation is not supported yet");
Review Comment:
Can't we just do Stream.concat? Something like:
```
@Override
public HoodiePairData<K, V> union(HoodiePairData<K, V> other) {
ValidationUtils.checkArgument(other instanceof HoodieListPairData);
Stream<Pair<K, V>> unionStream = Stream.concat(asStream(),
((HoodieListPairData<K, V>) other).asStream());
return new HoodieListPairData<>(unionStream, lazy);
}
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala:
##########
@@ -92,6 +95,47 @@ class ExpressionIndexSupport(spark: SparkSession,
}
}
+ def prunePartitions(fileIndex: HoodieFileIndex,
+ queryFilters: Seq[Expression],
+ queryReferencedColumns: Seq[String]):
Option[Set[String]] = {
+ lazy val expressionIndexPartitionOpt =
getExpressionIndexPartitionAndLiterals(queryFilters)
+ if (isIndexAvailable && queryFilters.nonEmpty &&
expressionIndexPartitionOpt.nonEmpty) {
+ val (indexPartition, expressionIndexQuery, _) =
expressionIndexPartitionOpt.get
+ val indexDefinition =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+ if
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
{
+ val readInMemory = shouldReadInMemory(fileIndex,
queryReferencedColumns, inMemoryProjectionThreshold)
+ val expressionIndexRecords =
loadExpressionIndexPartitionStatRecords(indexDefinition, readInMemory)
+ loadTransposed(queryReferencedColumns, readInMemory,
expressionIndexRecords, expressionIndexQuery) {
+ transposedPartitionStatsDF => {
+ val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+ if (allPartitions.nonEmpty) {
+ // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has covered
the case where the
+ // column in a filter does not have the stats available,
by making sure such a
+ // filter does not prune any partition.
+ val indexSchema = transposedPartitionStatsDF.schema
+ val indexFilter =
Seq(expressionIndexQuery).map(translateIntoColumnStatsIndexFilterExpr(_,
indexSchema, isExpressionIndex = true)).reduce(And)
+ Some(transposedPartitionStatsDF.where(new Column(indexFilter))
+ .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
Review Comment:
I guess this is a remnant of partition stats index support — `collect` is
called on the same `transposedPartitionStatsDF` repeatedly. Also, does this run
on the driver? Should we cache the DataFrame first?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala:
##########
@@ -875,6 +875,209 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
}
}
+ /**
+ * Test expression index partition pruning with partition stats.
+ */
+ @Test
+ def testPartitionPruningWithPartitionStats(): Unit = {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName +
s"_pruning_partition_filters_$tableType"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts LONG,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | dateDefault STRING,
+ | date STRING,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$tableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30
01:30:40', '2020-11-30', 'san_francisco','california'),
+ | (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30
01:30:40', '2021-11-30', 'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30
01:30:40', '2022-11-30', 'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30
01:30:40', '2023-11-30', 'houston','texas')
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30
01:30:40', '2024-11-30', 'sunnyvale','california'),
+ | (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30
01:30:40', '2019-11-30', 'san_diego','texas')
+ |""".stripMargin)
+
+ val tableSchema: StructType =
+ StructType(
+ Seq(
+ StructField("ts", LongType),
+ StructField("id", StringType),
+ StructField("rider", StringType),
+ StructField("driver", StringType),
+ StructField("fare", DoubleType),
+ StructField("dateDefault", StringType),
+ StructField("date", StringType),
+ StructField("city", StringType),
+ StructField("state", StringType)
+ )
+ )
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+ spark.sql(s"create index idx_ts on $tableName using column_stats(ts)
options(expr='from_unixtime', format='yyyy-MM-dd')")
+ var metaClient = createMetaClient(spark, basePath)
+ // validate skipping with both types of expression
+ val fromUnixTimeExpr = resolveExpr(spark,
unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get,
tableSchema)
+ var literal = Literal.create("2023-11-07")
+ var dataFilter = EqualTo(fromUnixTimeExpr, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+ spark.sql(s"drop index idx_ts on $tableName")
+
+ spark.sql(s"create index idx_unix on $tableName using
column_stats(date) options(expr='unix_timestamp', format='yyyy-MM-dd')")
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val unixTimestamp = resolveExpr(spark,
unapply(functions.unix_timestamp(functions.col("date"), "yyyy-MM-dd")).get,
tableSchema)
+ literal = Literal.create(1732924800L)
+ dataFilter = EqualTo(unixTimestamp, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+ spark.sql(s"drop index idx_unix on $tableName")
+
+ spark.sql(s"create index idx_to_date on $tableName using
column_stats(date) options(expr='to_date', format='yyyy-MM-dd')")
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val toDate = resolveExpr(spark,
unapply(functions.to_date(functions.col("date"), "yyyy-MM-dd")).get,
tableSchema)
+ dataFilter = EqualTo(toDate, lit(18230).expr)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+ spark.sql(s"drop index idx_to_date on $tableName")
+ }
+ }
+ }
+
+ /**
+ * Test expression index pruning after update with partition stats.
+ */
+ @Test
+ def testPartitionPruningAfterUpdateWithPartitionStats(): Unit = {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val isTableMOR = tableType.equals("mor")
+ val tableName = generateTableName +
s"_pruning_partition_filters_$tableType"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts LONG,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | dateDefault STRING,
+ | date STRING,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$tableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30
01:30:40', '2020-11-30', 'san_francisco','california'),
+ | (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30
01:30:40', '2021-11-30', 'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30
01:30:40', '2022-11-30', 'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30
01:30:40', '2023-11-30', 'houston','texas')
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414520,'trip2','rider-B','driver-M',27.70,'2024-11-30
01:30:40', '2024-11-30', 'sunnyvale','california'),
+ | (1699349649,'trip5','rider-D','driver-Q',3.32, '2019-11-30
01:30:40', '2019-11-30', 'san_diego','texas')
+ |""".stripMargin)
+
+ val tableSchema: StructType =
+ StructType(
+ Seq(
+ StructField("ts", LongType),
+ StructField("id", StringType),
+ StructField("rider", StringType),
+ StructField("driver", StringType),
+ StructField("fare", DoubleType),
+ StructField("dateDefault", StringType),
+ StructField("date", StringType),
+ StructField("city", StringType),
+ StructField("state", StringType)
+ )
+ )
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+ spark.sql(s"create index idx_rider on $tableName using
column_stats(rider) options(expr='upper')")
+ var metaClient = createMetaClient(spark, basePath)
+ // validate skipping with both types of expression
+ val riderExpr = resolveExpr(spark,
unapply(functions.upper(functions.col("rider"))).get, tableSchema)
+ var literal = Literal.create("RIDER-A")
+ var dataFilter = EqualTo(riderExpr, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
Review Comment:
Additionally, can we also validate the expression index (EI) records
themselves — especially the partition stat records in the EI?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala:
##########
@@ -875,6 +875,209 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
}
}
+ /**
+ * Test expression index partition pruning with partition stats.
+ */
+ @Test
+ def testPartitionPruningWithPartitionStats(): Unit = {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName +
s"_pruning_partition_filters_$tableType"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts LONG,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | dateDefault STRING,
+ | date STRING,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$tableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30
01:30:40', '2020-11-30', 'san_francisco','california'),
+ | (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30
01:30:40', '2021-11-30', 'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30
01:30:40', '2022-11-30', 'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30
01:30:40', '2023-11-30', 'houston','texas')
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30
01:30:40', '2024-11-30', 'sunnyvale','california'),
+ | (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30
01:30:40', '2019-11-30', 'san_diego','texas')
+ |""".stripMargin)
+
+ val tableSchema: StructType =
+ StructType(
+ Seq(
+ StructField("ts", LongType),
+ StructField("id", StringType),
+ StructField("rider", StringType),
+ StructField("driver", StringType),
+ StructField("fare", DoubleType),
+ StructField("dateDefault", StringType),
+ StructField("date", StringType),
+ StructField("city", StringType),
+ StructField("state", StringType)
+ )
+ )
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+ spark.sql(s"create index idx_ts on $tableName using column_stats(ts)
options(expr='from_unixtime', format='yyyy-MM-dd')")
+ var metaClient = createMetaClient(spark, basePath)
+ // validate skipping with both types of expression
+ val fromUnixTimeExpr = resolveExpr(spark,
unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get,
tableSchema)
+ var literal = Literal.create("2023-11-07")
+ var dataFilter = EqualTo(fromUnixTimeExpr, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+ spark.sql(s"drop index idx_ts on $tableName")
+
+ spark.sql(s"create index idx_unix on $tableName using
column_stats(date) options(expr='unix_timestamp', format='yyyy-MM-dd')")
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val unixTimestamp = resolveExpr(spark,
unapply(functions.unix_timestamp(functions.col("date"), "yyyy-MM-dd")).get,
tableSchema)
+ literal = Literal.create(1732924800L)
+ dataFilter = EqualTo(unixTimestamp, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+ spark.sql(s"drop index idx_unix on $tableName")
+
+ spark.sql(s"create index idx_to_date on $tableName using
column_stats(date) options(expr='to_date', format='yyyy-MM-dd')")
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val toDate = resolveExpr(spark,
unapply(functions.to_date(functions.col("date"), "yyyy-MM-dd")).get,
tableSchema)
+ dataFilter = EqualTo(toDate, lit(18230).expr)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+ spark.sql(s"drop index idx_to_date on $tableName")
+ }
+ }
+ }
+
+ /**
+ * Test expression index pruning after update with partition stats.
+ */
+ @Test
+ def testPartitionPruningAfterUpdateWithPartitionStats(): Unit = {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val isTableMOR = tableType.equals("mor")
+ val tableName = generateTableName +
s"_pruning_partition_filters_$tableType"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts LONG,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | dateDefault STRING,
+ | date STRING,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$tableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30
01:30:40', '2020-11-30', 'san_francisco','california'),
+ | (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30
01:30:40', '2021-11-30', 'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30
01:30:40', '2022-11-30', 'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30
01:30:40', '2023-11-30', 'houston','texas')
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414520,'trip2','rider-B','driver-M',27.70,'2024-11-30
01:30:40', '2024-11-30', 'sunnyvale','california'),
+ | (1699349649,'trip5','rider-D','driver-Q',3.32, '2019-11-30
01:30:40', '2019-11-30', 'san_diego','texas')
+ |""".stripMargin)
+
+ val tableSchema: StructType =
+ StructType(
+ Seq(
+ StructField("ts", LongType),
+ StructField("id", StringType),
+ StructField("rider", StringType),
+ StructField("driver", StringType),
+ StructField("fare", DoubleType),
+ StructField("dateDefault", StringType),
+ StructField("date", StringType),
+ StructField("city", StringType),
+ StructField("state", StringType)
+ )
+ )
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+ spark.sql(s"create index idx_rider on $tableName using
column_stats(rider) options(expr='upper')")
+ var metaClient = createMetaClient(spark, basePath)
+ // validate skipping with both types of expression
+ val riderExpr = resolveExpr(spark,
unapply(functions.upper(functions.col("rider"))).get, tableSchema)
+ var literal = Literal.create("RIDER-A")
+ var dataFilter = EqualTo(riderExpr, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true)
+
+ spark.sql(s"update $tableName set rider = 'rider-G' where id =
'trip5'")
+ metaClient = createMetaClient(spark, basePath)
+ literal = Literal.create("RIDER-D")
+ dataFilter = EqualTo(riderExpr, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true, isNoScanExpected = !isTableMOR)
+
+ if (isTableMOR) {
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+ }
+ spark.sql(s"update $tableName set rider = 'rider-H' where id =
'trip5'")
+ metaClient = createMetaClient(spark, basePath)
+ literal = Literal.create("RIDER-D")
+ dataFilter = EqualTo(riderExpr, literal)
+ verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient,
isDataSkippingExpected = true, isNoScanExpected = true)
Review Comment:
Let's also validate the actual partition stat records in the EI after the update.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]