lokeshj1703 commented on code in PR #12151:
URL: https://github.com/apache/hudi/pull/12151#discussion_r1832413756


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -1115,6 +1145,86 @@ private void 
validateSecondaryIndex(HoodieSparkEngineContext engineContext, Hood
     secondaryKeys.unpersist();
   }
 
+  private void validateFunctionalIndex(HoodieSparkEngineContext engineContext, 
HoodieMetadataValidationContext metadataContext,
+                                      HoodieTableMetaClient metaClient) throws 
Exception {
+    Option<Map<String, HoodieIndexDefinition>> indexDefinitions = 
metaClient.getIndexMetadata().map(HoodieIndexMetadata::getIndexDefinitions);
+    if (!indexDefinitions.isPresent()) {
+      return;
+    }
+    for (Map.Entry<String, HoodieIndexDefinition> indexDefinitionEntry : 
indexDefinitions.get().entrySet()) {
+      if (indexDefinitionEntry.getValue().isFunctionalIndex()) {
+        validateFunctionalIndex(engineContext, metadataContext, metaClient, 
indexDefinitionEntry.getKey(), indexDefinitionEntry.getValue());
+      }
+    }
+  }
+
+  private void validateFunctionalIndex(HoodieSparkEngineContext engineContext, 
HoodieMetadataValidationContext metadataContext,
+                                       HoodieTableMetaClient metaClient, 
String indexPartition, HoodieIndexDefinition indexDefinition) throws Exception {
+    // Fetch latest mdt functional index records using FunctionalIndexSupport 
API
+    // Generate functional index records using functional index initialization 
API and compare them
+    FunctionalIndexSupport functionalIndexSupport = new 
FunctionalIndexSupport(engineContext.getSqlContext().sparkSession(), 
metadataContext.getMetadataConfig(), metaClient);
+    Dataset<Row> mdtIndexDf = 
functionalIndexSupport.loadFunctionalIndexDataFrame(indexPartition, 
false).cache();
+    Dataset<Row> fsFunctionalIndexDf = 
getFSFunctionalIndexRecords(engineContext, metaClient.getBasePath(), 
indexDefinition, metadataContext)
+        
.select(Arrays.stream(FunctionalIndexSupport.getTargetColumnStatsIndexColumns()).map(functions::col).toArray(Column[]::new))
+        .cache();
+    Dataset<Row> fsFunctionalIndexDiff = 
fsFunctionalIndexDf.exceptAll(mdtIndexDf);
+    List<String> diffRecordsFilenames = 
fsFunctionalIndexDiff.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+        .map((MapFunction<Row, String>) row -> row.getString(0), 
Encoders.STRING())
+        .collectAsList();
+    Dataset<Row> mdtFunctionalIndexDiffRecords = 
mdtIndexDf.filter((FilterFunction<Row>) row -> {
+      String fileName = 
row.getAs(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME);
+      return diffRecordsFilenames.contains(fileName);
+    });
+    if (diffRecordsFilenames.isEmpty()) {
+      throw new HoodieValidationException(String.format("Functional Index does 
not match : \nFS functional index diff records: %s \nCorresponding MDT 
functional index diff records: %s",
+          Arrays.toString(fsFunctionalIndexDf.collectAsList().toArray()), 
Arrays.toString(mdtFunctionalIndexDiffRecords.collectAsList().toArray())));
+    }
+    fsFunctionalIndexDf.unpersist();
+    mdtIndexDf.unpersist();
+  }
+
+  Dataset<Row> getFSFunctionalIndexRecords(HoodieSparkEngineContext 
sparkEngineContext, StoragePath basePath, HoodieIndexDefinition indexDefinition,
+                                           HoodieMetadataValidationContext 
metadataContext) throws Exception {
+    // Get latest partition file slices and generate functional index records 
using functional index initialization API
+    List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = 
getPartitionFileSlicePairs(metaClient, metadataContext.tableMetadata, 
metadataContext.fileSystemView);
+    Schema readerSchema = 
getProjectedSchemaForFunctionalIndex(indexDefinition, metaClient);
+    StructType columnStatsRecordStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$);
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
+    HoodieData<InternalRow> fsFunctionalIndexRecords = 
getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition, 
metaClient, 10, readerSchema,
+        metadataContext.getMetaClient().getStorageConf(), "", 
sparkEngineContext, writeConfig, writeConfig)
+        .map(record -> {
+          HoodieMetadataPayload metadataPayload = (HoodieMetadataPayload) 
record.getData();
+          return 
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
 
columnStatsRecordStructType).apply(metadataPayload.getColumnStatMetadata().get()).orNull(null);
+        });

Review Comment:
   This is actually beneficial because the dataset's schema reflects only the 
column stats fields, which makes it easier to compare and query.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java:
##########
@@ -226,6 +227,57 @@ public void testSecondaryIndexValidation() throws 
IOException {
     validateSecondaryIndex();
   }
 
+  @Test
+  public void testFunctionalIndexValidation() throws IOException {

Review Comment:
   Addressed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -168,57 +150,12 @@ public void deletePartitions(String instantTime, 
List<MetadataPartitionType> par
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
-  @Override
   protected HoodieData<HoodieRecord> 
getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet,
-                                                               
HoodieIndexDefinition indexDefinition,
-                                                               
HoodieTableMetaClient metaClient, int parallelism,
-                                                               Schema 
readerSchema, StorageConfiguration<?> storageConf,
+                                                               
HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient,
+                                                               int 
parallelism, Schema readerSchema, StorageConfiguration<?> storageConf,
                                                                String 
instantTime) {
-    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
-    if (indexDefinition.getSourceFields().isEmpty()) {
-      // In case there are no columns to index, bail
-      return sparkEngineContext.emptyHoodieData();
-    }
-
-    // NOTE: We are assuming that the index expression is operating on a 
single column
-    //       HUDI-6994 will address this.
-    String columnToIndex = indexDefinition.getSourceFields().get(0);
-    SQLContext sqlContext = sparkEngineContext.getSqlContext();
-
-    // Read records and append functional index metadata to every row
-    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
-        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
-          String partition = entry.getKey();
-          Pair<String, Long> filePathSizePair = entry.getValue();
-          String filePath = filePathSizePair.getKey();
-          long fileSize = filePathSizePair.getValue();
-          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
-              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-          List<Row> rowsWithIndexMetadata = 
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, 
partition, filePath, fileSize);
-          return rowsWithIndexMetadata.iterator();
-        });
-
-    // Generate dataset with functional index metadata
-    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, 
DataTypes.StringType, false, Metadata.empty()))
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
-    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
-
-    // Apply functional index and generate the column to index
-    HoodieFunctionalIndex<Column, Column> functionalIndex =
-        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
-    rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
-
-    // Generate functional index records
-    if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-      return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset, 
functionalIndex, columnToIndex);
-    } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
-      return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime);
-    } else {
-      throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
-    }
+    return 
SparkMetadataWriterUtils.getFunctionalIndexRecords(partitionFilePathAndSizeTriplet,
 indexDefinition, metaClient, parallelism, readerSchema,
+        storageConf, instantTime, engineContext, dataWriteConfig, 
metadataWriteConfig);

Review Comment:
   I am using the refactored API in the validator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to