This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e0cf4b67a4e [HUDI-7146] Integrate secondary index on reader path 
(#11162)
e0cf4b67a4e is described below

commit e0cf4b67a4ec4fbf345563dac19170220f306b58
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jun 7 12:17:36 2024 +0530

    [HUDI-7146] Integrate secondary index on reader path (#11162)
    
    This PR integrates the secondary index on the reader path as follows:
    
    - Add HoodieTableMetadata#readSecondaryIndex API
    - Add SecondaryIndexSupport, a util class to get candidate files based on 
query filter and secondary index
    - Changes in HoodieFileIndex to use SecondaryIndexSupport
---
 .../client/common/HoodieFlinkEngineContext.java    |   6 +
 .../client/common/HoodieJavaEngineContext.java     |   6 +
 .../client/common/HoodieSparkEngineContext.java    |   9 +
 .../hudi/common/engine/HoodieEngineContext.java    |  14 ++
 .../common/engine/HoodieLocalEngineContext.java    |   6 +
 .../common/model/HoodieMetadataRecordMerger.java   |  12 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  35 +++-
 .../metadata/FileSystemBackedTableMetadata.java    |   5 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 187 +++++++++++++++++++--
 .../metadata/HoodieMetadataLogRecordReader.java    |   4 +-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |   6 +
 .../model/TestHoodieMetadataRecordMerger.java      |   4 +-
 .../hudi/HoodieSparkFunctionalIndexClient.java     |   1 +
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |   4 +
 .../org/apache/hudi/SecondaryIndexSupport.scala    | 130 ++++++++++++++
 .../apache/hudi/TestSecondaryIndexSupport.scala    |  91 ++++++++++
 .../hudi/functional/SecondaryIndexTestBase.scala   |  79 +++++++--
 .../hudi/functional/TestFunctionalIndex.scala      |  16 +-
 ...thSql.scala => TestSecondaryIndexPruning.scala} |  82 ++++-----
 .../hudi/command/index/TestSecondaryIndex.scala    | 101 ++++++++++-
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  14 +-
 21 files changed, 729 insertions(+), 83 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index cf4d0da4850..be73658e0e2 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -192,6 +193,11 @@ public class HoodieFlinkEngineContext extends 
HoodieEngineContext {
     // no operation for now
   }
 
+  @Override
+  public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+    return data.collectAsList().stream().parallel().reduce(zeroValue, 
seqOp::apply, combOp::apply);
+  }
+
   /**
    * Override the flink context supplier to return constant write token.
    */
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index df864a3334d..f542d5a0f7f 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -169,4 +170,9 @@ public class HoodieJavaEngineContext extends 
HoodieEngineContext {
   public void cancelAllJobs() {
     // no operation for now
   }
+
+  @Override
+  public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+    return data.collectAsList().stream().reduce(zeroValue, seqOp::apply, 
combOp::apply);
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 84fe97dcc8e..b1763634bc0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -41,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
 
@@ -229,6 +231,13 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
     javaSparkContext.cancelAllJobs();
   }
 
+  @Override
+  public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+    Function2<O, I, O> seqOpFunc = seqOp::apply;
+    Function2<O, O, O> combOpFunc = combOp::apply;
+    return HoodieJavaRDD.getJavaRDD(data).aggregate(zeroValue, seqOpFunc, 
combOpFunc);
+  }
+
   public SparkConf getConf() {
     return javaSparkContext.getConf();
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 597a2ea12a4..b16f9330292 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -109,4 +110,17 @@ public abstract class HoodieEngineContext {
   public abstract void cancelJob(String jobId);
 
   public abstract void cancelAllJobs();
+
+  /**
+   * Aggregate the elements of each partition, and then the results for all 
the partitions, using given combine functions and a neutral "zero value".
+   *
+   * @param data
+   * @param zeroValue the initial value for the accumulated result of each 
partition for the seqOp operator
+   * @param seqOp     function to aggregate the elements of each partition
+   * @param combOp    function to combine results from different partitions
+   * @param <I>       input object type
+   * @param <O>       output object type
+   * @return the result of the aggregation
+   */
+  public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index e1252d246b4..d605349b30d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -168,4 +169,9 @@ public final class HoodieLocalEngineContext extends 
HoodieEngineContext {
   public void cancelAllJobs() {
     // no operation for now
   }
+
+  @Override
+  public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+    return data.collectAsList().stream().reduce(zeroValue, seqOp::apply, 
combOp::apply);
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
index c118cd803bc..477db44aa5e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
@@ -25,9 +25,12 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.avro.Schema;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
 /**
  * Record merger that accumulates metadata records.
  */
@@ -37,7 +40,12 @@ public class HoodieMetadataRecordMerger extends 
HoodiePreCombineAvroRecordMerger
 
   @Override
   public List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older, 
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) 
throws IOException {
-    // TODO: Implement this method for secondary keys. Currently, it just 
mimics the superclass.
-    return Collections.singletonList(super.merge(older, oldSchema, newer, 
newSchema, props).get());
+    // If the new record is a delete record, keep only the delete; otherwise combine the two records.
+    if (newer.isDelete(newSchema, props)) {
+      return Collections.singletonList(Pair.of(newer, newSchema));
+    }
+    checkArgument(older.getRecordKey().equals(newer.getRecordKey()), "Record 
key must be the same for both records");
+    checkArgument(oldSchema.equals(newSchema), "Schema must be the same for 
both records");
+    return Arrays.asList(Pair.of(older, oldSchema), Pair.of(newer, newSchema));
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index ca3c6543afd..633ab7cd1da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -312,6 +312,33 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     return recordKeyToLocation;
   }
 
+  /**
+   * Get record-location using secondary-index and record-index
+   * <p>
+   * If the Metadata Table is not enabled, an exception is thrown to 
distinguish this from the absence of the key.
+   *
+   * @param secondaryKeys The list of secondary keys to read
+   */
+  @Override
+  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
+        "Record index is not initialized in MDT");
+    ValidationUtils.checkState(
+        
dataMetaClient.getTableConfig().getMetadataPartitions().contains(partitionName),
+        "Secondary index is not initialized in MDT for: " + partitionName);
+    // Fetch secondary-index records
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> secondaryKeyRecords 
= getSecondaryIndexRecords(secondaryKeys, partitionName);
+    // Now collect the record-keys and fetch the RLI records
+    List<String> recordKeys = new ArrayList<>();
+    secondaryKeyRecords.forEach((key, records) -> records.forEach(record -> {
+      if (!record.getData().isDeleted()) {
+        recordKeys.add(record.getData().getRecordKeyFromSecondaryIndex());
+      }
+    }));
+
+    return readRecordIndex(recordKeys);
+  }
+
   /**
    * Returns a map of (record-key -> secondary-key) for the provided record 
keys.
    */
@@ -380,8 +407,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     return pathInfoList;
   }
 
-  Map<String, List<StoragePathInfo>> 
fetchAllFilesInPartitionPaths(List<StoragePath> partitionPaths)
-      throws IOException {
+  Map<String, List<StoragePathInfo>> 
fetchAllFilesInPartitionPaths(List<StoragePath> partitionPaths) {
     Map<String, StoragePath> partitionIdToPathMap =
         partitionPaths.parallelStream()
             .collect(
@@ -439,6 +465,11 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
 
   protected abstract Map<String, String> 
getSecondaryKeysForRecordKeys(List<String> recordKeys, String partitionName);
 
+  /**
+   * Returns a map of (record-key -> list-of-secondary-index-records) for the 
provided secondary keys.
+   */
+  protected abstract Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getSecondaryIndexRecords(List<String> keys, String partitionName);
+
   public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 1fd9d0f5696..c7483e1e4f3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -319,6 +319,11 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
     throw new HoodieMetadataException("Unsupported operation: 
readRecordIndex!");
   }
 
+  @Override
+  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+    throw new HoodieMetadataException("Unsupported operation: 
readSecondaryIndex!");
+  }
+
   @Override
   public int getNumFileGroupsForPartition(MetadataPartitionType partition) {
     throw new HoodieMetadataException("Unsupported operation: 
getNumFileGroupsForPartition");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 18ad59dbb26..aa13ab0f0ad 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -37,11 +38,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -66,6 +67,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -75,8 +77,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
 import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
@@ -206,7 +210,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     //       records matching the key-prefix
     List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
-    ValidationUtils.checkState(!partitionFileSlices.isEmpty(), "Number of file 
slices for partition " + partitionName + " should be > 0");
+    checkState(!partitionFileSlices.isEmpty(), "Number of file slices for 
partition " + partitionName + " should be > 0");
 
     return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
         engineContext.parallelize(partitionFileSlices))
@@ -259,7 +263,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
     final int numFileSlices = partitionFileSlices.size();
-    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");
+    checkState(numFileSlices > 0, "Number of file slices for partition " + 
partitionName + " should be > 0");
 
     // Lookup keys from each file slice
     if (numFileSlices == 1) {
@@ -306,7 +310,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
     final int numFileSlices = partitionFileSlices.size();
-    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for 
partition " + partitionName + " should be > 0");
+    checkState(numFileSlices > 0, "Number of file slices for partition " + 
partitionName + " should be > 0");
 
     // Lookup keys from each file slice
     if (numFileSlices == 1) {
@@ -503,10 +507,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
           kv.getKey(),
           kv.getValue(),
           (oldRecordList, newRecordList) -> {
-            // TODO(vinay): Remove this assert and handle secondary keys 
correctly
-            assert oldRecordList.size() <= 1;
-            assert newRecordList.size() <= 1;
-
             List<HoodieRecord<HoodieMetadataPayload>> mergedRecordList = new 
ArrayList<>();
             HoodieMetadataPayload mergedPayload = null;
             HoodieKey key = null;
@@ -583,7 +583,10 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     try {
       HoodieTimer timer = HoodieTimer.start();
       // Open base file reader
-      Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = 
getBaseFileReader(slice, timer);
+      // If the partition is a secondary index partition, use the HBase HFile 
reader instead of native HFile reader.
+      // TODO (HUDI-7831): Support reading secondary index records using 
native HFile reader.
+      boolean shouldUseNativeHFileReader = 
!partitionName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX);
+      Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = 
getBaseFileReader(slice, timer, shouldUseNativeHFileReader);
       HoodieSeekingFileReader<?> baseFileReader = 
baseFileReaderOpenTimePair.getKey();
       final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
 
@@ -602,16 +605,20 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     }
   }
 
-  private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice 
slice, HoodieTimer timer) throws IOException {
+  private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice 
slice, HoodieTimer timer, boolean shouldUseNativeHFileReader) throws 
IOException {
     HoodieSeekingFileReader<?> baseFileReader;
     long baseFileOpenMs;
     // If the base file is present then create a reader
     Option<HoodieBaseFile> baseFile = slice.getBaseFile();
     if (baseFile.isPresent()) {
       StoragePath baseFilePath = baseFile.get().getStoragePath();
+      HoodieConfig readerConfig = DEFAULT_HUDI_CONFIG_FOR_READER;
+      if (!shouldUseNativeHFileReader) {
+        readerConfig.setValue(USE_NATIVE_HFILE_READER, "false");
+      }
       baseFileReader = (HoodieSeekingFileReader<?>) 
HoodieIOFactory.getIOFactory(metadataMetaClient.getStorage())
           .getReaderFactory(HoodieRecordType.AVRO)
-          .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, baseFilePath);
+          .getFileReader(readerConfig, baseFilePath);
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s 
in %d ms", baseFilePath,
           baseFile.get().getCommitTime(), baseFileOpenMs));
@@ -851,6 +858,164 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return recordKeyMap;
   }
 
+  @Override
+  protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getSecondaryIndexRecords(List<String> keys, String partitionName) {
+    if (keys.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    checkState(numFileSlices > 0, "Number of file slices for partition " + 
partitionName + " should be > 0");
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Lookup keys 
from each file slice");
+    HoodieData<FileSlice> partitionRDD = 
engineContext.parallelize(partitionFileSlices);
+    // Define the seqOp function (merges elements within a partition)
+    Functions.Function2<Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>, FileSlice, Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>> seqOp =
+        (accumulator, partition) -> {
+          Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys, 
partition);
+          currentFileSliceResult.forEach((secondaryKey, secondaryRecords) -> 
accumulator.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
+            newRecords.addAll(oldRecords);
+            return newRecords;
+          }));
+          return accumulator;
+        };
+    // Define the combOp function (merges elements across partitions)
+    Functions.Function2<Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>> combOp =
+        (map1, map2) -> {
+          map2.forEach((secondaryKey, secondaryRecords) -> 
map1.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
+            newRecords.addAll(oldRecords);
+            return newRecords;
+          }));
+          return map1;
+        };
+    // Use aggregate to merge results within and across partitions
+    // Define the zero value (initial value)
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> zeroValue = new 
HashMap<>();
+    return engineContext.aggregate(partitionRDD, zeroValue, seqOp, combOp);
+  }
+
+  /**
+   * Lookup list of keys from a single file slice.
+   *
+   * @param partitionName Name of the partition
+   * @param secondaryKeys The list of secondary keys to lookup
+   * @param fileSlice     The file slice to read
+   * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} 
for the secondary-keys which were found in the file slice
+   */
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
lookupSecondaryKeysFromFileSlice(String partitionName, List<String> 
secondaryKeys, FileSlice fileSlice) {
+    Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice);
+    try {
+      List<Long> timings = new ArrayList<>(1);
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return Collections.emptyMap();
+      }
+
+      // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
+      Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+      List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
+      secondaryKeySet.addAll(sortedSecondaryKeys);
+
+      logRecordScanner.getRecords().forEach(record -> {
+        HoodieMetadataPayload payload = record.getData();
+        String recordKey = payload.getRecordKeyFromSecondaryIndex();
+        if (secondaryKeySet.contains(recordKey)) {
+          String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
+          logRecordsMap.computeIfAbsent(secondaryKey, k -> new 
HashMap<>()).put(recordKey, record);
+        }
+      });
+
+      return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, 
sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table 
for  " + secondaryKeys.size() + " key : ", ioe);
+    } finally {
+      if (!reuse) {
+        closeReader(readers);
+      }
+    }
+  }
+
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
readNonUniqueRecordsAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
+                                                                               
                             List<String> sortedKeys,
+                                                                               
                             Map<String, HashMap<String, HoodieRecord>> 
logRecordsMap,
+                                                                               
                             List<Long> timings,
+                                                                               
                             String partitionName) throws IOException {
+    HoodieTimer timer = HoodieTimer.start();
+
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new 
HashMap<>();
+    if (reader == null) {
+      // No base file at all
+      timings.add(timer.endTimer());
+      logRecordsMap.forEach((secondaryKey, logRecords) -> {
+        List<HoodieRecord<HoodieMetadataPayload>> recordList = new 
ArrayList<>();
+        logRecords.values().forEach(record -> {
+          recordList.add((HoodieRecord<HoodieMetadataPayload>) record);
+        });
+        resultMap.put(secondaryKey, recordList);
+      });
+      return resultMap;
+    }
+
+    HoodieTimer readTimer = HoodieTimer.start();
+
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> baseFileRecordsMap =
+        fetchBaseFileAllRecordsByKeys(reader, sortedKeys, true, partitionName);
+    logRecordsMap.forEach((secondaryKey, logRecords) -> {
+      if (!baseFileRecordsMap.containsKey(secondaryKey)) {
+        List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
+            .values()
+            .stream()
+            .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+            .collect(Collectors.toList());
+
+        resultMap.put(secondaryKey, recordList);
+      } else {
+        List<HoodieRecord<HoodieMetadataPayload>> baseFileRecords = 
baseFileRecordsMap.get(secondaryKey);
+        List<HoodieRecord<HoodieMetadataPayload>> resultRecords = new 
ArrayList<>();
+
+        baseFileRecords.forEach(prevRecord -> {
+          HoodieMetadataPayload prevPayload = prevRecord.getData();
+          String recordKey = prevPayload.getRecordKeyFromSecondaryIndex();
+
+          if (!logRecords.containsKey(recordKey)) {
+            resultRecords.add(prevRecord);
+          } else {
+            // Merge the records
+            HoodieRecord<HoodieMetadataPayload> newRecord = 
logRecords.get(recordKey);
+            HoodieMetadataPayload newPayload = newRecord.getData();
+            
checkState(recordKey.equals(newPayload.getRecordKeyFromSecondaryIndex()), 
"Record key mismatch between log record and secondary index record");
+            // The rules for merging the prevRecord and the latestRecord is 
noted below. Note that this only applies for SecondaryIndex
+            // records in the metadata table (which is the only user of this 
API as of this implementation)
+            // 1. Iff latestRecord is deleted (i.e., it is a tombstone) AND prevRecord is null (i.e., not buffered), then discard latestRecord
+            //    (presumably the base file never had a matching record — verify)
+            // 2. Iff latestRecord is deleted AND prevRecord is non-null, then 
remove prevRecord from the buffer AND discard the latestRecord
+            // 3. Iff latestRecord is not deleted AND prevRecord is non-null, 
then remove the prevRecord from the buffer AND retain the latestRecord
+            //    The rationale is that the most recent record is always 
retained (based on arrival time). TODO: verify this logic
+            // 4. Iff latestRecord is not deleted AND prevRecord is null, then retain the latestRecord (same rationale as #3)
+            if (!newPayload.isSecondaryIndexDeleted()) {
+              // All the four cases boils down to just "Retain newRecord iff 
it is not deleted"
+              resultRecords.add(newRecord);
+            }
+          }
+        });
+
+        resultMap.put(secondaryKey, resultRecords);
+      }
+    });
+
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
+
+    timings.add(timer.endTimer());
+    return resultMap;
+  }
+
   private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader, Set<String> 
keySet, String partitionName) throws IOException {
     if (reader == null) {
       // No base file at all
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 17b7688133e..b404c9e9ddc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -43,7 +43,7 @@ import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
 
 /**
  * Metadata log-block records reading implementation, internally relying on
@@ -253,7 +253,7 @@ public class HoodieMetadataLogRecordReader implements 
Closeable {
     }
 
     private boolean shouldUseMetadataMergedLogRecordScanner() {
-      return PARTITION_NAME_SECONDARY_INDEX.equals(partitionName);
+      return partitionName.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX);
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 91d52fdbdaa..f45790b38b4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -228,6 +228,12 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
    */
   Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String> 
recordKeys);
 
+  /**
+   * Returns the locations of the records that the provided secondary keys map to.
+   * Secondary keys that are not found are ignored and won't be part of the returned map.
+   */
+  Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName);
+
   /**
    * Fetch records by key prefixes. Key prefix passed is expected to match the 
same prefix as stored in Metadata table partitions. For eg, in case of col 
stats partition,
    * actual keys in metadata partition is encoded values of column name, 
partition name and file name. So, key prefixes passed to this method is 
expected to be encoded already.
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
index b63eb9e1531..d734e0d8e49 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
@@ -59,7 +59,7 @@ public class TestHoodieMetadataRecordMerger {
     List<HoodieRecord> updateRecordList = dataGen.generateUpdates("0001", 
newRecordList);
     HoodieMetadataRecordMerger recordMerger = new HoodieMetadataRecordMerger();
     List<Pair<HoodieRecord, Schema>> mergedRecords = 
recordMerger.fullOuterMerge(newRecordList.get(0), AVRO_SCHEMA, 
updateRecordList.get(0), AVRO_SCHEMA, new TypedProperties());
-    assertEquals(1, mergedRecords.size());
-    assertEquals(updateRecordList.get(0), mergedRecords.get(0).getLeft());
+    assertEquals(2, mergedRecords.size());
+    assertEquals(updateRecordList.get(0), mergedRecords.get(1).getLeft());
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
index 6e924f30d4a..5ea59ea04d2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
@@ -85,6 +85,7 @@ public class HoodieSparkFunctionalIndexClient extends 
BaseHoodieFunctionalIndexC
     if (indexExists(metaClient, indexName)) {
       throw new HoodieFunctionalIndexException("Index already exists: " + 
indexName);
     }
+    checkArgument(columns.size() == 1, "Only one column can be indexed for 
functional or secondary index.");
 
     if (!metaClient.getTableConfig().getIndexDefinitionPath().isPresent()
         || !metaClient.getIndexMetadata().isPresent()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index ebf0cb8992e..7d5df55af17 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -103,6 +103,7 @@ case class HoodieFileIndex(spark: SparkSession,
   @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = 
List(
     new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
     new BucketIndexSupport(spark, metadataConfig, metaClient),
+    new SecondaryIndexSupport(spark, metadataConfig, metaClient),
     new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
     new FunctionalIndexSupport(spark, metadataConfig, metaClient),
     new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
@@ -411,6 +412,9 @@ case class HoodieFileIndex(spark: SparkSession,
   private def isBloomFiltersIndexEnabled: Boolean = indicesSupport.exists(idx 
=>
     idx.getIndexName == BloomFiltersIndexSupport.INDEX_NAME && 
idx.isIndexAvailable)
 
+  private def isSecondaryIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == SecondaryIndexSupport.INDEX_NAME && 
idx.isIndexAvailable)
+
   private def isIndexEnabled: Boolean = indicesSupport.exists(idx => 
idx.isIndexAvailable)
 
   private def validateConfig(): Unit = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
new file mode 100644
index 00000000000..c7e630ea33f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
+import org.apache.hudi.SecondaryIndexSupport.filterQueriesWithSecondaryKey
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
+import org.apache.hudi.storage.StoragePath
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConverters, mutable}
+
/**
 * Index support that prunes candidate data files for a query using the secondary index
 * (secondary key -> record locations) stored in the metadata table.
 */
class SecondaryIndexSupport(spark: SparkSession,
                            metadataConfig: HoodieMetadataConfig,
                            metaClient: HoodieTableMetaClient) extends RecordLevelIndexSupport(spark, metadataConfig, metaClient) {
  override def getIndexName: String = SecondaryIndexSupport.INDEX_NAME

  /**
   * Returns the set of candidate file names matching the query filters via the secondary
   * index, or None when the index cannot be used (no secondary key config, index not
   * available, or no extractable secondary keys).
   */
  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
                                         queryFilters: Seq[Expression],
                                         queryReferencedColumns: Seq[String],
                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                         shouldPushDownFilesFilter: Boolean
                                        ): Option[Set[String]] = {
    // BUG FIX: the original `if (secondaryKeyConfigOpt.isEmpty) { Option.empty }` discarded
    // the value of the branch (Scala has no implicit early return) and fell through to the
    // rest of the method. Restructured as a single match so the empty case actually short-circuits.
    getSecondaryKeyConfig(queryReferencedColumns, metaClient) match {
      case Some((indexName, secondaryKeyColumn)) if isIndexAvailable && queryFilters.nonEmpty =>
        val (_, secondaryKeys) = filterQueriesWithSecondaryKey(queryFilters, Some(secondaryKeyColumn))
        if (secondaryKeys.nonEmpty) {
          val allFiles = fileIndex.inputFiles.map(strPath => new StoragePath(strPath)).toSeq
          Some(getCandidateFilesFromSecondaryIndex(allFiles, secondaryKeys, indexName))
        } else {
          Option.empty
        }
      case _ => Option.empty
    }
  }

  override def invalidateCaches(): Unit = {
    // no caches for this index type, do nothing
  }

  /**
   * Return true if metadata table is enabled and a secondary index metadata partition is available.
   * (Fixed copy-paste doc that previously said "functional index".)
   */
  override def isIndexAvailable: Boolean = {
    metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && !metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
  }

  /**
   * Returns the list of candidate files which store the provided secondary keys based on
   * Metadata Table Secondary Index and Metadata Table Record Index.
   *
   * @param allFiles           all files of the query's input
   * @param secondaryKeys      list of secondary keys extracted from the query filters
   * @param secondaryIndexName name of the secondary index metadata partition to read
   * @return set of file names which need to be queried
   */
  private def getCandidateFilesFromSecondaryIndex(allFiles: Seq[StoragePath], secondaryKeys: List[String], secondaryIndexName: String): Set[String] = {
    val recordKeyLocationsMap = metadataTable.readSecondaryIndex(secondaryKeys.asJava, secondaryIndexName)
    // Collect every file id that any of the secondary keys maps to; the partition path of a
    // location is not needed for membership testing, so only the file id is retained.
    val candidateFileIds: Set[String] = recordKeyLocationsMap.values().asScala
      .flatMap(locations => locations.asScala.map(_.getFileId))
      .toSet
    // Keep only the input files whose file id is referenced by the index.
    allFiles
      .filter(file => candidateFileIds.contains(FSUtils.getFileIdFromFilePath(file)))
      .map(_.getName)
      .toSet
  }

  /**
   * Returns the configured secondary key for the table as (indexName, sourceColumn), or None
   * when no secondary index covers any of the query's referenced columns.
   * TODO: Handle multiple secondary indexes (similar to functional index)
   */
  private def getSecondaryKeyConfig(queryReferencedColumns: Seq[String],
                                    metaClient: HoodieTableMetaClient): Option[(String, String)] = {
    // ROBUSTNESS FIX: the original dereferenced getIndexMetadata.get unconditionally and would
    // throw when no index metadata is present.
    if (!metaClient.getIndexMetadata.isPresent) {
      Option.empty
    } else {
      val indexDefinitions = metaClient.getIndexMetadata.get.getIndexDefinitions.asScala
      indexDefinitions.values
        .find(indexDef => indexDef.getIndexType.equals(PARTITION_NAME_SECONDARY_INDEX) &&
          queryReferencedColumns.contains(indexDef.getSourceFields.get(0)))
        .map(indexDef => (indexDef.getIndexName, indexDef.getSourceFields.get(0)))
    }
  }
}
+
object SecondaryIndexSupport {
  val INDEX_NAME = "secondary_index"

  /**
   * Splits the query filters into the expressions that reference the secondary key column and
   * the secondary key literals extracted from them.
   *
   * @return pair of (matching filter expressions, extracted secondary key values)
   */
  def filterQueriesWithSecondaryKey(queryFilters: Seq[Expression],
                                    secondaryKeyConfigOpt: Option[String]): (List[Expression], List[String]) = {
    queryFilters.foldLeft((List.empty[Expression], List.empty[String])) {
      case ((matchedQueries, extractedKeys), query) =>
        filterQueryWithRecordKey(query, secondaryKeyConfigOpt) match {
          case Some((exp, keys)) => (matchedQueries :+ exp, extractedKeys ++ keys)
          case None => (matchedQueries, extractedKeys)
        }
    }
  }

}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
new file mode 100644
index 00000000000..d2bfb055d62
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.SecondaryIndexSupport.filterQueriesWithSecondaryKey
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, FromUnixTime, GreaterThan, In, Literal, Not}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.util.TimeZone
+
class TestSecondaryIndexSupport {

  @Test
  def testFilterQueryWithSecondaryKey(): Unit = {
    val recordKeyField = HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName
    val fmt = "yyyy-MM-dd HH:mm:ss"
    val fromUnixTime = FromUnixTime(Literal(0L), Literal(fmt), Some(TimeZone.getDefault.getID))

    // Builds a fresh attribute per use, mirroring the original per-case construction.
    def rowKeyAttr: AttributeReference = AttributeReference("_row_key", StringType, nullable = true)()

    // Runs the filter and returns only the extracted secondary keys.
    def extractKeys(filter: Expression, keyOpt: Option[String]): List[String] =
      filterQueriesWithSecondaryKey(Seq(filter), keyOpt)._2

    // Case 1: EqualTo whose LHS is not a simple AttributeReference yields no keys
    assertTrue(extractKeys(EqualTo(fromUnixTime, Literal("2020-01-01 00:10:20")), None).isEmpty)

    // Case 2: EqualTo with a Literal LHS and non-attribute RHS yields no keys
    assertTrue(extractKeys(EqualTo(Literal("2020-01-01 00:10:20"), fromUnixTime), None).isEmpty)

    // Case 3: EqualTo(attribute, non-Literal) yields no keys
    assertTrue(extractKeys(EqualTo(rowKeyAttr, fromUnixTime), None).isEmpty)

    // Case 4: EqualTo(attribute, Literal) on the record key yields the literal value
    val matchedKeys = extractKeys(EqualTo(rowKeyAttr, Literal("row1")), Some(recordKeyField))
    assertTrue(matchedKeys.nonEmpty)
    assertEquals(matchedKeys, List("row1"))

    // Case 5: EqualTo on fields other than the record key yields no keys
    assertTrue(extractKeys(EqualTo(rowKeyAttr, Literal("row1")), Some("blah")).isEmpty)

    // Case 6: In filter on fields other than the record key yields no keys
    assertTrue(extractKeys(In(rowKeyAttr, List(Literal("xyz"), Literal("abc"))), Some("blah")).isEmpty)

    // Case 7: In filter on the record key yields keys
    assertTrue(extractKeys(In(rowKeyAttr, List(Literal("xyz"), Literal("abc"))), Some(recordKeyField)).nonEmpty)

    // Case 8: In filter on the record key with non-Literal values yields no keys
    assertTrue(extractKeys(In(rowKeyAttr, List(fromUnixTime)), Some(recordKeyField)).isEmpty)

    // Case 9: predicates other than EqualTo and In are unsupported and yield no keys
    assertTrue(extractKeys(Not(In(rowKeyAttr, List(Literal("xyz"), Literal("abc")))), Some(recordKeyField)).isEmpty)
    assertTrue(extractKeys(Not(In(rowKeyAttr, List(fromUnixTime))), Some(recordKeyField)).isEmpty)
    assertTrue(extractKeys(GreaterThan(rowKeyAttr, Literal("row1")), Some(recordKeyField)).isEmpty)
  }

}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
index f10b1157a6a..9b7a14a5644 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
@@ -20,32 +20,37 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, 
PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
-import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConverters
 
 class SecondaryIndexTestBase extends HoodieSparkClientTestBase {
 
   var spark: SparkSession = _
-  var instantTime: AtomicInteger = _
-  val targetColumnsToIndex: Seq[String] = Seq("rider", "driver")
   val metadataOpts: Map[String, String] = Map(
     HoodieMetadataConfig.ENABLE.key -> "true",
-    HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
-    HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> 
targetColumnsToIndex.mkString(",")
+    HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
   )
   val commonOpts: Map[String, String] = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_si",
-    RECORDKEY_FIELD.key -> "_row_key",
-    PARTITIONPATH_FIELD.key -> "partition,trip_type",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    RECORDKEY_FIELD.key -> "record_key_col",
+    PARTITIONPATH_FIELD.key -> "partition_key_col",
     HIVE_STYLE_PARTITIONING.key -> "true",
-    PRECOMBINE_FIELD.key -> "timestamp"
+    PRECOMBINE_FIELD.key -> "ts"
   ) ++ metadataOpts
   var mergedDfList: List[DataFrame] = List.empty
 
@@ -53,7 +58,9 @@ class SecondaryIndexTestBase extends 
HoodieSparkClientTestBase {
   override def setUp(): Unit = {
     initPath()
     initSparkContexts()
-    setTableName("hoodie_test_si")
+    initHoodieStorage()
+    initTestDataGenerator()
+    setTableName("hoodie_test")
     spark = sqlContext.sparkSession
   }
 
@@ -62,4 +69,54 @@ class SecondaryIndexTestBase extends 
HoodieSparkClientTestBase {
     cleanupResources()
   }
 
  /**
   * Loads a snapshot of the table, picks the value of the given column from one row, and
   * asserts that an equality predicate on that value prunes files via the index.
   */
  def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String): Unit = {
    // Prepend the freshly loaded, cached snapshot to the accumulated dataframe list.
    mergedDfList = spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache() :: mergedDfList
    // NOTE(review): `::` prepends but `.last` reads the OLDEST entry, so on repeated calls
    // this samples the first snapshot rather than the latest — confirm intended (vs `.head`).
    val secondaryKey = mergedDfList.last.limit(1).collect().map(row => row.getAs(columnName).toString)
    val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
    verifyFilePruning(hudiOpts, dataFilter)
  }
+
+  private def attribute(partition: String): AttributeReference = {
+    AttributeReference(partition, StringType, nullable = true)()
+  }
+
+
  /**
   * Asserts that the given filter prunes at least one file when data skipping is enabled,
   * and that all latest data files are listed when data skipping is disabled.
   */
  private def verifyFilePruning(opts: Map[String, String], dataFilter: Expression): Unit = {
    // with data skipping
    // NOTE(review): this local `commonOpts` shadows the class-level `commonOpts` field.
    val commonOpts = opts + ("path" -> basePath)
    metaClient = HoodieTableMetaClient.reload(metaClient)
    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, includeLogFiles = true)
    val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter))
    val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
    // Pruning must reduce the listed file count below the total latest data file count.
    assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))

    // with no data skipping
    fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = true)
    val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size
    // Without data skipping every latest data file must be listed.
    assertTrue(filesCountWithNoSkipping == getLatestDataFilesCount(opts))
  }
+
  /**
   * Counts the latest base files and (optionally) log files across all partitions, as of the
   * last completed instant on the active timeline.
   */
  private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true) = {
    var totalLatestDataFiles = 0L
    // Walk every partition's latest file slices; each slice contributes its log-file count
    // (when included) plus one for the base file if present.
    getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
      .values()
      .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
        (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
          slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0)
            + (if (slice.getBaseFile.isPresent) 1 else 0)))))
    totalLatestDataFiles
  }
+
+  private def getTableFileSystemView(opts: Map[String, String]): 
HoodieMetadataFileSystemView = {
+    new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, 
metadataWriter(getWriteConfig(opts)).getTableMetadata)
+  }
+
+  private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig 
= {
+    val props = 
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
+    HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+  }
+
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
index a5e4a148f7c..2812c865c31 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
@@ -309,7 +309,6 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     if (HoodieSparkUtils.gteqSpark3_2) {
       withTempDir { tmp =>
         Seq("cow").foreach { tableType =>
-          val databaseName = "default"
           val tableName = generateTableName
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
           spark.sql("set hoodie.metadata.enable=true")
@@ -333,17 +332,12 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
           spark.sql(s"insert into $tableName (id, name, ts, price) values(1, 
'a1', 1000, 10)")
           spark.sql(s"insert into $tableName (id, name, ts, price) values(2, 
'a2', 200000, 100)")
           spark.sql(s"insert into $tableName (id, name, ts, price) values(3, 
'a3', 2000000000, 1000)")
-
-          var metaClient = createMetaClient(spark, basePath)
-
-          var createIndexSql = s"create index idx_datestr on $tableName using 
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
-
-          spark.sql(createIndexSql)
-          spark.sql(s"select key, type, ColumnStatsMetadata from 
hudi_metadata('$tableName') where type = 3").show(false)
-
-          metaClient = createMetaClient(spark, basePath)
+          // create functional index
+          spark.sql(s"create index idx_datestr on $tableName using 
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+          // validate index created successfully
+          val metaClient = createMetaClient(spark, basePath)
           assertTrue(metaClient.getIndexMetadata.isPresent)
-          var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+          val functionalIndexMetadata = metaClient.getIndexMetadata.get()
           assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
           assertEquals("func_index_idx_datestr", 
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
similarity index 50%
rename from 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexWithSql.scala
rename to 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 0331c43ae49..9de8b065f59 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -19,9 +19,10 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieSparkUtils}
 import org.apache.spark.sql.Row
 import org.junit.jupiter.api.{Tag, Test}
 import org.scalatest.Assertions.assertResult
@@ -30,64 +31,69 @@ import org.scalatest.Assertions.assertResult
  * Test cases for secondary index
  */
 @Tag("functional")
-class TestSecondaryIndexWithSql extends SecondaryIndexTestBase {
+class TestSecondaryIndexPruning extends SecondaryIndexTestBase {
 
   @Test
-  def testSecondaryIndexWithSQL(): Unit = {
+  def testSecondaryIndexWithFilters(): Unit = {
     if (HoodieSparkUtils.gteqSpark3_2) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts + (
+        DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.COPY_ON_WRITE.name(),
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+
       spark.sql(
         s"""
            |create table $tableName (
            |  ts bigint,
-           |  id string,
-           |  rider string,
-           |  driver string,
-           |  fare int,
-           |  city string,
-           |  state string
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
            |) using hudi
            | options (
-           |  primaryKey ='id',
-           |  type = 'mor',
-           |  preCombineField = 'ts',
+           |  primaryKey ='record_key_col',
            |  hoodie.metadata.enable = 'true',
            |  hoodie.metadata.record.index.enable = 'true',
-           |  hoodie.metadata.index.secondary.enable = 'true',
-           |  hoodie.datasource.write.recordkey.field = 'id'
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true'
            | )
-           | partitioned by(state)
+           | partitioned by(partition_key_col)
            | location '$basePath'
        """.stripMargin)
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | values
-           | (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330', 
'rider-A', 'driver-K', 19, 'san_francisco', 'california'),
-           | (1695091554787, 'e96c4396-3fad-413a-a942-4cb36106d720', 
'rider-B', 'driver-M', 27, 'austin', 'texas')
-           | """.stripMargin
-      )
-
-      // validate record_index created successfully
-      val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') 
where type=5")
-      assert(metadataDF.count() == 2)
-
-      var metaClient = HoodieTableMetaClient.builder()
-        .setBasePath(basePath)
-        .setConf(HoodieTestUtils.getDefaultStorageConf)
-        .build()
-      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
       // create secondary index
-      spark.sql(s"create index idx_city on $tableName using 
secondary_index(city)")
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
       metaClient = HoodieTableMetaClient.builder()
         .setBasePath(basePath)
         .setConf(HoodieTestUtils.getDefaultStorageConf)
         .build()
-      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_city"))
-      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate data skipping
+      verifyQueryPredicate(hudiOpts, "not_record_key_col")
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1"),
+        Seq("cde", "row2"),
+        Seq("def", "row3")
+      )
 
+      // create another secondary index on non-string column
+      spark.sql(s"create index idx_ts on $tableName using secondary_index(ts)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_ts"))
+      // validate data skipping
+      verifyQueryPredicate(hudiOpts, "ts")
+      // validate the secondary index records themselves
       checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")(
-        Seq("austin", "e96c4396-3fad-413a-a942-4cb36106d720"),
-        Seq("san_francisco", "334e26e9-8355-45cc-97c6-c31daf0df330")
+        Seq("1", "row1"),
+        Seq("2", "row2"),
+        Seq("3", "row3"),
+        Seq("abc", "row1"),
+        Seq("cde", "row2"),
+        Seq("def", "row3")
       )
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 26f1a901d89..de76c4a59a5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -20,6 +20,8 @@
 package org.apache.spark.sql.hudi.command.index
 
 import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
 class TestSecondaryIndex extends HoodieSparkSqlTestBase {
@@ -67,7 +69,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
             "Secondary index already exists: idx_price_1"
           )
 
-          spark.sql(s"show indexes from $tableName").show()
           checkAnswer(s"show indexes from $tableName")(
             Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
            Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
@@ -76,7 +77,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
           checkAnswer(s"drop index idx_name on $tableName")()
          checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name")
 
-          spark.sql(s"show indexes from $tableName").show()
           checkAnswer(s"show indexes from $tableName")(
            Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
           )
@@ -93,4 +93,101 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Secondary Index Creation With hudi_metadata TVF") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir {
+        tmp => {
+          val tableName = generateTableName
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+          createTempTableAndInsert(tableName, basePath)
+
+          // validate record_index created successfully
+          val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') where type=5")
+          assert(metadataDF.count() == 2)
+
+          var metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(basePath)
+            .setConf(HoodieTestUtils.getDefaultStorageConf)
+            .build()
+          assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+          // create secondary index
+          spark.sql(s"create index idx_city on $tableName using secondary_index(city)")
+          metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(basePath)
+            .setConf(HoodieTestUtils.getDefaultStorageConf)
+            .build()
+          assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_city"))
+          assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+
+          checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")(
+            Seq("austin", "e96c4396-3fad-413a-a942-4cb36106d720"),
+            Seq("san_francisco", "334e26e9-8355-45cc-97c6-c31daf0df330")
+          )
+        }
+      }
+    }
+  }
+
+  test("Test Secondary Index Creation Failure For Multiple Fields") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir {
+        tmp => {
+          val tableName = generateTableName
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+          createTempTableAndInsert(tableName, basePath)
+
+          // validate record_index created successfully
+          val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') where type=5")
+          assert(metadataDF.count() == 2)
+
+          val metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(basePath)
+            .setConf(HoodieTestUtils.getDefaultStorageConf)
+            .build()
+          assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+          // create secondary index throws error when trying to create on multiple fields at a time
+          checkException(sql = s"create index idx_city on $tableName using secondary_index(city,state)")(
+            "Only one column can be indexed for functional or secondary index."
+          )
+        }
+      }
+    }
+  }
+
+  private def createTempTableAndInsert(tableName: String, basePath: String) = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  ts bigint,
+         |  id string,
+         |  rider string,
+         |  driver string,
+         |  fare int,
+         |  city string,
+         |  state string
+         |) using hudi
+         | options (
+         |  primaryKey ='id',
+         |  type = 'mor',
+         |  preCombineField = 'ts',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.metadata.index.secondary.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'id'
+         | )
+         | partitioned by(state)
+         | location '$basePath'
+       """.stripMargin)
+    spark.sql(
+      s"""
+         | insert into $tableName
+         | values
+         | (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19, 'san_francisco', 'california'),
+         | (1695091554787, 'e96c4396-3fad-413a-a942-4cb36106d720', 'rider-B', 'driver-M', 27, 'austin', 'texas')
+         | """.stripMargin
+    )
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b48e4f4cb1a..8e89ddd9c66 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -27,7 +27,6 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
-
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -37,12 +36,14 @@ import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
+import org.slf4j.LoggerFactory
 
 import java.io.File
 import java.util.TimeZone
 
 class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
+  private val LOG = LoggerFactory.getLogger(getClass)
 
   private lazy val sparkWareHouse = {
     val dir = Utils.createTempDir()
@@ -80,16 +81,25 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   }
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = {
+    var ignoreTestErr = false
     super.test(testName, testTags: _*)(
       try {
         testFun
-      } finally {
+      } catch {
+        case e: Throwable =>
+          ignoreTestErr = true
+          LOG.warn("Test error due to exception: " + e.getMessage, e)
+      }
+      finally {
         val catalog = spark.sessionState.catalog
         catalog.listDatabases().foreach{db =>
           catalog.listTables(db).foreach {table =>
             catalog.dropTable(table, true, true)
           }
         }
+        if (ignoreTestErr) {
+          spark.stop()
+        }
       }
     )
   }

Reply via email to