This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ca1d6c49d1 [HUDI-7662] Add a metadata config to enable or disable 
functional index (#12001)
9ca1d6c49d1 is described below

commit 9ca1d6c49d1e770f7f5ecc22d62a4c5ac5f1b6cf
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Sep 27 07:57:56 2024 +0530

    [HUDI-7662] Add a metadata config to enable or disable functional index 
(#12001)
    
    Add a metadata config to enable or disable functional index. Note that this 
config
    is used to disable all functional indexes. To build or drop each functional 
index
    individually, users still need to use CREATE/DROP INDEX SQL commands.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   6 +
 .../client/utils/SparkMetadataWriterUtils.java     | 174 +++++++--------------
 .../SparkHoodieBackedTableMetadataWriter.java      |  54 ++-----
 .../hudi/common/config/HoodieMetadataConfig.java   |  11 ++
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  12 +-
 .../hudi/metadata/MetadataPartitionType.java       |   5 +-
 .../hudi/command/index/TestFunctionalIndex.scala   |  81 +++++++++-
 7 files changed, 185 insertions(+), 158 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f41bd21c78e..d03891b7c6c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -527,6 +527,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   }
 
   private Set<String> getFunctionalIndexPartitionsToInit() {
+    if (dataMetaClient.getIndexMetadata().isEmpty()) {
+      return Collections.emptySet();
+    }
     Set<String> functionalIndexPartitions = 
dataMetaClient.getIndexMetadata().get().getIndexDefinitions().keySet();
     Set<String> completedMetadataPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
     functionalIndexPartitions.removeAll(completedMetadataPartitions);
@@ -1050,6 +1053,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * Update functional index from {@link HoodieCommitMetadata}.
    */
   private void updateFunctionalIndexIfPresent(HoodieCommitMetadata 
commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> 
partitionToRecordMap) {
+    if (!dataWriteConfig.getMetadataConfig().isFunctionalIndexEnabled()) {
+      return;
+    }
     dataMetaClient.getTableConfig().getMetadataPartitions()
         .stream()
         .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX))
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index f9fdf0af0c7..8072b34c6d3 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.client.utils;
 
 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,13 +27,11 @@ import 
org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.functional.HoodieFunctionalIndex;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -53,7 +50,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
 
@@ -70,118 +66,78 @@ public class SparkMetadataWriterUtils {
   private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
   private static final String GLOB_PATHS_CONFIG = "glob.paths";
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new 
ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      long fileSize = baseFile.getFileSize();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        long fileSize = baseFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
baseFile.getStoragePath());
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(logFile -> {
+        long fileSize = logFile.getFileSize();
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
logFile.getPath());
+      });
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      long fileSize = logFile.getFileSize();
-      buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
-    });
-    return HoodieJavaRDD.of(createColumnStatsRecords(partition, 
columnRangeMetadataList, false).collect(Collectors.toList()), 
sparkEngineContext, parallelism);
+    return createColumnStatsRecords(partition, columnRangeMetadataList, 
false).collect(Collectors.toList());
   }
 
-  public static HoodieJavaRDD<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(
-      HoodieTableMetaClient metaClient,
-      int parallelism,
-      Schema readerSchema,
-      FileSlice fileSlice,
-      String basePath,
-      String partition,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      HoodieSparkEngineContext sparkEngineContext,
-      HoodieWriteConfig metadataWriteConfig) {
+  public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+                                                                             
Schema readerSchema,
+                                                                             
List<FileSlice> fileSlices,
+                                                                             
String partition,
+                                                                             
HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                             
String columnToIndex,
+                                                                             
SQLContext sqlContext,
+                                                                             
HoodieWriteConfig metadataWriteConfig) {
     List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    if (fileSlice.getBaseFile().isPresent()) {
-      HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      String filename = baseFile.getFileName();
-      Path baseFilePath = filePath(basePath, partition, filename);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          baseFilePath,
-          metadataWriteConfig,
-          partition,
-          baseFile.getCommitTime());
+    for (FileSlice fileSlice : fileSlices) {
+      if (fileSlice.getBaseFile().isPresent()) {
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), 
metadataWriteConfig, partition,
+            baseFile.getCommitTime());
+      }
+      // Handle log files
+      fileSlice.getLogFiles().forEach(
+          logFile -> buildBloomFilterMetadata(metaClient, readerSchema, 
functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, 
logFile.getPath(), metadataWriteConfig, partition,
+              logFile.getDeltaCommitTime()));
     }
-    // Handle log files
-    fileSlice.getLogFiles().forEach(logFile -> {
-      String fileName = logFile.getFileName();
-      Path logFilePath = filePath(basePath, partition, fileName);
-      buildBloomFilterMetadata(
-          metaClient,
-          readerSchema,
-          functionalIndex,
-          columnToIndex,
-          sqlContext,
-          bloomFilterMetadataList,
-          logFilePath,
-          metadataWriteConfig,
-          partition,
-          logFile.getDeltaCommitTime());
-    });
-    return HoodieJavaRDD.of(bloomFilterMetadataList, sparkEngineContext, 
parallelism);
+    return bloomFilterMetadataList;
   }
 
-  private static void buildColumnRangeMetadata(
-      HoodieTableMetaClient metaClient,
-      Schema readerSchema,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-      long fileSize,
-      Path filePath) {
-    Dataset<Row> fileDf = readRecordsAsRow(
-        new StoragePath[] {convertToStoragePath(filePath)},
-        sqlContext,
-        metaClient,
-        readerSchema);
+  private static void buildColumnRangeMetadata(HoodieTableMetaClient 
metaClient,
+                                               Schema readerSchema,
+                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
+                                               String columnToIndex,
+                                               SQLContext sqlContext,
+                                               
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                               long fileSize,
+                                               StoragePath filePath) {
+    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema);
     Column indexedColumn = 
functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
     fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata =
-        computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), 
fileSize);
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = 
computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), 
fileSize);
     columnRangeMetadataList.add(columnRangeMetadata);
   }
 
-  private static void buildBloomFilterMetadata(
-      HoodieTableMetaClient metaClient,
-      Schema readerSchema,
-      HoodieFunctionalIndex<Column, Column> functionalIndex,
-      String columnToIndex,
-      SQLContext sqlContext,
-      List<HoodieRecord> bloomFilterMetadataList,
-      Path filePath,
-      HoodieWriteConfig writeConfig,
-      String partitionName,
-      String instantTime) {
-    Dataset<Row> fileDf =
-        readRecordsAsRow(new StoragePath[] {convertToStoragePath(filePath)},
-            sqlContext, metaClient, readerSchema);
+  private static void buildBloomFilterMetadata(HoodieTableMetaClient 
metaClient,
+                                               Schema readerSchema,
+                                               HoodieFunctionalIndex<Column, 
Column> functionalIndex,
+                                               String columnToIndex,
+                                               SQLContext sqlContext,
+                                               List<HoodieRecord> 
bloomFilterMetadataList,
+                                               StoragePath filePath,
+                                               HoodieWriteConfig writeConfig,
+                                               String partitionName,
+                                               String instantTime) {
+    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema);
     Column indexedColumn = 
functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
     fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
     BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(writeConfig);
@@ -190,9 +146,7 @@ public class SparkMetadataWriterUtils {
       bloomFilter.add(key);
     });
     ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-    bloomFilterMetadataList.add(createBloomFilterMetadataRecord(
-        partitionName, filePath.toString(), instantTime, 
writeConfig.getBloomFilterType(),
-        bloomByteBuffer, false));
+    bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, 
filePath.toString(), instantTime, writeConfig.getBloomFilterType(), 
bloomByteBuffer, false));
   }
 
   private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext 
sqlContext,
@@ -252,12 +206,4 @@ public class SparkMetadataWriterUtils {
             .filter(c -> !HOODIE_META_COLUMNS.contains(c))
             .map(df::col).toArray(Column[]::new));
   }
-
-  private static Path filePath(String basePath, String partition, String 
filename) {
-    if (partition.isEmpty()) {
-      return new Path(basePath, filename);
-    } else {
-      return new Path(basePath, partition + StoragePath.SEPARATOR + filename);
-    }
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 9d7f63f354b..4dd46e03243 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -150,7 +150,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   @Override
   public void deletePartitions(String instantTime, List<MetadataPartitionType> 
partitions) {
     List<String> partitionsToDrop = 
partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
-    LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop);
+    LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop);
 
     SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
     String actionType = 
CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, 
HoodieTableType.MERGE_ON_READ);
@@ -163,11 +163,8 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
                                                                
HoodieIndexDefinition indexDefinition,
                                                                
HoodieTableMetaClient metaClient, int parallelism,
                                                                Schema 
readerSchema, StorageConfiguration<?> storageConf) {
-    HoodieFunctionalIndex<Column, Column> functionalIndex = new 
HoodieSparkFunctionalIndex(
-        indexDefinition.getIndexName(),
-        indexDefinition.getIndexFunction(),
-        indexDefinition.getSourceFields(),
-        indexDefinition.getIndexOptions());
+    HoodieFunctionalIndex<Column, Column> functionalIndex =
+        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
     HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
     if (indexDefinition.getSourceFields().isEmpty()) {
       // In case there are no columns to index, bail
@@ -179,40 +176,23 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
     String basePath = metaClient.getBasePath().toString();
-    for (Pair<String, FileSlice> pair : partitionFileSlicePairs) {
-      String partition = pair.getKey();
-      FileSlice fileSlice = pair.getValue();
-      // For functional index using column_stats
+
+    // Group FileSlices by partition
+    Map<String, List<FileSlice>> partitionToFileSlicesMap = 
partitionFileSlicePairs.stream()
+        .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
+    List<HoodieRecord> allRecords = new ArrayList<>();
+    for (Map.Entry<String, List<FileSlice>> entry : 
partitionToFileSlicesMap.entrySet()) {
+      String partition = entry.getKey();
+      List<FileSlice> fileSlices = entry.getValue();
+      List<HoodieRecord> recordsForPartition = Collections.emptyList();
       if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-        return getFunctionalIndexRecordsUsingColumnStats(
-            metaClient,
-            parallelism,
-            readerSchema,
-            fileSlice,
-            basePath,
-            partition,
-            functionalIndex,
-            columnToIndex,
-            sqlContext,
-            sparkEngineContext);
-      }
-      // For functional index using bloom_filters
-      if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
-        return getFunctionalIndexRecordsUsingBloomFilter(
-            metaClient,
-            parallelism,
-            readerSchema,
-            fileSlice,
-            basePath,
-            partition,
-            functionalIndex,
-            columnToIndex,
-            sqlContext,
-            sparkEngineContext,
-            metadataWriteConfig);
+        recordsForPartition = 
getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices, 
partition, functionalIndex, columnToIndex, sqlContext);
+      } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
+        recordsForPartition = 
getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, fileSlices, 
partition, functionalIndex, columnToIndex, sqlContext, metadataWriteConfig);
       }
+      allRecords.addAll(recordsForPartition);
     }
-    return HoodieJavaRDD.of(Collections.emptyList(), sparkEngineContext, 
parallelism);
+    return HoodieJavaRDD.of(allRecords, sparkEngineContext, parallelism);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 40e2ab94d41..736b21e847a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -316,6 +316,13 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .withDocumentation("Initializes the metadata table by reading from the 
file system when the table is first created. Enabled by default. "
           + "Warning: This should only be disabled when manually constructing 
the metadata table outside of typical Hudi writer flows.");
 
+  public static final ConfigProperty<Boolean> FUNCTIONAL_INDEX_ENABLE_PROP = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.functional.enable")
+      .defaultValue(false)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable functional index within the Metadata Table. 
Note that this config is to enable/disable all functional indexes. "
+          + "To enable or disable each functional index individually, users 
still need to use CREATE/DROP INDEX SQL commands.");
+
   public static final ConfigProperty<Integer> 
FUNCTIONAL_INDEX_FILE_GROUP_COUNT = ConfigProperty
       .key(METADATA_PREFIX + ".index.functional.file.group.count")
       .defaultValue(2)
@@ -494,6 +501,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getBoolean(AUTO_INITIALIZE);
   }
 
+  public boolean isFunctionalIndexEnabled() {
+    return getBooleanOrDefault(FUNCTIONAL_INDEX_ENABLE_PROP);
+  }
+
   public int getFunctionalIndexFileGroupCount() {
     return getInt(FUNCTIONAL_INDEX_FILE_GROUP_COUNT);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e85aebc9da1..31bb643bbda 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1851,9 +1851,15 @@ public class HoodieTableMetadataUtil {
   }
 
   public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
-    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-    Schema tableSchema = schemaResolver.getTableAvroSchema();
-    return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
+    Schema tableSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
+    List<String> partitionFields = 
metaClient.getTableConfig().getPartitionFields()
+        .map(Arrays::asList)
+        .orElse(Collections.emptyList());
+    List<String> sourceFields = indexDefinition.getSourceFields();
+    List<String> mergedFields = new ArrayList<>(partitionFields.size() + 
sourceFields.size());
+    mergedFields.addAll(partitionFields);
+    mergedFields.addAll(sourceFields);
+    return addMetadataFields(getSchemaForFields(tableSchema, mergedFields));
   }
 
   public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 80ec142ea77..c539971162d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -43,6 +43,7 @@ import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS;
+import static 
org.apache.hudi.common.config.HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -176,9 +177,7 @@ public enum MetadataPartitionType {
   
FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX,
 "func-index-", -1) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
-      // Functional index is created via sql and not via write path.
-      // HUDI-7662 tracks adding a separate config to enable/disable 
functional index.
-      return false;
+      return getBooleanWithAltKeys(writeConfig, FUNCTIONAL_INDEX_ENABLE_PROP);
     }
 
     @Override
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 4bc841cb728..356aed35b70 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -20,7 +20,7 @@
 package org.apache.spark.sql.hudi.command.index
 
 import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.Option
@@ -424,6 +424,85 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  price double,
+             |  ts long,
+             |  name string
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(name)
+             | location '$basePath'
+       """.stripMargin)
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+        spark.sql(s"insert into $tableName values(1, 10, 1601098924, 'a1')")
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+        spark.sql(s"insert into $tableName values(2, 10, 1632634924, 'a1')")
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+        spark.sql(s"insert into $tableName values(3, 10, 1664170924, 'a2')")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using 
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        assertEquals(1, 
metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+
+        // verify functional index records by querying metadata table
+        val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value, 
ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') 
where type=3"
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"), // for file in name=a1
+          Seq("2022-09-26", "2022-09-26") // for file in name=a2
+        )
+
+        // disable functional index
+        spark.sql(s"set 
${HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key}=false")
+        // do another insert after disabling the index
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+        spark.sql(s"insert into $tableName values(4, 10, 1664170924, 'a2')")
+        // check query result
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 
'yyyy-MM-dd') = '2022-09-26'")(
+          Seq(3, "a2"),
+          Seq(4, "a2")
+        )
+        // verify there are no new updates to functional index
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"),
+          Seq("2022-09-26", "2022-09-26")
+        )
+
+        // enable functional index
+        spark.sql(s"set 
${HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key}=true")
+        // do another insert after initializing the index
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2024-09-26
+        spark.sql(s"insert into $tableName values(5, 10, 1727329324, 'a3')")
+        // check query result for predicates including values when functional 
index was disabled
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 
'yyyy-MM-dd') IN ('2024-09-26', '2022-09-26')")(
+          Seq(3, "a2"),
+          Seq(4, "a2"),
+          Seq(5, "a3")
+        )
+        // verify there are new updates to functional index
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"),
+          Seq("2022-09-26", "2022-09-26"),
+          Seq("2024-09-26", "2024-09-26") // for file in name=a3
+        )
+      }
+    }
+  }
+
   private def assertTableIdentifier(catalogTable: CatalogTable,
                                     expectedDatabaseName: String,
                                     expectedTableName: String): Unit = {

Reply via email to