lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1907316692
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,58 +160,61 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
+ /**
+ * Loads the file slices touched by the commit for the given instant time and
returns the records for the expression index.
+ * This generates partition stat record updates along with EI column stat
update records. Partition stat record updates are generated
+ * by reloading the affected partitions column range metadata from EI and
then merging it with partition stat record from the updated data.
+ *
+ * @param commitMetadata {@code HoodieCommitMetadata}
+ * @param indexPartition partition name of the expression index
+ * @param instantTime timestamp of the current update commit
+ */
+ @Override
+ protected HoodieData<HoodieRecord>
getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
+ HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
+ boolean isColumnStatEI =
indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
Review Comment:
Addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
Review Comment:
Addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
Review Comment:
Addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.empty(), partitionName).stream()
Review Comment:
Addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
+ return engineContext.parallelize(partitionedWriteStats,
parallelism).mapToPair(partitionedWriteStat -> {
+ final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames =
HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.empty(), partitionName).stream()
+ .flatMap(fileSlice -> Stream.concat(
+
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+ fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ // Fetch EI column stat records for above files
+ List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+
tableMetadata.getRecordsByKeyPrefixes(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
partitionName), indexPartition, false)
+ // schema and properties are ignored in getInsertValue, so
simply pass as null
+ .map(record -> record.getData().getInsertValue(null, null))
+ .filter(Option::isPresent)
+ .map(data -> ((HoodieMetadataRecord)
data.get()).getColumnStatsMetadata())
+ .filter(stats -> fileNames.contains(stats.getFileName()))
+ .map(HoodieColumnRangeMetadata::fromColumnStats)
+ .collectAsList();
Review Comment:
This seems OK. We can parallelize either in the outer block across
partitions or within `getRecordsByKeyPrefixes` for a particular partition.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,4 +276,155 @@ private static List<Row> toRows(List<HoodieRecord>
records, Schema schema, Hoodi
.collect(Collectors.toList());
return avroRecords;
}
+
+ /**
+ * Generates expression index records
+ * @param partitionFilePathAndSizeTriplet Triplet of file path, file size
and partition name to which file belongs
+ * @param indexDefinition Hoodie Index Definition for the expression index
for which records need to be generated
+ * @param metaClient Hoodie Table Meta Client
+ * @param parallelism Parallelism to use for engine operations
+ * @param readerSchema Schema of reader
+ * @param instantTime Instant time
+ * @param engineContext HoodieEngineContext
+ * @param dataWriteConfig Write Config for the data table
+ * @param metadataWriteConfig Write config for the metadata table
+ * @param partitionRecordsFunctionOpt Function used to generate partition
stat records for the EI. It takes the column range metadata generated for the
provided partition files as input
+ * and uses those to generate the final
partition stats
+ * @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
+ */
+ @SuppressWarnings("checkstyle:LineLength")
+ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
+
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
+
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
+ if (indexDefinition.getSourceFields().isEmpty()) {
+ // In case there are no columns to index, bail
+ return new
ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
+ }
+
+ // NOTE: We are assuming that the index expression is operating on a
single column
+ // HUDI-6994 will address this.
+ String columnToIndex = indexDefinition.getSourceFields().get(0);
+ SQLContext sqlContext = sparkEngineContext.getSqlContext();
+
+ // Read records and append expression index metadata to every row
+ HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
+ .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
+ String partition = entry.getKey();
+ Pair<String, Long> filePathSizePair = entry.getValue();
+ String filePath = filePathSizePair.getKey();
+ String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
+ long fileSize = filePathSizePair.getValue();
+ List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
+ FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
+ List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
+ return rowsWithIndexMetadata.iterator();
+ });
+
+ // Generate dataset with expression index metadata
+ StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
+
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
+ Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
+
+ // Apply expression index and generate the column to index
+ HoodieExpressionIndex<Column, Column> expressionIndex =
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+ rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
+
+ // Generate expression index records
+ if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
+ } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
+ return new
ExpressionIndexComputationMetadata(getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName()));
+ } else {
+ throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
+ }
+ }
+
+ /**
+ * Fetches column range metadata from the EI partition for all the partition
files impacted by the commit. This would only take into account completed
commits for the partitions
+ * since EI updates have not yet been committed.
+ *
+ * @param commitMetadata Hoodie commit metadata
+ * @param indexPartition Partition name for the expression index
+ * @param engineContext Hoodie engine context
+ * @param dataMetaClient Data table meta client
+ * @param metadataConfig Hoodie metadata config
+ *
+ * @return HoodiePairData of partition name and list of column range
metadata for the partitions
+ */
+ public static HoodiePairData<String,
List<List<HoodieColumnRangeMetadata<Comparable>>>>
getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata,
String indexPartition,
+
HoodieEngineContext
engineContext,
+
HoodieTableMetaClient
dataMetaClient,
+
HoodieMetadataConfig
metadataConfig) {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+
+ HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
+ List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
+ try {
+ Option<Schema> writerSchema =
+
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
+ Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ // filter columns with only supported types
+ final List<String> validColumnsToIndex = columnsToIndex.stream()
+ .filter(col ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) ||
HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col,
writerSchemaOpt.get().get()))
+ .collect(Collectors.toList());
+ if (validColumnsToIndex.isEmpty()) {
+ return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new
ArrayList<>()));
+ }
+ LOG.debug("Indexing following columns for partition stats index: {}",
validColumnsToIndex);
+ // Group by partitionPath and then gather write stats lists,
+ // where each inner list contains HoodieWriteStat objects that have the
same partitionPath.
+ List<List<HoodieWriteStat>> partitionedWriteStats =
allWriteStats.stream()
+ .collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath))
+ .values()
+ .stream()
+ .collect(Collectors.toList());
+
+ int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(),
metadataConfig, dataMetaClient.getBasePath().toString());
Review Comment:
Addressed for both this change and the partition stats code. We don't need to
close it now; I am reusing the instance from the metadata writer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]