codope commented on code in PR #12151:
URL: https://github.com/apache/hudi/pull/12151#discussion_r1818938876


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -81,6 +82,10 @@ public String getIndexType() {
     return indexType;
   }
 
+  public boolean isFunctionalIndex() {
+    return indexName.startsWith(PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX);
+  }

Review Comment:
   We can use the method in `MetadataPartitionType`



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java:
##########
@@ -226,6 +227,57 @@ public void testSecondaryIndexValidation() throws 
IOException {
     validateSecondaryIndex();
   }
 
+  @Test
+  public void testFunctionalIndexValidation() throws IOException {

Review Comment:
   this test is failing. Could you look into this failure?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -1115,6 +1145,86 @@ private void 
validateSecondaryIndex(HoodieSparkEngineContext engineContext, Hood
     secondaryKeys.unpersist();
   }
 
+  private void validateFunctionalIndex(HoodieSparkEngineContext engineContext, 
HoodieMetadataValidationContext metadataContext,
+                                      HoodieTableMetaClient metaClient) throws 
Exception {
+    Option<Map<String, HoodieIndexDefinition>> indexDefinitions = 
metaClient.getIndexMetadata().map(HoodieIndexMetadata::getIndexDefinitions);
+    if (!indexDefinitions.isPresent()) {
+      return;
+    }
+    for (Map.Entry<String, HoodieIndexDefinition> indexDefinitionEntry : 
indexDefinitions.get().entrySet()) {
+      if (indexDefinitionEntry.getValue().isFunctionalIndex()) {
+        validateFunctionalIndex(engineContext, metadataContext, metaClient, 
indexDefinitionEntry.getKey(), indexDefinitionEntry.getValue());
+      }
+    }
+  }
+
+  private void validateFunctionalIndex(HoodieSparkEngineContext engineContext, 
HoodieMetadataValidationContext metadataContext,
+                                       HoodieTableMetaClient metaClient, 
String indexPartition, HoodieIndexDefinition indexDefinition) throws Exception {
+    // Fetch latest mdt functional index records using FunctionalIndexSupport 
API
+    // Generate functional index records using functional index initialization 
API and compare them
+    FunctionalIndexSupport functionalIndexSupport = new 
FunctionalIndexSupport(engineContext.getSqlContext().sparkSession(), 
metadataContext.getMetadataConfig(), metaClient);
+    Dataset<Row> mdtIndexDf = 
functionalIndexSupport.loadFunctionalIndexDataFrame(indexPartition, 
false).cache();
+    Dataset<Row> fsFunctionalIndexDf = 
getFSFunctionalIndexRecords(engineContext, metaClient.getBasePath(), 
indexDefinition, metadataContext)
+        
.select(Arrays.stream(FunctionalIndexSupport.getTargetColumnStatsIndexColumns()).map(functions::col).toArray(Column[]::new))
+        .cache();
+    Dataset<Row> fsFunctionalIndexDiff = 
fsFunctionalIndexDf.exceptAll(mdtIndexDf);
+    List<String> diffRecordsFilenames = 
fsFunctionalIndexDiff.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+        .map((MapFunction<Row, String>) row -> row.getString(0), 
Encoders.STRING())
+        .collectAsList();
+    Dataset<Row> mdtFunctionalIndexDiffRecords = 
mdtIndexDf.filter((FilterFunction<Row>) row -> {
+      String fileName = 
row.getAs(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME);
+      return diffRecordsFilenames.contains(fileName);
+    });
+    if (diffRecordsFilenames.isEmpty()) {
+      throw new HoodieValidationException(String.format("Functional Index does 
not match : \nFS functional index diff records: %s \nCorresponding MDT 
functional index diff records: %s",
+          Arrays.toString(fsFunctionalIndexDf.collectAsList().toArray()), 
Arrays.toString(mdtFunctionalIndexDiffRecords.collectAsList().toArray())));
+    }
+    fsFunctionalIndexDf.unpersist();
+    mdtIndexDf.unpersist();
+  }
+
+  Dataset<Row> getFSFunctionalIndexRecords(HoodieSparkEngineContext 
sparkEngineContext, StoragePath basePath, HoodieIndexDefinition indexDefinition,
+                                           HoodieMetadataValidationContext 
metadataContext) throws Exception {
+    // Get latest partition file slices and generate functional index records 
using functional index initialization API
+    List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = 
getPartitionFileSlicePairs(metaClient, metadataContext.tableMetadata, 
metadataContext.fileSystemView);
+    Schema readerSchema = 
getProjectedSchemaForFunctionalIndex(indexDefinition, metaClient);
+    StructType columnStatsRecordStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$);
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
+    HoodieData<InternalRow> fsFunctionalIndexRecords = 
getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition, 
metaClient, 10, readerSchema,
+        metadataContext.getMetaClient().getStorageConf(), "", 
sparkEngineContext, writeConfig, writeConfig)
+        .map(record -> {
+          HoodieMetadataPayload metadataPayload = (HoodieMetadataPayload) 
record.getData();
+          return 
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
 
columnStatsRecordStructType).apply(metadataPayload.getColumnStatMetadata().get()).orNull(null);
+        });

Review Comment:
   Why do we need to convert here? Can we not have a FunctionalIndexSupport API 
to return metadata payload itself instead of Dataset<Row> because that support 
class anyway uses the metadata table reader which returns the payload and then 
we convert to dataframe.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -168,57 +150,12 @@ public void deletePartitions(String instantTime, 
List<MetadataPartitionType> par
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
-  @Override
   protected HoodieData<HoodieRecord> 
getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet,
-                                                               
HoodieIndexDefinition indexDefinition,
-                                                               
HoodieTableMetaClient metaClient, int parallelism,
-                                                               Schema 
readerSchema, StorageConfiguration<?> storageConf,
+                                                               
HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient,
+                                                               int 
parallelism, Schema readerSchema, StorageConfiguration<?> storageConf,
                                                                String 
instantTime) {
-    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
-    if (indexDefinition.getSourceFields().isEmpty()) {
-      // In case there are no columns to index, bail
-      return sparkEngineContext.emptyHoodieData();
-    }
-
-    // NOTE: We are assuming that the index expression is operating on a 
single column
-    //       HUDI-6994 will address this.
-    String columnToIndex = indexDefinition.getSourceFields().get(0);
-    SQLContext sqlContext = sparkEngineContext.getSqlContext();
-
-    // Read records and append functional index metadata to every row
-    HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
-        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry -> {
-          String partition = entry.getKey();
-          Pair<String, Long> filePathSizePair = entry.getValue();
-          String filePath = filePathSizePair.getKey();
-          long fileSize = filePathSizePair.getValue();
-          List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
-              FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-          List<Row> rowsWithIndexMetadata = 
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, 
partition, filePath, fileSize);
-          return rowsWithIndexMetadata.iterator();
-        });
-
-    // Generate dataset with functional index metadata
-    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, 
DataTypes.StringType, false, Metadata.empty()))
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
-    Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
-
-    // Apply functional index and generate the column to index
-    HoodieFunctionalIndex<Column, Column> functionalIndex =
-        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
-    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
-    rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
-
-    // Generate functional index records
-    if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-      return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset, 
functionalIndex, columnToIndex);
-    } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
-      return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime);
-    } else {
-      throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
-    }
+    return 
SparkMetadataWriterUtils.getFunctionalIndexRecords(partitionFilePathAndSizeTriplet,
 indexDefinition, metaClient, parallelism, readerSchema,
+        storageConf, instantTime, engineContext, dataWriteConfig, 
metadataWriteConfig);

Review Comment:
   Can we do this refactoring in a separate PR? Is it really necessary for the 
validator?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to