lokeshj1703 commented on code in PR #12151:
URL: https://github.com/apache/hudi/pull/12151#discussion_r1832413756
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -1115,6 +1145,86 @@ private void
validateSecondaryIndex(HoodieSparkEngineContext engineContext, Hood
secondaryKeys.unpersist();
}
+ private void validateFunctionalIndex(HoodieSparkEngineContext engineContext,
HoodieMetadataValidationContext metadataContext,
+ HoodieTableMetaClient metaClient) throws
Exception {
+ Option<Map<String, HoodieIndexDefinition>> indexDefinitions =
metaClient.getIndexMetadata().map(HoodieIndexMetadata::getIndexDefinitions);
+ if (!indexDefinitions.isPresent()) {
+ return;
+ }
+ for (Map.Entry<String, HoodieIndexDefinition> indexDefinitionEntry :
indexDefinitions.get().entrySet()) {
+ if (indexDefinitionEntry.getValue().isFunctionalIndex()) {
+ validateFunctionalIndex(engineContext, metadataContext, metaClient,
indexDefinitionEntry.getKey(), indexDefinitionEntry.getValue());
+ }
+ }
+ }
+
+ private void validateFunctionalIndex(HoodieSparkEngineContext engineContext,
HoodieMetadataValidationContext metadataContext,
+ HoodieTableMetaClient metaClient,
String indexPartition, HoodieIndexDefinition indexDefinition) throws Exception {
+ // Fetch latest mdt functional index records using FunctionalIndexSupport
API
+ // Generate functional index records using functional index initialization
API and compare them
+ FunctionalIndexSupport functionalIndexSupport = new
FunctionalIndexSupport(engineContext.getSqlContext().sparkSession(),
metadataContext.getMetadataConfig(), metaClient);
+ Dataset<Row> mdtIndexDf =
functionalIndexSupport.loadFunctionalIndexDataFrame(indexPartition,
false).cache();
+ Dataset<Row> fsFunctionalIndexDf =
getFSFunctionalIndexRecords(engineContext, metaClient.getBasePath(),
indexDefinition, metadataContext)
+
.select(Arrays.stream(FunctionalIndexSupport.getTargetColumnStatsIndexColumns()).map(functions::col).toArray(Column[]::new))
+ .cache();
+ Dataset<Row> fsFunctionalIndexDiff =
fsFunctionalIndexDf.exceptAll(mdtIndexDf);
+ List<String> diffRecordsFilenames =
fsFunctionalIndexDiff.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .map((MapFunction<Row, String>) row -> row.getString(0),
Encoders.STRING())
+ .collectAsList();
+ Dataset<Row> mdtFunctionalIndexDiffRecords =
mdtIndexDf.filter((FilterFunction<Row>) row -> {
+ String fileName =
row.getAs(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME);
+ return diffRecordsFilenames.contains(fileName);
+ });
+    if (!diffRecordsFilenames.isEmpty()) {
+      throw new HoodieValidationException(String.format("Functional Index does
not match: \nFS functional index diff records: %s \nCorresponding MDT
functional index diff records: %s",
+          Arrays.toString(fsFunctionalIndexDiff.collectAsList().toArray()),
Arrays.toString(mdtFunctionalIndexDiffRecords.collectAsList().toArray())));
+    }
+ fsFunctionalIndexDf.unpersist();
+ mdtIndexDf.unpersist();
+ }
+
+ Dataset<Row> getFSFunctionalIndexRecords(HoodieSparkEngineContext
sparkEngineContext, StoragePath basePath, HoodieIndexDefinition indexDefinition,
+ HoodieMetadataValidationContext
metadataContext) throws Exception {
+ // Get latest partition file slices and generate functional index records
using functional index initialization API
+ List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet =
getPartitionFileSlicePairs(metaClient, metadataContext.tableMetadata,
metadataContext.fileSystemView);
+ Schema readerSchema =
getProjectedSchemaForFunctionalIndex(indexDefinition, metaClient);
+ StructType columnStatsRecordStructType =
AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$);
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
+ HoodieData<InternalRow> fsFunctionalIndexRecords =
getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition,
metaClient, 10, readerSchema,
+ metadataContext.getMetaClient().getStorageConf(), "",
sparkEngineContext, writeConfig, writeConfig)
+ .map(record -> {
+ HoodieMetadataPayload metadataPayload = (HoodieMetadataPayload)
record.getData();
+ return
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
columnStatsRecordStructType).apply(metadataPayload.getColumnStatMetadata().get()).orNull(null);
+ });
Review Comment:
   This is actually beneficial since the schema of the dataset reflects only
the column stat fields, which makes it easier to compare and query.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java:
##########
@@ -226,6 +227,57 @@ public void testSecondaryIndexValidation() throws
IOException {
validateSecondaryIndex();
}
+ @Test
+ public void testFunctionalIndexValidation() throws IOException {
Review Comment:
Addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -168,57 +150,12 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
- @Override
protected HoodieData<HoodieRecord>
getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet,
-
HoodieIndexDefinition indexDefinition,
-
HoodieTableMetaClient metaClient, int parallelism,
- Schema
readerSchema, StorageConfiguration<?> storageConf,
+
HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient,
+ int
parallelism, Schema readerSchema, StorageConfiguration<?> storageConf,
String
instantTime) {
- HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
- if (indexDefinition.getSourceFields().isEmpty()) {
- // In case there are no columns to index, bail
- return sparkEngineContext.emptyHoodieData();
- }
-
- // NOTE: We are assuming that the index expression is operating on a
single column
- // HUDI-6994 will address this.
- String columnToIndex = indexDefinition.getSourceFields().get(0);
- SQLContext sqlContext = sparkEngineContext.getSqlContext();
-
- // Read records and append functional index metadata to every row
- HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
- .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
- String partition = entry.getKey();
- Pair<String, Long> filePathSizePair = entry.getValue();
- String filePath = filePathSizePair.getKey();
- long fileSize = filePathSizePair.getValue();
- List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
- FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
- List<Row> rowsWithIndexMetadata =
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath,
partition, filePath, fileSize);
- return rowsWithIndexMetadata.iterator();
- });
-
- // Generate dataset with functional index metadata
- StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
-
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
-
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
-
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
- Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
-
- // Apply functional index and generate the column to index
- HoodieFunctionalIndex<Column, Column> functionalIndex =
- new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
- Column indexedColumn =
functionalIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
- rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
-
- // Generate functional index records
- if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
- return
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset,
functionalIndex, columnToIndex);
- } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
- return
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime);
- } else {
- throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
- }
+ return
SparkMetadataWriterUtils.getFunctionalIndexRecords(partitionFilePathAndSizeTriplet,
indexDefinition, metaClient, parallelism, readerSchema,
+ storageConf, instantTime, engineContext, dataWriteConfig,
metadataWriteConfig);
Review Comment:
I am using the refactored API in the validator
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]