codope commented on code in PR #12151:
URL: https://github.com/apache/hudi/pull/12151#discussion_r1818938876
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -81,6 +82,10 @@ public String getIndexType() {
return indexType;
}
+ public boolean isFunctionalIndex() {
+ return indexName.startsWith(PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX);
+ }
Review Comment:
We can reuse the existing method in `MetadataPartitionType` instead of duplicating the prefix check here.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java:
##########
@@ -226,6 +227,57 @@ public void testSecondaryIndexValidation() throws
IOException {
validateSecondaryIndex();
}
+ @Test
+ public void testFunctionalIndexValidation() throws IOException {
Review Comment:
This test is failing. Could you look into the failure?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -1115,6 +1145,86 @@ private void
validateSecondaryIndex(HoodieSparkEngineContext engineContext, Hood
secondaryKeys.unpersist();
}
+ private void validateFunctionalIndex(HoodieSparkEngineContext engineContext,
HoodieMetadataValidationContext metadataContext,
+ HoodieTableMetaClient metaClient) throws
Exception {
+ Option<Map<String, HoodieIndexDefinition>> indexDefinitions =
metaClient.getIndexMetadata().map(HoodieIndexMetadata::getIndexDefinitions);
+ if (!indexDefinitions.isPresent()) {
+ return;
+ }
+ for (Map.Entry<String, HoodieIndexDefinition> indexDefinitionEntry :
indexDefinitions.get().entrySet()) {
+ if (indexDefinitionEntry.getValue().isFunctionalIndex()) {
+ validateFunctionalIndex(engineContext, metadataContext, metaClient,
indexDefinitionEntry.getKey(), indexDefinitionEntry.getValue());
+ }
+ }
+ }
+
+ private void validateFunctionalIndex(HoodieSparkEngineContext engineContext,
HoodieMetadataValidationContext metadataContext,
+ HoodieTableMetaClient metaClient,
String indexPartition, HoodieIndexDefinition indexDefinition) throws Exception {
+ // Fetch latest mdt functional index records using FunctionalIndexSupport
API
+ // Generate functional index records using functional index initialization
API and compare them
+ FunctionalIndexSupport functionalIndexSupport = new
FunctionalIndexSupport(engineContext.getSqlContext().sparkSession(),
metadataContext.getMetadataConfig(), metaClient);
+ Dataset<Row> mdtIndexDf =
functionalIndexSupport.loadFunctionalIndexDataFrame(indexPartition,
false).cache();
+ Dataset<Row> fsFunctionalIndexDf =
getFSFunctionalIndexRecords(engineContext, metaClient.getBasePath(),
indexDefinition, metadataContext)
+
.select(Arrays.stream(FunctionalIndexSupport.getTargetColumnStatsIndexColumns()).map(functions::col).toArray(Column[]::new))
+ .cache();
+ Dataset<Row> fsFunctionalIndexDiff =
fsFunctionalIndexDf.exceptAll(mdtIndexDf);
+ List<String> diffRecordsFilenames =
fsFunctionalIndexDiff.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .map((MapFunction<Row, String>) row -> row.getString(0),
Encoders.STRING())
+ .collectAsList();
+ Dataset<Row> mdtFunctionalIndexDiffRecords =
mdtIndexDf.filter((FilterFunction<Row>) row -> {
+ String fileName =
row.getAs(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME);
+ return diffRecordsFilenames.contains(fileName);
+ });
+ if (diffRecordsFilenames.isEmpty()) {
+ throw new HoodieValidationException(String.format("Functional Index does
not match : \nFS functional index diff records: %s \nCorresponding MDT
functional index diff records: %s",
+ Arrays.toString(fsFunctionalIndexDf.collectAsList().toArray()),
Arrays.toString(mdtFunctionalIndexDiffRecords.collectAsList().toArray())));
+ }
+ fsFunctionalIndexDf.unpersist();
+ mdtIndexDf.unpersist();
+ }
+
+ Dataset<Row> getFSFunctionalIndexRecords(HoodieSparkEngineContext
sparkEngineContext, StoragePath basePath, HoodieIndexDefinition indexDefinition,
+ HoodieMetadataValidationContext
metadataContext) throws Exception {
+ // Get latest partition file slices and generate functional index records
using functional index initialization API
+ List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet =
getPartitionFileSlicePairs(metaClient, metadataContext.tableMetadata,
metadataContext.fileSystemView);
+ Schema readerSchema =
getProjectedSchemaForFunctionalIndex(indexDefinition, metaClient);
+ StructType columnStatsRecordStructType =
AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$);
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
+ HoodieData<InternalRow> fsFunctionalIndexRecords =
getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition,
metaClient, 10, readerSchema,
+ metadataContext.getMetaClient().getStorageConf(), "",
sparkEngineContext, writeConfig, writeConfig)
+ .map(record -> {
+ HoodieMetadataPayload metadataPayload = (HoodieMetadataPayload)
record.getData();
+ return
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
columnStatsRecordStructType).apply(metadataPayload.getColumnStatMetadata().get()).orNull(null);
+ });
Review Comment:
Why do we need to convert here? Could we instead add a `FunctionalIndexSupport` API that returns the metadata payload itself rather than a `Dataset<Row>`? That support class already uses the metadata table reader, which returns the payload, and only then converts it to a dataframe.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -168,57 +150,12 @@ public void deletePartitions(String instantTime,
List<MetadataPartitionType> par
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
- @Override
protected HoodieData<HoodieRecord>
getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet,
-
HoodieIndexDefinition indexDefinition,
-
HoodieTableMetaClient metaClient, int parallelism,
- Schema
readerSchema, StorageConfiguration<?> storageConf,
+
HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient,
+ int
parallelism, Schema readerSchema, StorageConfiguration<?> storageConf,
String
instantTime) {
- HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
- if (indexDefinition.getSourceFields().isEmpty()) {
- // In case there are no columns to index, bail
- return sparkEngineContext.emptyHoodieData();
- }
-
- // NOTE: We are assuming that the index expression is operating on a
single column
- // HUDI-6994 will address this.
- String columnToIndex = indexDefinition.getSourceFields().get(0);
- SQLContext sqlContext = sparkEngineContext.getSqlContext();
-
- // Read records and append functional index metadata to every row
- HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
- .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry -> {
- String partition = entry.getKey();
- Pair<String, Long> filePathSizePair = entry.getValue();
- String filePath = filePathSizePair.getKey();
- long fileSize = filePathSizePair.getValue();
- List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
- FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
- List<Row> rowsWithIndexMetadata =
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath,
partition, filePath, fileSize);
- return rowsWithIndexMetadata.iterator();
- });
-
- // Generate dataset with functional index metadata
- StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
-
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
-
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
-
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
- Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
-
- // Apply functional index and generate the column to index
- HoodieFunctionalIndex<Column, Column> functionalIndex =
- new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
- Column indexedColumn =
functionalIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
- rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
-
- // Generate functional index records
- if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
- return
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset,
functionalIndex, columnToIndex);
- } else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
- return
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime);
- } else {
- throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
- }
+ return
SparkMetadataWriterUtils.getFunctionalIndexRecords(partitionFilePathAndSizeTriplet,
indexDefinition, metaClient, parallelism, readerSchema,
+ storageConf, instantTime, engineContext, dataWriteConfig,
metadataWriteConfig);
Review Comment:
Can we do this refactoring in a separate PR? Is it really necessary for the
validator?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]