This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae5833ea34a [HUDI-8533] Bloom functional Index creation without 
function fails (#12290)
ae5833ea34a is described below

commit ae5833ea34afc8849722509d185f4e94dd3fb424
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Nov 22 20:18:49 2024 +0530

    [HUDI-8533] Bloom functional Index creation without function fails (#12290)
    
    Allow creation of functional index without an input function.
    Uses a default identity function in such a case.
    
    ---------
    
    Co-authored-by: vinoth chandar <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../client/utils/SparkMetadataWriterUtils.java     |  20 +--
 .../SparkHoodieBackedTableMetadataWriter.java      |   7 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   8 +-
 .../index/functional/HoodieFunctionalIndex.java    |   2 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  12 +-
 .../metadata/FileSystemBackedTableMetadata.java    |   4 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |  14 +-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  31 +++-
 .../hudi/metadata/MetadataPartitionType.java       |   7 +
 .../common/table/TestHoodieTableMetaClient.java    |  29 ++++
 .../org/apache/hudi/FunctionalIndexSupport.scala   | 136 +++++++++++-----
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |  10 +-
 .../apache/hudi/TestSecondaryIndexSupport.scala    |   1 -
 .../spark/sql/hudi/command/IndexCommands.scala     |   6 +-
 .../functional/TestSecondaryIndexPruning.scala     | 111 ++++++++++++-
 .../hudi/command/index/TestFunctionalIndex.scala   | 175 +++++++++++++++++++--
 16 files changed, 485 insertions(+), 88 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 4a9c326b296..484ddb651eb 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -82,7 +83,7 @@ public class SparkMetadataWriterUtils {
   public static Column[] getFunctionalIndexColumns() {
     return new Column[] {
         functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
-        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
+        
functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH),
         functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
     };
   }
@@ -90,7 +91,7 @@ public class SparkMetadataWriterUtils {
   public static String[] getFunctionalIndexColumnNames() {
     return new String[] {
         HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
-        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH,
         HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
     };
   }
@@ -129,13 +130,13 @@ public class SparkMetadataWriterUtils {
           long valueCount = row.getLong(baseAggregatePosition + 3);
 
           String partitionName = row.getString(0);
-          String filePath = row.getString(1);
+          String relativeFilePath = row.getString(1);
           long totalFileSize = row.getLong(2);
           // Total uncompressed size is harder to get directly. This is just 
an approximation to maintain the order.
           long totalUncompressedSize = totalFileSize * 2;
 
           HoodieColumnRangeMetadata<Comparable> rangeMetadata = 
HoodieColumnRangeMetadata.create(
-              filePath,
+              relativeFilePath,
               columnToIndex,
               minValue,
               maxValue,
@@ -150,21 +151,22 @@ public class SparkMetadataWriterUtils {
   }
 
   public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex,
-                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime) {
+                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime, String 
indexName) {
     // Group data using functional index metadata and then create bloom filter 
on the group
     Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, 
SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
-        // row.get(0) refers to partition path value and row.get(1) refers to 
file name.
-        .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), 
row.getString(1)), Encoders.kryo(Pair.class))
+        // row.get(1) refers to partition path value and row.get(2) refers to 
the relative file path.
+        .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(1), 
row.getString(2)), Encoders.kryo(Pair.class))
         .flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>)  
((pair, iterator) -> {
           String partition = pair.getLeft().toString();
-          String fileName = pair.getRight().toString();
+          String relativeFilePath = pair.getRight().toString();
+          String fileName = FSUtils.getFileName(relativeFilePath, partition);
           BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
           iterator.forEachRemaining(row -> {
             byte[] key = row.getAs(columnToIndex).toString().getBytes();
             bloomFilter.add(key);
           });
           ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-          HoodieRecord bloomFilterRecord = 
createBloomFilterMetadataRecord(partition, fileName, instantTime, 
metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false);
+          HoodieRecord bloomFilterRecord = 
createBloomFilterMetadataRecord(partition, fileName, instantTime, 
metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
           return Collections.singletonList(bloomFilterRecord).iterator();
         }), Encoders.kryo(HoodieRecord.class));
     return HoodieJavaRDD.of(bloomFilterRecords.javaRDD());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 6eb3d51247b..2e1f6352e37 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -191,17 +191,18 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
           String partition = entry.getKey();
           Pair<String, Long> filePathSizePair = entry.getValue();
           String filePath = filePathSizePair.getKey();
+          String relativeFilePath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(filePath));
           long fileSize = filePathSizePair.getValue();
           List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
               FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-          List<Row> rowsWithIndexMetadata = 
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, 
partition, filePath, fileSize);
+          List<Row> rowsWithIndexMetadata = 
SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, 
partition, relativeFilePath, fileSize);
           return rowsWithIndexMetadata.iterator();
         });
 
     // Generate dataset with functional index metadata
     StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
         
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
-        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, 
DataTypes.StringType, false, Metadata.empty()))
+        
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH,
 DataTypes.StringType, false, Metadata.empty()))
         
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
     Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
 
@@ -215,7 +216,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
       return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset, 
functionalIndex, columnToIndex);
     } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
-      return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime);
+      return 
SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime, 
indexDefinition.getIndexName());
     } else {
       throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 442d0d81be6..428cfe0777f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +89,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.index.functional.HoodieFunctionalIndex.SPARK_IDENTITY;
 import static org.apache.hudi.io.storage.HoodieIOFactory.getIOFactory;
 
 /**
@@ -219,11 +219,13 @@ public class HoodieTableMetaClient implements 
Serializable {
         "Index metadata is already present");
     String indexMetaPath = getIndexDefinitionPath();
     List<String> columnNames = new ArrayList<>(columns.keySet());
-    HoodieIndexDefinition indexDefinition = new 
HoodieIndexDefinition(indexName, indexType, options.get("func"), columnNames, 
options);
+    HoodieIndexDefinition indexDefinition = new 
HoodieIndexDefinition(indexName, indexType, options.getOrDefault("func", 
SPARK_IDENTITY), columnNames, options);
     if (indexMetadataOpt.isPresent()) {
       indexMetadataOpt.get().getIndexDefinitions().put(indexName, 
indexDefinition);
     } else {
-      indexMetadataOpt = Option.of(new 
HoodieIndexMetadata(Collections.singletonMap(indexName, indexDefinition)));
+      Map<String, HoodieIndexDefinition> indexDefinitionMap = new HashMap<>();
+      indexDefinitionMap.put(indexName, indexDefinition);
+      indexMetadataOpt = Option.of(new 
HoodieIndexMetadata(indexDefinitionMap));
     }
     try {
       FileIOUtils.createFileInPath(storage, new StoragePath(indexMetaPath), 
Option.of(getUTF8Bytes(indexMetadataOpt.get().toJson())));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
 
b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
index 4edb1e4c090..df3cc10f434 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
@@ -31,7 +31,7 @@ import java.util.List;
  */
 public interface HoodieFunctionalIndex<S, T> extends Serializable {
 
-  String HOODIE_FUNCTIONAL_INDEX_FILE_PATH = 
"_hoodie_functional_index_file_path";
+  String HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH = 
"_hoodie_functional_index_relative_file_path";
   String HOODIE_FUNCTIONAL_INDEX_PARTITION = 
"_hoodie_functional_index_partition";
   String HOODIE_FUNCTIONAL_INDEX_FILE_SIZE = 
"_hoodie_functional_index_file_size";
   String SPARK_DATE_FORMAT = "date_format";
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index cef4dd9352f..4aea9eeb356 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -166,14 +166,14 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   @Override
-  public Option<BloomFilter> getBloomFilter(final String partitionName, final 
String fileName) throws HoodieMetadataException {
-    if 
(!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS))
 {
+  public Option<BloomFilter> getBloomFilter(final String partitionName, final 
String fileName, final String metadataPartitionName) throws 
HoodieMetadataException {
+    if 
(!dataMetaClient.getTableConfig().getMetadataPartitions().contains(metadataPartitionName))
 {
       LOG.error("Metadata bloom filter index is disabled!");
       return Option.empty();
     }
 
     final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
-    Map<Pair<String, String>, BloomFilter> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    Map<Pair<String, String>, BloomFilter> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName), 
metadataPartitionName);
     if (bloomFilters.isEmpty()) {
       LOG.error("Meta index: missing bloom filter for partition: {}, file: 
{}", partitionName, fileName);
       return Option.empty();
@@ -184,9 +184,9 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   @Override
-  public Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+  public Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList, final String 
metadataPartitionName)
       throws HoodieMetadataException {
-    if 
(!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS))
 {
+    if 
(!dataMetaClient.getTableConfig().getMetadataPartitions().contains(metadataPartitionName))
 {
       LOG.error("Metadata bloom filter index is disabled!");
       return Collections.emptyMap();
     }
@@ -206,7 +206,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
 
     List<String> partitionIDFileIDStringsList = new 
ArrayList<>(partitionIDFileIDStrings);
     Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
-        getRecordsByKeys(partitionIDFileIDStringsList, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+        getRecordsByKeys(partitionIDFileIDStringsList, metadataPartitionName);
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));
     metrics.ifPresent(m -> 
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR, 
partitionIDFileIDStringsList.size()));
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index d0036e6bc5b..efafbad5a08 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -285,13 +285,13 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
     // no-op
   }
 
-  public Option<BloomFilter> getBloomFilter(final String partitionName, final 
String fileName)
+  public Option<BloomFilter> getBloomFilter(final String partitionName, final 
String fileName, final String metadataPartitionName)
       throws HoodieMetadataException {
     throw new HoodieMetadataException("Unsupported operation: getBloomFilter 
for " + fileName);
   }
 
   @Override
-  public Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+  public Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList, final String 
metadataPartitionName)
       throws HoodieMetadataException {
     throw new HoodieMetadataException("Unsupported operation: 
getBloomFilters!");
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index b3562b86d2b..b0bb9670825 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -288,6 +288,15 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     return new HoodieAvroRecord<>(key, payload);
   }
 
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final String bloomFilterType,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    return createBloomFilterMetadataRecord(partitionName, baseFileName, 
timestamp, bloomFilterType, bloomFilter, isDeleted, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+  }
+
   /**
    * Create bloom filter metadata record.
    *
@@ -303,12 +312,13 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
                                                                                
     final String timestamp,
                                                                                
     final String bloomFilterType,
                                                                                
     final ByteBuffer bloomFilter,
-                                                                               
     final boolean isDeleted) {
+                                                                               
     final boolean isDeleted,
+                                                                               
     String metadataPartitionName) {
     checkArgument(!baseFileName.contains(StoragePath.SEPARATOR)
             && FSUtils.isBaseFile(new StoragePath(baseFileName)),
         "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
     final String bloomFilterIndexKey = getBloomFilterRecordKey(partitionName, 
baseFileName);
-    HoodieKey key = new HoodieKey(bloomFilterIndexKey, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    HoodieKey key = new HoodieKey(bloomFilterIndexKey, metadataPartitionName);
 
     HoodieMetadataBloomFilter metadataBloomFilter =
         new HoodieMetadataBloomFilter(bloomFilterType, timestamp, bloomFilter, 
isDeleted);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 816e34fce49..af11bfaf6e8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -202,7 +202,21 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
    * @return BloomFilter if available, otherwise empty
    * @throws HoodieMetadataException
    */
-  Option<BloomFilter> getBloomFilter(final String partitionName, final String 
fileName)
+  default Option<BloomFilter> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    return getBloomFilter(partitionName, fileName, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+  }
+
+  /**
+   * Get the bloom filter for the FileID from the metadata table.
+   *
+   * @param partitionName         - Partition name
+   * @param fileName              - File name for which bloom filter needs to 
be retrieved
+   * @param metadataPartitionName - Metadata partition name
+   * @return BloomFilter if available, otherwise empty
+   * @throws HoodieMetadataException
+   */
+  Option<BloomFilter> getBloomFilter(final String partitionName, final String 
fileName, final String metadataPartitionName)
       throws HoodieMetadataException;
 
   /**
@@ -212,7 +226,20 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
    * @return Map of partition file name pair to its bloom filter
    * @throws HoodieMetadataException
    */
-  Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+  default Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    return getBloomFilters(partitionNameFileNameList, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+  }
+
+  /**
+   * Get bloom filters for files from the metadata table index.
+   *
+   * @param partitionNameFileNameList - List of partition and file name pair 
for which bloom filters need to be retrieved
+   * @param metadataPartitionName     - Metadata partition name
+   * @return Map of partition file name pair to its bloom filter
+   * @throws HoodieMetadataException
+   */
+  Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList, final String 
metadataPartitionName)
       throws HoodieMetadataException;
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 00d1cc0ff05..ab8e74a9e9e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -335,6 +335,13 @@ public enum MetadataPartitionType {
         || 
metadataPartitionPath.startsWith(FUNCTIONAL_INDEX.getPartitionPath());
   }
 
+  public static String getGenericIndexNameWithoutPrefix(String indexName) {
+    String prefix = indexName.startsWith(SECONDARY_INDEX.getPartitionPath())
+        ? SECONDARY_INDEX.getPartitionPath()
+        : FUNCTIONAL_INDEX.getPartitionPath();
+    return indexName.substring(prefix.length());
+  }
+
   // Partition path in metadata table.
   private final String partitionPath;
   // FileId prefix used for all file groups in this partition.
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index 171230a3dda..b6a541a9f0f 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -18,13 +18,16 @@
 
 package org.apache.hudi.common.table;
 
+import org.apache.hudi.common.model.HoodieIndexMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.Path;
@@ -33,6 +36,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
@@ -243,4 +249,27 @@ public class TestHoodieTableMetaClient extends 
HoodieCommonTestHarness {
     
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(),
 "/a/b/c");
     assertEquals(randomDefinitionPath, metaClient.getIndexDefinitionPath());
   }
+
+  @Test
+  public void testDeleteDefinition() throws IOException {
+    final String basePath = tempDir.toAbsolutePath() + Path.SEPARATOR + "t7";
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName("table")
+        .initTable(this.metaClient.getStorageConf(), basePath);
+    Map<String, Map<String, String>> columnsMap = new HashMap<>();
+    columnsMap.put("c1", Collections.emptyMap());
+    String indexName = 
MetadataPartitionType.FUNCTIONAL_INDEX.getPartitionPath() + "idx";
+    metaClient.buildIndexDefinition(indexName, "column_stats",
+        columnsMap, Collections.emptyMap());
+    
assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(indexName));
+    assertTrue(metaClient.getStorage().exists(new 
StoragePath(metaClient.getIndexDefinitionPath())));
+    metaClient.deleteIndexDefinition(indexName);
+    assertTrue(metaClient.getIndexMetadata().isEmpty());
+    assertTrue(metaClient.getStorage().exists(new 
StoragePath(metaClient.getIndexDefinitionPath())));
+    // Read from storage
+    HoodieIndexMetadata indexMetadata = HoodieIndexMetadata.fromJson(
+        new String(FileIOUtils.readDataFromPath(metaClient.getStorage(), new 
StoragePath(metaClient.getIndexDefinitionPath())).get()));
+    assertTrue(indexMetadata.getIndexDefinitions().isEmpty());
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 9a56a865c7d..a6689ba5fce 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -22,19 +22,20 @@ package org.apache.hudi
 import org.apache.hudi.FunctionalIndexSupport._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
+import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, 
HoodieMetadataRecord}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition, 
HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
 import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.util.JFunction
 import 
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, 
createDataFrameFromRDD}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -57,12 +58,21 @@ class FunctionalIndexSupport(spark: SparkSession,
                                          prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                          shouldPushDownFilesFilter: Boolean
                                         ): Option[Set[String]] = {
-    lazy val functionalIndexPartitionOpt = 
getFunctionalIndexPartition(queryFilters)
+    lazy val functionalIndexPartitionOpt = 
getFunctionalIndexPartitionAndLiterals(queryFilters)
     if (isIndexAvailable && queryFilters.nonEmpty && 
functionalIndexPartitionOpt.nonEmpty) {
-      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, 
inMemoryProjectionThreshold)
-      val (prunedPartitions, prunedFileNames) = 
getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
-      val indexDf = 
loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, prunedPartitions, 
readInMemory)
-      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+      val (indexPartition, literals) = functionalIndexPartitionOpt.get
+      val indexDefinition = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+      if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
 {
+        val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
+        val (prunedPartitions, prunedFileNames) = 
getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+        val indexDf = loadFunctionalIndexDataFrame(indexPartition, 
prunedPartitions, readInMemory)
+        Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+      } else if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS))
 {
+        val prunedPartitionAndFileNames = 
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, 
includeLogFiles = true)
+        Option.apply(getCandidateFilesForKeys(indexPartition, 
prunedPartitionAndFileNames, literals))
+      } else {
+        Option.empty
+      }
     } else {
       Option.empty
     }
@@ -79,6 +89,23 @@ class FunctionalIndexSupport(spark: SparkSession,
     metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && 
!metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
   }
 
+  def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], 
sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
+    var functionalIndexQueries: List[Tuple2[Expression, List[String]]] = 
List.empty
+    for (query <- queryFilters) {
+      filterQueryWithRecordKey(query, sourceFieldOpt, (expr: Expression) => {
+        expr match {
+          case expression: UnaryExpression => expression.child
+          case other => other
+        }
+      }).foreach({
+        case (exp: Expression, literals: List[String]) =>
+          functionalIndexQueries = functionalIndexQueries :+ Tuple2.apply(exp, 
literals)
+      })
+    }
+
+    functionalIndexQueries
+  }
+
   /**
    * Searches for an index partition based on the specified index function and 
target column name.
    *
@@ -92,18 +119,20 @@ class FunctionalIndexSupport(spark: SparkSession,
    * @return An `Option` containing the index partition identifier if a 
matching index definition is found.
    *         Returns `None` if no matching index definition is found.
    */
-  private def getFunctionalIndexPartition(queryFilters: Seq[Expression]): 
Option[String] = {
-    val functionToColumnNames = extractSparkFunctionNames(queryFilters)
-    if (functionToColumnNames.nonEmpty) {
-      // Currently, only one functional index in the query is supported. 
HUDI-7620 for supporting multiple functions.
-      checkState(functionToColumnNames.size == 1, "Currently, only one 
function with functional index in the query is supported")
-      val (indexFunction, targetColumnName) = functionToColumnNames.head
-      val indexDefinitions = 
metaClient.getIndexMetadata.get().getIndexDefinitions
-      indexDefinitions.asScala.collectFirst {
-        case (indexPartition, indexDefinition)
-          if indexDefinition.getIndexFunction.equals(indexFunction) && 
indexDefinition.getSourceFields.contains(targetColumnName) =>
-          indexPartition
-      }
+  private def getFunctionalIndexPartitionAndLiterals(queryFilters: 
Seq[Expression]): Option[Tuple2[String, List[String]]] = {
+    val indexDefinitions = 
metaClient.getIndexMetadata.get().getIndexDefinitions.asScala
+    if (indexDefinitions.nonEmpty) {
+      val functionDefinitions = indexDefinitions.values
+        .filter(definition => 
MetadataPartitionType.fromPartitionPath(definition.getIndexName).equals(MetadataPartitionType.FUNCTIONAL_INDEX))
+        .toList
+      var indexPartitionAndLiteralsOpt: Option[Tuple2[String, List[String]]] = 
Option.empty
+      functionDefinitions.foreach(indexDefinition => {
+        val queryInfoOpt = extractQueryAndLiterals(queryFilters, 
indexDefinition)
+        if (queryInfoOpt.isDefined) {
+          indexPartitionAndLiteralsOpt = 
Option.apply(Tuple2.apply(indexDefinition.getIndexName, queryInfoOpt.get._2))
+        }
+      })
+      indexPartitionAndLiteralsOpt
     } else {
       Option.empty
     }
@@ -118,20 +147,18 @@ class FunctionalIndexSupport(spark: SparkSession,
    * one of the functions and operates on a single column, this method maps 
the function name to the
    * column name.
    */
-  private def extractSparkFunctionNames(queryFilters: Seq[Expression]): 
Map[String, String] = {
-    queryFilters.flatMap { expr =>
-      // Support only simple binary expression on single column
-      if (expr.references.size == 1) {
-        val targetColumnName = expr.references.head.name
-        // Check if the expression string contains any of the function names
-        val exprString = expr.toString
-        SPARK_FUNCTION_MAP.asScala.keys
-          .find(exprString.contains)
-          .map(functionName => functionName -> targetColumnName)
-      } else {
-        None // Skip expressions that do not match the criteria
+  private def extractQueryAndLiterals(queryFilters: Seq[Expression], 
indexDefinition: HoodieIndexDefinition): Option[(Expression, List[String])] = {
+    val functionalIndexQueries = 
filterQueriesWithFunctionalFilterKey(queryFilters, 
Option.apply(indexDefinition.getSourceFields.get(0)))
+    var queryAndLiteralsOpt: Option[(Expression, List[String])] = Option.empty
+    functionalIndexQueries.foreach { tuple =>
+      val (expr, literals) = (tuple._1, tuple._2)
+      val functionNameOption = 
SPARK_FUNCTION_MAP.asScala.keys.find(expr.toString.contains)
+      val functionName = functionNameOption.getOrElse("identity")
+      if (indexDefinition.getIndexFunction.equals(functionName)) {
+        queryAndLiteralsOpt = Option.apply(Tuple2.apply(expr, literals))
       }
-    }.toMap
+    }
+    queryAndLiteralsOpt
   }
 
   def loadFunctionalIndexDataFrame(indexPartition: String,
@@ -139,11 +166,6 @@ class FunctionalIndexSupport(spark: SparkSession,
                                    shouldReadInMemory: Boolean): DataFrame = {
     val colStatsDF = {
       val indexDefinition = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
-      val indexType = indexDefinition.getIndexType
-      // NOTE: Currently only functional indexes created using column_stats is 
supported.
-      // HUDI-7007 tracks for adding support for other index types such as 
bloom filters.
-      
checkState(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS),
-        s"Index type $indexType is not supported")
       val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = 
loadFunctionalIndexForColumnsInternal(
         indexDefinition.getSourceFields.asScala.toSeq, prunedPartitions, 
indexPartition, shouldReadInMemory)
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
@@ -198,6 +220,44 @@ class FunctionalIndexSupport(spark: SparkSession,
 
     columnStatsRecords
   }
+
+  def getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         includeLogFiles: Boolean = false): 
Map[String, Set[String]] = {
+    prunedPartitionsAndFileSlices.foldLeft(Map.empty[String, Set[String]]) {
+      case (partitionToFileMap, (partitionPathOpt, fileSlices)) =>
+        partitionPathOpt match {
+          case Some(partitionPath) =>
+            val fileNames = fileSlices.flatMap { fileSlice =>
+              val baseFile = 
Option(fileSlice.getBaseFile.orElse(null)).map(_.getFileName).toSeq
+              val logFiles = if (includeLogFiles) {
+                
fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toSeq
+              } else Seq.empty[String]
+              baseFile ++ logFiles
+            }.toSet
+
+            // Update the map with the new partition and its file names
+            partitionToFileMap.updated(partitionPath.path, 
partitionToFileMap.getOrElse(partitionPath.path, Set.empty) ++ fileNames)
+          case None =>
+            partitionToFileMap // Skip if no partition path
+        }
+    }
+  }
+
+  private def getCandidateFilesForKeys(indexPartition: String, 
prunedPartitionAndFileNames: Map[String, Set[String]], keys: List[String]): 
Set[String] = {
+    val candidateFiles = prunedPartitionAndFileNames.flatMap { case 
(partition, fileNames) =>
+      fileNames.filter { fileName =>
+        val bloomFilterOpt = 
toScalaOption(metadataTable.getBloomFilter(partition, fileName, indexPartition))
+        bloomFilterOpt match {
+          case Some(bloomFilter) =>
+            keys.exists(bloomFilter.mightContain)
+          case None =>
+            true // If bloom filter is empty or undefined, assume the file 
might contain the record key
+        }
+      }
+    }.toSet
+
+    candidateFiles
+  }
 }
 
 object FunctionalIndexSupport {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 51e0f846551..fdaa22e5a50 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -129,10 +129,16 @@ object RecordLevelIndexSupport {
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need 
to be matched
    */
+
   def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: 
Option[String]): Option[(Expression, List[String])] = {
+    filterQueryWithRecordKey(queryFilter, recordKeyOpt, attributeFetcher = 
expr => expr)
+  }
+
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: 
Option[String], attributeFetcher: Function1[Expression, Expression]
+                              ): Option[(Expression, List[String])] = {
     queryFilter match {
       case equalToQuery: EqualTo =>
-        val attributeLiteralTuple = 
getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
+        val attributeLiteralTuple = 
getAttributeLiteralTuple(attributeFetcher.apply(equalToQuery.left), 
attributeFetcher.apply(equalToQuery.right)).orNull
         if (attributeLiteralTuple != null) {
           val attribute = attributeLiteralTuple._1
           val literal = attributeLiteralTuple._2
@@ -147,7 +153,7 @@ object RecordLevelIndexSupport {
 
       case inQuery: In =>
         var validINQuery = true
-        inQuery.value match {
+        attributeFetcher.apply(inQuery.value) match {
           case attribute: AttributeReference =>
             if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
               validINQuery = false
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
index 5b487b740a9..67bae0ef392 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
@@ -101,5 +101,4 @@ class TestSecondaryIndexSupport {
     result = filterQueriesWithSecondaryKey(Seq(testFilter), 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
     assertTrue(result.isEmpty)
   }
-
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index d6d0b6fe233..6e975f08baf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.exception.HoodieIndexException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.metadata.MetadataPartitionType
+import org.apache.hudi.metadata.{HoodieTableMetadataUtil, 
MetadataPartitionType}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -51,7 +51,9 @@ case class CreateIndexCommand(table: CatalogTable,
       new util.LinkedHashMap[String, java.util.Map[String, String]]()
     columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))
 
-    if (options.contains("func") || indexType.equals("secondary_index")) {
+    if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)
+      || indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+      || 
indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
       val extraOpts = options ++ table.properties
       HoodieSparkIndexClient.getInstance(sparkSession).create(
         metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 8969f38f642..d2afdc5e784 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -988,7 +988,6 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
-
   /**
    * Test case to write with updates and validate secondary index with 
clustering.
    */
@@ -1112,6 +1111,8 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+
+
   private def confirmLastCommitType(actionType: ActionType): Unit = {
     metaClient = HoodieTableMetaClient.reload(metaClient)
     val instants = metaClient.getActiveTimeline.getInstants
@@ -1197,13 +1198,117 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testUpdatesReInsertsDeletes(hoodieTableType: HoodieTableType): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val tableType = hoodieTableType.name()
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      val sqlTableType = if 
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
+      val tableName = s"test_updates_reinserts_deletes_$sqlTableType"
+
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |    ts BIGINT,
+           |    id STRING,
+           |    rider STRING,
+           |    driver STRING,
+           |    fare DOUBLE,
+           |    city STRING,
+           |    state STRING
+           |) USING HUDI
+           | options(
+           |    primaryKey ='id',
+           |    type = '$sqlTableType',
+           |    hoodie.metadata.enable = 'true',
+           |    hoodie.metadata.record.index.enable = 'true',
+           |    hoodie.datasource.write.recordkey.field = 'id',
+           |    hoodie.enable.data.skipping = 'true'
+           | )
+           | PARTITIONED BY (city, state)
+           | location '$basePath'
+           |""".stripMargin)
+
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.metadata.enable=true")
+      if (HoodieSparkUtils.gteqSpark3_4) {
+        spark.sql("set spark.sql.defaultColumn.enabled=false")
+      }
+
+      spark.sql(
+        s"""|INSERT INTO $tableName(ts, id, rider, driver, fare, city, state) 
VALUES
+            |    
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+            |    
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+            |    
(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+            |    
(1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas');
+            |    """.stripMargin)
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName;")(
+        Seq(1695159649, "trip1", "rider-A", "driver-K", 19.10, 
"san_francisco", "california"),
+        Seq(1695091554, "trip2", "rider-C", "driver-M", 27.70, "sunnyvale", 
"california"),
+        Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", 
"texas"),
+        Seq(1695516137, "trip4", "rider-F", "driver-P", 34.15, "houston", 
"texas"))
+
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      spark.sql(s"create index idx_rider_$tableName ON $tableName USING 
secondary_index(rider)")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where rider = 'rider-E'")(
+        Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", 
"texas"))
+      verifyQueryPredicate(hudiOpts, "rider")
+
+      spark.sql(s"create index idx_driver_$tableName ON $tableName USING 
secondary_index(driver)")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where driver = 'driver-P'")(
+        Seq(1695516137, "trip4", "rider-F", "driver-P", 34.15, "houston", 
"texas")
+      )
+      verifyQueryPredicate(hudiOpts, "driver")
+
+      // update such that there are two rider-E records
+      spark.sql(s"update $tableName set rider = 'rider-E' where rider = 
'rider-F'")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where rider = 'rider-E'")(
+        Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", 
"texas"),
+        Seq(1695516137, "trip4", "rider-E", "driver-P", 34.15, "houston", 
"texas")
+      )
+      verifyQueryPredicate(hudiOpts, "rider")
+
+      // delete one of those records
+      spark.sql(s"delete from $tableName where id = 'trip4'")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where rider = 'rider-E'")(
+        Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", 
"texas")
+      )
+
+      // reinsert a rider-E record  while changing driver value as well.
+      spark.sql(s"insert into $tableName 
values(1695516137,'trip4','rider-G','driver-Q',34.15,'houston','texas')")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where driver = 'driver-Q';")(
+        Seq(1695516137, "trip4", "rider-G", "driver-Q", 34.15, "houston", 
"texas")
+      )
+
+      // update two other records to rider-E as well.
+      spark.sql(s"update $tableName set rider = 'rider-E' where rider in 
('rider-C','rider-G');")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where rider = 'rider-E'")(
+        Seq(1695091554, "trip2", "rider-E", "driver-M", 27.70, "sunnyvale", 
"california"),
+        Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", 
"texas"),
+        Seq(1695516137, "trip4", "rider-E", "driver-Q", 34.15, "houston", 
"texas")
+      )
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName where driver = 'driver-Q'")(
+        Seq(1695516137, "trip4", "rider-E", "driver-Q", 34.15, "houston", 
"texas")
+      )
+      verifyQueryPredicate(hudiOpts, "rider")
+    }
+  }
+
   private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
   }
 
-  private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: 
String, nonExistantKey: String = ""): Unit = {
+  private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: 
String, nonExistentKey: String = ""): Unit = {
     mergedDfList = mergedDfList :+ 
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
-    val secondaryKey = mergedDfList.last.limit(2).collect().filter(row => 
!row.getAs(columnName).toString.equals(nonExistantKey))
+    val secondaryKey = mergedDfList.last.limit(2).collect().filter(row => 
!row.getAs(columnName).toString.equals(nonExistentKey))
       .map(row => row.getAs(columnName).toString).head
     val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey))
     verifyFilePruning(hudiOpts, dataFilter)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 69d7f571c95..7101e38cfe4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -23,6 +23,7 @@ import 
org.apache.hudi.DataSourceWriteOptions.{INSERT_OPERATION_OPT_VAL, OPERATI
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.client.utils.SparkMetadataWriterUtils
 import org.apache.hudi.{DataSourceReadOptions, FunctionalIndexSupport, 
HoodieFileIndex, HoodieSparkUtils}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
@@ -36,6 +37,7 @@ import org.apache.hudi.hive.HiveSyncConfigHolder._
 import org.apache.hudi.hive.testutils.HiveTestUtil
 import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
 import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.functional.HoodieFunctionalIndex
 import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, 
MetadataPartitionType}
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, 
META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
@@ -45,12 +47,14 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.{Column, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, FromUnixTime, Literal}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, FromUnixTime, Literal, Upper}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.hudi.command.{CreateIndexCommand, 
ShowIndexesCommand}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.types.{BinaryType, ByteType, DateType, 
DecimalType, IntegerType, ShortType, StringType, StructType, TimestampType}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
 import org.scalatest.Ignore
 
 import java.util.stream.Collectors
@@ -249,8 +253,9 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
           assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
           assertEquals("func_index_idx_datestr", 
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
 
-          // Verify one can create more than one functional index
-          createIndexSql = s"create index name_lower on $tableName using 
column_stats(ts) options(func='identity')"
+          // Verify one can create more than one functional index. When 
function is not provided,
+          // default identity function is used
+          createIndexSql = s"create index name_lower on $tableName using 
column_stats(ts)"
           spark.sql(createIndexSql)
           metaClient = createMetaClient(spark, basePath)
           functionalIndexMetadata = metaClient.getIndexMetadata.get()
@@ -327,7 +332,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
         assert(mdtPartitions.contains("func_index_name_lower") && 
mdtPartitions.contains("func_index_idx_datestr"))
 
         // drop functional index
-        spark.sql(s"drop index func_index_idx_datestr on $tableName")
+        spark.sql(s"drop index idx_datestr on $tableName")
         // validate table config
         metaClient = HoodieTableMetaClient.reload(metaClient)
         
assert(!metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
@@ -685,10 +690,8 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
                  | $partitionByClause
                  | location '$basePath'
        """.stripMargin)
-            if (tableType == "mor") {
-              spark.sql("set hoodie.compact.inline=true")
-              spark.sql("set hoodie.compact.inline.max.delta.commits=2")
-            }
+
+            setCompactionConfigs(tableType)
             if (!isPartitioned) {
               // setting this for non-partitioned table to ensure multiple 
file groups are created
               spark.sql(s"set 
${HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()}=0")
@@ -741,6 +744,100 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  @Test
+  def testBloomFiltersIndexPruning(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          val tableName = generateTableName + s"_bloom_pruning_$tableType"
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+          spark.sql(
+            s"""
+           CREATE TABLE $tableName (
+               |    ts BIGINT,
+               |    id STRING,
+               |    rider STRING,
+               |    driver STRING,
+               |    fare DOUBLE,
+               |    city STRING,
+               |    state STRING
+               |) USING HUDI
+               |options(
+               |    primaryKey ='id',
+               |    type = '$tableType',
+               |    hoodie.metadata.enable = 'true',
+               |    hoodie.datasource.write.recordkey.field = 'id',
+               |    hoodie.enable.data.skipping = 'true'
+               |)
+               |PARTITIONED BY (state)
+               |location '$basePath'
+       """.stripMargin)
+
+          spark.sql("set hoodie.parquet.small.file.limit=0")
+          spark.sql("set hoodie.enable.data.skipping=true")
+          spark.sql("set hoodie.metadata.enable=true")
+          if (HoodieSparkUtils.gteqSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled=false")
+          }
+
+          spark.sql(
+            s"""
+               |insert into $tableName(ts, id, rider, driver, fare, city, 
state) VALUES
+               |  
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+               |  
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+               |  
(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+               |  
(1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+               |""".stripMargin)
+
+          spark.sql(
+            s"""
+               |insert into $tableName(ts, id, rider, driver, fare, city, 
state) VALUES
+               |  
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+               |  
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+               |""".stripMargin)
+
+          // create index using bloom filters on city column with upper() 
function
+          spark.sql(s"create index idx_bloom_$tableName on $tableName using 
bloom_filters(city) options(func='upper', numHashFunctions=1, 
fpp=0.00000000001)")
+
+          // Pruning takes place only if query uses upper function on city
+          checkAnswer(s"select id, rider from $tableName where upper(city) in 
('sunnyvale', 'sg')")()
+          checkAnswer(s"select id, rider from $tableName where lower(city) = 
'sunny'")()
+          checkAnswer(s"select id, rider from $tableName where upper(city) = 
'SUNNYVALE'")(
+            Seq("trip2", "rider-C")
+          )
+          // verify file pruning
+          var metaClient = createMetaClient(spark, basePath)
+          val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key 
-> "true", HoodieMetadataConfig.ENABLE.key -> "true")
+          val cityColumn = AttributeReference("city", StringType)()
+          val upperCityExpr = Upper(cityColumn) // Apply the `upper` function 
to the city column
+          val targetCityUpper = Literal.create("SUNNYVALE")
+          val dataFilterUpperCityEquals = EqualTo(upperCityExpr, 
targetCityUpper)
+          verifyFilePruning(opts, dataFilterUpperCityEquals, metaClient, 
isDataSkippingExpected = true)
+
+          // drop index and recreate without upper() function
+          spark.sql(s"drop index idx_bloom_$tableName on $tableName")
+          spark.sql(s"create index idx_bloom_$tableName on $tableName using 
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+          // Pruning takes place only if query uses no function on city
+          checkAnswer(s"select id, rider from $tableName where city = 
'sunnyvale'")(
+            Seq("trip2", "rider-C")
+          )
+          metaClient = createMetaClient(spark, basePath)
+          // verify file pruning
+          val targetCity = Literal.create("sunnyvale")
+          val dataFilterCityEquals = EqualTo(cityColumn, targetCity)
+          verifyFilePruning(opts, dataFilterCityEquals, metaClient, 
isDataSkippingExpected = true)
+          // validate IN query
+          checkAnswer(s"select id, rider from $tableName where city in 
('san_diego', 'sunnyvale')")(
+            Seq("trip2", "rider-C"),
+            Seq("trip5", "rider-A"),
+            Seq("trip6", "rider-C")
+          )
+        }
+      }
+    }
+  }
+
   private def assertTableIdentifier(catalogTable: CatalogTable,
                                     expectedDatabaseName: String,
                                     expectedTableName: String): Unit = {
@@ -756,6 +853,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
           val tableName = generateTableName + s"_init_$tableType$isPartitioned"
           val partitionByClause = if (isPartitioned) "partitioned by(price)" 
else ""
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          setCompactionConfigs(tableType)
           spark.sql(
             s"""
                |create table $tableName (
@@ -773,7 +871,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
                | location '$basePath'
        """.stripMargin)
 
-          writeRecordsAndValidateFunctionalIndex(tableName, basePath, 
"update", isDelete = false, shouldCompact = false, shouldCluster = false, 
shouldRollback = false)
+          writeRecordsAndValidateFunctionalIndex(tableName, basePath, isDelete 
= false, shouldCompact = false, shouldCluster = false, shouldRollback = false)
         }
       }
     }
@@ -787,6 +885,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
           val tableName = generateTableName + 
s"_rollback_$tableType$isPartitioned"
           val partitionByClause = if (isPartitioned) "partitioned by(price)" 
else ""
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          setCompactionConfigs(tableType)
           spark.sql(
             s"""
                |create table $tableName (
@@ -804,23 +903,28 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
                | location '$basePath'
        """.stripMargin)
 
-          writeRecordsAndValidateFunctionalIndex(tableName, basePath, 
"update", isDelete = false, shouldCompact = false, shouldCluster = false, 
shouldRollback = true)
+          writeRecordsAndValidateFunctionalIndex(tableName, basePath, isDelete 
= false, shouldCompact = false, shouldCluster = false, shouldRollback = true)
         }
       }
     }
   }
 
+  /**
+   * Issues Spark SQL `set` statements that configure inline compaction for the given table type:
+   * `hoodie.compact.inline` is set to true for "mor" and to false for anything else, and for
+   * "mor" `hoodie.compact.inline.max.delta.commits` is additionally set to 2.
+   */
+  private def setCompactionConfigs(tableType: String): Unit = {
+    val isMor = tableType == "mor"
+    // Boolean interpolation renders exactly "true" / "false".
+    spark.sql(s"set hoodie.compact.inline= $isMor")
+    if (isMor) spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+  }
+
   /**
    * Write records to the table, do updates or deletes, and then validate the
    * functional index.
    */
   private def writeRecordsAndValidateFunctionalIndex(tableName: String,
                                                      basePath: String,
-                                                     operationType: String,
                                                      isDelete: Boolean,
                                                      shouldCompact: Boolean,
                                                      shouldCluster: Boolean,
-                                                     shouldRollback: Boolean,
-                                                     shouldValidate: Boolean = 
true): Unit = {
+                                                     shouldRollback: Boolean): 
Unit = {
     // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
     spark.sql(s"insert into $tableName values(1, 'a1', 1601098924, 10)")
     // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
@@ -1034,10 +1138,53 @@ class TestFunctionalIndex extends 
HoodieSparkSqlTestBase {
         assertTrue(prunedPartitionAndFileNames._1.size == 1) // partition
         assertTrue(prunedPartitionAndFileNames._2.size == 1) // log file
         assertTrue(FSUtils.isLogFile(prunedPartitionAndFileNames._2.head))
+
+        val prunedPartitionAndFileNamesMap = 
functionalIndexSupport.getPrunedPartitionsAndFileNamesMap(prunedPaths, 
includeLogFiles = true)
+        assertTrue(prunedPartitionAndFileNamesMap.keySet.size == 1) // 
partition
+        assertTrue(prunedPartitionAndFileNamesMap.values.head.size == 1) // 
log file
+        
assertTrue(FSUtils.isLogFile(prunedPartitionAndFileNamesMap.values.head.head))
       }
     }
   }
 
+  // Checks that bloom-filter based functional index record generation produces exactly one
+  // record when every input row is tagged with the same partition / relative file path pair.
+  test("testGetFunctionalIndexRecordsUsingBloomFilter") {
+    val sourceJSONTablePath = getClass.getClassLoader
+      .getResource("index/colstats/input-table-json-partition-pruning")
+      .toString
+
+    // NOTE: Schema here is provided for validation that the input date is in the appropriate format
+    val sourceTableSchema: StructType = new StructType()
+      .add("c1", IntegerType)
+      .add("c2", StringType)
+      .add("c3", DecimalType(9, 3))
+      .add("c4", TimestampType)
+      .add("c5", ShortType)
+      .add("c6", DateType)
+      .add("c7", BinaryType)
+      .add("c8", ByteType)
+
+    // Every row receives the same partition, relative file path and file size, so the whole
+    // input corresponds to a single partition/file pair.
+    val relativeFilePath =
+      "c/d/123141ab-701b-4ba4-b60b-e6acd9e9103e-0_329-224134-258390_2131313124.parquet"
+    val df = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+      .withColumn(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, lit("c/d"))
+      .withColumn(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH,
+        lit(relativeFilePath))
+      .withColumn(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, lit(100))
+
+    val writeConfig = HoodieWriteConfig.newBuilder().withPath("a/b").build()
+    val bloomFilterRecords = SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(
+      df, "c5", writeConfig, "", "random")
+    // Since there is only one partition file pair there is only one bloom filter record
+    assertEquals(1, bloomFilterRecords.collectAsList().size())
+    assertFalse(bloomFilterRecords.isEmpty)
+  }
+
   private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean 
= false, isNoScanExpected: Boolean = false): Unit = {
     // with data skipping
     val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)

Reply via email to