This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9ca1d6c49d1 [HUDI-7662] Add a metadata config to enable or disable functional index (#12001)
9ca1d6c49d1 is described below
commit 9ca1d6c49d1e770f7f5ecc22d62a4c5ac5f1b6cf
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Sep 27 07:57:56 2024 +0530
[HUDI-7662] Add a metadata config to enable or disable functional index (#12001)
Add a metadata config to enable or disable functional index. Note that this config
enables or disables all functional indexes at once. To build or drop each functional
index individually, users still need to use CREATE/DROP INDEX SQL commands.
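For reference, a minimal Spark SQL sketch of how the new config combines with the existing index DDL (the table name my_table is hypothetical, and this assumes METADATA_PREFIX resolves to "hoodie.metadata", so the full key is hoodie.metadata.index.functional.enable):

    // enable functional-index updates in the metadata table (assumed full key)
    spark.sql("set hoodie.metadata.index.functional.enable=true")
    // each index is still built or dropped individually via DDL
    spark.sql("create index idx_datestr on my_table using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
    // setting the flag to false skips metadata updates for all functional indexes on later writes
    spark.sql("set hoodie.metadata.index.functional.enable=false")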
---
.../metadata/HoodieBackedTableMetadataWriter.java | 6 +
.../client/utils/SparkMetadataWriterUtils.java | 174 +++++++--------------
.../SparkHoodieBackedTableMetadataWriter.java | 54 ++-----
.../hudi/common/config/HoodieMetadataConfig.java | 11 ++
.../hudi/metadata/HoodieTableMetadataUtil.java | 12 +-
.../hudi/metadata/MetadataPartitionType.java | 5 +-
.../hudi/command/index/TestFunctionalIndex.scala | 81 +++++++++-
7 files changed, 185 insertions(+), 158 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f41bd21c78e..d03891b7c6c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -527,6 +527,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
private Set<String> getFunctionalIndexPartitionsToInit() {
+ if (dataMetaClient.getIndexMetadata().isEmpty()) {
+ return Collections.emptySet();
+ }
Set<String> functionalIndexPartitions = dataMetaClient.getIndexMetadata().get().getIndexDefinitions().keySet();
Set<String> completedMetadataPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
functionalIndexPartitions.removeAll(completedMetadataPartitions);
@@ -1050,6 +1053,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* Update functional index from {@link HoodieCommitMetadata}.
*/
private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
+ if (!dataWriteConfig.getMetadataConfig().isFunctionalIndexEnabled()) {
+ return;
+ }
dataMetaClient.getTableConfig().getMetadataPartitions()
.stream()
.filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index f9fdf0af0c7..8072b34c6d3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -20,7 +20,6 @@
package org.apache.hudi.client.utils;
import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,13 +27,11 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.functional.HoodieFunctionalIndex;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -53,7 +50,6 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
import static org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
@@ -70,118 +66,78 @@ public class SparkMetadataWriterUtils {
private static final String READ_PATHS_CONFIG = "hoodie.datasource.read.paths";
private static final String GLOB_PATHS_CONFIG = "glob.paths";
- public static HoodieJavaRDD<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext) {
+ public static List<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+     Schema readerSchema,
+     List<FileSlice> fileSlices,
+     String partition,
+     HoodieFunctionalIndex<Column, Column> functionalIndex,
+     String columnToIndex,
+     SQLContext sqlContext) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- long fileSize = baseFile.getFileSize();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ long fileSize = baseFile.getFileSize();
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFile.getStoragePath());
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(logFile -> {
+ long fileSize = logFile.getFileSize();
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFile.getPath());
+ });
}
- // Handle log files
- fileSlice.getLogFiles().forEach(logFile -> {
- String fileName = logFile.getFileName();
- Path logFilePath = filePath(basePath, partition, fileName);
- long fileSize = logFile.getFileSize();
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
- });
- return HoodieJavaRDD.of(createColumnStatsRecords(partition, columnRangeMetadataList, false).collect(Collectors.toList()), sparkEngineContext, parallelism);
+ return createColumnStatsRecords(partition, columnRangeMetadataList, false).collect(Collectors.toList());
}
- public static HoodieJavaRDD<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext,
- HoodieWriteConfig metadataWriteConfig) {
+ public static List<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+     Schema readerSchema,
+     List<FileSlice> fileSlices,
+     String partition,
+     HoodieFunctionalIndex<Column, Column> functionalIndex,
+     String columnToIndex,
+     SQLContext sqlContext,
+     HoodieWriteConfig metadataWriteConfig) {
List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildBloomFilterMetadata(
- metaClient,
- readerSchema,
- functionalIndex,
- columnToIndex,
- sqlContext,
- bloomFilterMetadataList,
- baseFilePath,
- metadataWriteConfig,
- partition,
- baseFile.getCommitTime());
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), metadataWriteConfig, partition,
+ baseFile.getCommitTime());
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(
+ logFile -> buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, logFile.getPath(), metadataWriteConfig, partition,
+ logFile.getDeltaCommitTime()));
}
- // Handle log files
- fileSlice.getLogFiles().forEach(logFile -> {
- String fileName = logFile.getFileName();
- Path logFilePath = filePath(basePath, partition, fileName);
- buildBloomFilterMetadata(
- metaClient,
- readerSchema,
- functionalIndex,
- columnToIndex,
- sqlContext,
- bloomFilterMetadataList,
- logFilePath,
- metadataWriteConfig,
- partition,
- logFile.getDeltaCommitTime());
- });
- return HoodieJavaRDD.of(bloomFilterMetadataList, sparkEngineContext, parallelism);
+ return bloomFilterMetadataList;
}
- private static void buildColumnRangeMetadata(
- HoodieTableMetaClient metaClient,
- Schema readerSchema,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
- long fileSize,
- Path filePath) {
- Dataset<Row> fileDf = readRecordsAsRow(
- new StoragePath[] {convertToStoragePath(filePath)},
- sqlContext,
- metaClient,
- readerSchema);
+ private static void buildColumnRangeMetadata(HoodieTableMetaClient metaClient,
+     Schema readerSchema,
+     HoodieFunctionalIndex<Column, Column> functionalIndex,
+     String columnToIndex,
+     SQLContext sqlContext,
+     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+     long fileSize,
+     StoragePath filePath) {
+ Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema);
Column indexedColumn = functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
- HoodieColumnRangeMetadata<Comparable> columnRangeMetadata =
- computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), fileSize);
+ HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), fileSize);
columnRangeMetadataList.add(columnRangeMetadata);
}
- private static void buildBloomFilterMetadata(
- HoodieTableMetaClient metaClient,
- Schema readerSchema,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- List<HoodieRecord> bloomFilterMetadataList,
- Path filePath,
- HoodieWriteConfig writeConfig,
- String partitionName,
- String instantTime) {
- Dataset<Row> fileDf =
- readRecordsAsRow(new StoragePath[] {convertToStoragePath(filePath)}, sqlContext, metaClient, readerSchema);
+ private static void buildBloomFilterMetadata(HoodieTableMetaClient metaClient,
+     Schema readerSchema,
+     HoodieFunctionalIndex<Column, Column> functionalIndex,
+     String columnToIndex,
+     SQLContext sqlContext,
+     List<HoodieRecord> bloomFilterMetadataList,
+     StoragePath filePath,
+     HoodieWriteConfig writeConfig,
+     String partitionName,
+     String instantTime) {
+ Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema);
Column indexedColumn = functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(writeConfig);
@@ -190,9 +146,7 @@ public class SparkMetadataWriterUtils {
bloomFilter.add(key);
});
ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
- bloomFilterMetadataList.add(createBloomFilterMetadataRecord(
- partitionName, filePath.toString(), instantTime, writeConfig.getBloomFilterType(),
- bloomByteBuffer, false));
+ bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, filePath.toString(), instantTime, writeConfig.getBloomFilterType(), bloomByteBuffer, false));
}
private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext sqlContext,
@@ -252,12 +206,4 @@ public class SparkMetadataWriterUtils {
.filter(c -> !HOODIE_META_COLUMNS.contains(c))
.map(df::col).toArray(Column[]::new));
}
-
- private static Path filePath(String basePath, String partition, String filename) {
- if (partition.isEmpty()) {
- return new Path(basePath, filename);
- } else {
- return new Path(basePath, partition + StoragePath.SEPARATOR + filename);
- }
- }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 9d7f63f354b..4dd46e03243 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -150,7 +150,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
- LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop);
+ LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop);
SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
@@ -163,11 +163,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient, int parallelism,
Schema readerSchema, StorageConfiguration<?> storageConf) {
- HoodieFunctionalIndex<Column, Column> functionalIndex = new HoodieSparkFunctionalIndex(
- indexDefinition.getIndexName(),
- indexDefinition.getIndexFunction(),
- indexDefinition.getSourceFields(),
- indexDefinition.getIndexOptions());
+ HoodieFunctionalIndex<Column, Column> functionalIndex =
+ new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
if (indexDefinition.getSourceFields().isEmpty()) {
// In case there are no columns to index, bail
@@ -179,40 +176,23 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
String columnToIndex = indexDefinition.getSourceFields().get(0);
SQLContext sqlContext = sparkEngineContext.getSqlContext();
String basePath = metaClient.getBasePath().toString();
- for (Pair<String, FileSlice> pair : partitionFileSlicePairs) {
- String partition = pair.getKey();
- FileSlice fileSlice = pair.getValue();
- // For functional index using column_stats
+
+ // Group FileSlices by partition
+ Map<String, List<FileSlice>> partitionToFileSlicesMap = partitionFileSlicePairs.stream()
+     .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
+ List<HoodieRecord> allRecords = new ArrayList<>();
+ for (Map.Entry<String, List<FileSlice>> entry : partitionToFileSlicesMap.entrySet()) {
+ String partition = entry.getKey();
+ List<FileSlice> fileSlices = entry.getValue();
+ List<HoodieRecord> recordsForPartition = Collections.emptyList();
if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
- return getFunctionalIndexRecordsUsingColumnStats(
- metaClient,
- parallelism,
- readerSchema,
- fileSlice,
- basePath,
- partition,
- functionalIndex,
- columnToIndex,
- sqlContext,
- sparkEngineContext);
- }
- // For functional index using bloom_filters
- if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
- return getFunctionalIndexRecordsUsingBloomFilter(
- metaClient,
- parallelism,
- readerSchema,
- fileSlice,
- basePath,
- partition,
- functionalIndex,
- columnToIndex,
- sqlContext,
- sparkEngineContext,
- metadataWriteConfig);
+ recordsForPartition = getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext);
+ } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
+ recordsForPartition = getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext, metadataWriteConfig);
}
+ allRecords.addAll(recordsForPartition);
}
- return HoodieJavaRDD.of(Collections.emptyList(), sparkEngineContext, parallelism);
+ return HoodieJavaRDD.of(allRecords, sparkEngineContext, parallelism);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 40e2ab94d41..736b21e847a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -316,6 +316,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.withDocumentation("Initializes the metadata table by reading from the
file system when the table is first created. Enabled by default. "
+ "Warning: This should only be disabled when manually constructing
the metadata table outside of typical Hudi writer flows.");
+ public static final ConfigProperty<Boolean> FUNCTIONAL_INDEX_ENABLE_PROP = ConfigProperty
+ .key(METADATA_PREFIX + ".index.functional.enable")
+ .defaultValue(false)
+ .sinceVersion("1.0.0")
+ .withDocumentation("Enable functional index within the Metadata Table.
Note that this config is to enable/disable all functional indexes. "
+ + "To enable or disable each functional index individually, users
still need to use CREATE/DROP INDEX SQL commands.");
+
public static final ConfigProperty<Integer> FUNCTIONAL_INDEX_FILE_GROUP_COUNT = ConfigProperty
.key(METADATA_PREFIX + ".index.functional.file.group.count")
.defaultValue(2)
@@ -494,6 +501,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getBoolean(AUTO_INITIALIZE);
}
+ public boolean isFunctionalIndexEnabled() {
+ return getBooleanOrDefault(FUNCTIONAL_INDEX_ENABLE_PROP);
+ }
+
public int getFunctionalIndexFileGroupCount() {
return getInt(FUNCTIONAL_INDEX_FILE_GROUP_COUNT);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e85aebc9da1..31bb643bbda 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1851,9 +1851,15 @@ public class HoodieTableMetadataUtil {
}
public static Schema getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
- TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- Schema tableSchema = schemaResolver.getTableAvroSchema();
- return addMetadataFields(getSchemaForFields(tableSchema, indexDefinition.getSourceFields()));
+ Schema tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+ List<String> partitionFields = metaClient.getTableConfig().getPartitionFields()
+ .map(Arrays::asList)
+ .orElse(Collections.emptyList());
+ List<String> sourceFields = indexDefinition.getSourceFields();
+ List<String> mergedFields = new ArrayList<>(partitionFields.size() + sourceFields.size());
+ mergedFields.addAll(partitionFields);
+ mergedFields.addAll(sourceFields);
+ return addMetadataFields(getSchemaForFields(tableSchema, mergedFields));
}
public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 80ec142ea77..c539971162d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -43,6 +43,7 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP;
import static org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -176,9 +177,7 @@ public enum MetadataPartitionType {
FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-", -1) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
- // Functional index is created via sql and not via write path.
- // HUDI-7662 tracks adding a separate config to enable/disable functional index.
- return false;
+ return getBooleanWithAltKeys(writeConfig, FUNCTIONAL_INDEX_ENABLE_PROP);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 4bc841cb728..356aed35b70 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -20,7 +20,7 @@
package org.apache.spark.sql.hudi.command.index
import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.util.Option
@@ -424,6 +424,85 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}
+ test("Test Enable and Disable Functional Index") {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ // create a simple partitioned mor table and insert some records
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | price double,
+ | ts long,
+ | name string
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = 'mor',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(name)
+ | location '$basePath'
+ """.stripMargin)
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+ spark.sql(s"insert into $tableName values(1, 10, 1601098924, 'a1')")
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+ spark.sql(s"insert into $tableName values(2, 10, 1632634924, 'a1')")
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+ spark.sql(s"insert into $tableName values(3, 10, 1664170924, 'a2')")
+ // create functional index and verify
+ spark.sql(s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+ val metaClient = createMetaClient(spark, basePath)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+ assertTrue(metaClient.getIndexMetadata.isPresent)
+ assertEquals(1,
metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+
+ // verify functional index records by querying metadata table
+ val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value,
ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName')
where type=3"
+ checkAnswer(metadataSql)(
+ Seq("2020-09-26", "2021-09-26"), // for file in name=a1
+ Seq("2022-09-26", "2022-09-26") // for file in name=a2
+ )
+
+ // disable functional index
+ spark.sql(s"set
${HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key}=false")
+ // do another insert after disabling the index
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+ spark.sql(s"insert into $tableName values(4, 10, 1664170924, 'a2')")
+ // check query result
+ checkAnswer(s"select id, name from $tableName where from_unixtime(ts,
'yyyy-MM-dd') = '2022-09-26'")(
+ Seq(3, "a2"),
+ Seq(4, "a2")
+ )
+ // verify there are no new updates to functional index
+ checkAnswer(metadataSql)(
+ Seq("2020-09-26", "2021-09-26"),
+ Seq("2022-09-26", "2022-09-26")
+ )
+
+ // enable functional index
+ spark.sql(s"set
${HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key}=true")
+ // do another insert after initializing the index
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2024-09-26
+ spark.sql(s"insert into $tableName values(5, 10, 1727329324, 'a3')")
+ // check query result for predicates including values when functional index was disabled
+ checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 'yyyy-MM-dd') IN ('2024-09-26', '2022-09-26')")(
+ Seq(3, "a2"),
+ Seq(4, "a2"),
+ Seq(5, "a3")
+ )
+ // verify there are new updates to functional index
+ checkAnswer(metadataSql)(
+ Seq("2020-09-26", "2021-09-26"),
+ Seq("2022-09-26", "2022-09-26"),
+ Seq("2024-09-26", "2024-09-26") // for file in name=a3
+ )
+ }
+ }
+ }
+
private def assertTableIdentifier(catalogTable: CatalogTable,
expectedDatabaseName: String,
expectedTableName: String): Unit = {