This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 74308cdbd24 [HUDI-6207] Spark bucket index query pruning  (#10191)
74308cdbd24 is described below

commit 74308cdbd24bcf6c4836249b6abdf4d75ae492b8
Author: KnightChess <[email protected]>
AuthorDate: Mon May 20 16:01:14 2024 +0800

    [HUDI-6207] Spark bucket index query pruning  (#10191)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |   5 +
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  12 +
 .../exception/InvalidHoodieFileNameException.java  |  27 ++
 .../scala/org/apache/hudi/BucketIndexSupport.scala | 219 +++++++++++
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |   3 +-
 .../hudi/functional/TestBucketIndexSupport.scala   | 399 +++++++++++++++++++++
 .../spark/sql/hudi/dml/TestDataSkippingQuery.scala |  78 ++++
 7 files changed, 742 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index ffe902f7d4e..c80c5a2de8a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -334,6 +334,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       .withDocumentation("Only applies when #recordIndexUseCaching is set. 
Determine what level of persistence is used to cache input RDDs. "
           + "Refer to org.apache.spark.storage.StorageLevel for different 
values");
 
+  public static final ConfigProperty<Boolean> BUCKET_QUERY_INDEX = 
ConfigProperty
+      .key("hoodie.bucket.index.query.pruning")
+      .defaultValue(true)
+      .withDocumentation("Control if table with bucket index use bucket query 
or not");
+
   /**
    * Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 0829216216c..07230a9a69e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.exception.InvalidHoodieFileNameException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.HoodieStorage;
@@ -300,6 +301,17 @@ public class FSUtils {
     return matcher.group(3);
   }
 
+  public static String getFileIdFromFileName(String fileName) {
+    if (FSUtils.isLogFile(fileName)) {
+      Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+      if (!matcher.find()) {
+        throw new InvalidHoodieFileNameException(fileName, "LogFile");
+      }
+      return matcher.group(1);
+    }
+    return FSUtils.getFileId(fileName);
+  }
+
   public static String getFileIdFromLogPath(StoragePath path) {
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.find()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/InvalidHoodieFileNameException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidHoodieFileNameException.java
new file mode 100644
index 00000000000..14c5d51dcdc
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidHoodieFileNameException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class InvalidHoodieFileNameException extends HoodieException {
+
+  public InvalidHoodieFileNameException(String fileName, String type) {
+    super("Invalid fileName " + fileName + " of type " + type);
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
new file mode 100644
index 00000000000..e1b555efb58
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.HoodieIndex.IndexType
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.keygen.KeyGenerator
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EmptyRow, 
Expression, Literal}
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
+import org.apache.spark.util.collection.BitSet
+import org.slf4j.LoggerFactory
+
+import scala.collection.{JavaConverters, mutable}
+
+class BucketIndexSupport(spark: SparkSession,
+                         metadataConfig: HoodieMetadataConfig,
+                         metaClient: HoodieTableMetaClient)
+  extends SparkBaseIndexSupport (spark, metadataConfig, metaClient){
+
+  private val log = LoggerFactory.getLogger(getClass)
+
+  private val keyGenerator =
+    HoodieSparkKeyGeneratorFactory.createKeyGenerator(metadataConfig.getProps)
+
+  private lazy val avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema(false)
+
+  override def getIndexName: String = "BUCKET"
+
+  /**
+   * Return true if table can use bucket index
+   * - has bucket hash field
+   * - table is bucket index writer
+   * - only support simple bucket engine
+   */
+  def isIndexAvailable: Boolean = {
+    indexBucketHashFieldsOpt.isDefined &&
+      metadataConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE, 
"").equalsIgnoreCase(IndexType.BUCKET.name()) &&
+      
metadataConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.SIMPLE.name())
 &&
+      metadataConfig.getBooleanOrDefault(HoodieIndexConfig.BUCKET_QUERY_INDEX)
+  }
+
+  override def invalidateCaches(): Unit = {
+    // no caches for this index type, do nothing
+  }
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean): 
Option[Set[String]] = {
+
+    val bucketIdsBitMapByFilter = 
filterQueriesWithBucketHashField(queryFilters)
+
+    if (bucketIdsBitMapByFilter.isDefined && 
bucketIdsBitMapByFilter.get.cardinality() > 0) {
+      val allFilesName = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      Option.apply(getCandidateFiles(allFilesName, 
bucketIdsBitMapByFilter.get))
+    } else {
+      Option.empty
+    }
+  }
+
+  def getCandidateFiles(allFilesName: Set[String], bucketIds: BitSet): 
Set[String] = {
+    val candidateFiles: mutable.Set[String] = mutable.Set.empty
+    for (fileName <- allFilesName) {
+      val fileId = FSUtils.getFileIdFromFileName(fileName)
+      val fileBucketId = BucketIdentifier.bucketIdFromFileId(fileId)
+      if (bucketIds.get(fileBucketId)) {
+        candidateFiles += fileName
+      }
+    }
+    candidateFiles.toSet
+  }
+
+  def filterQueriesWithBucketHashField(queryFilters: Seq[Expression]): 
Option[BitSet] = {
+    val bucketNumber = 
metadataConfig.getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS)
+    if (indexBucketHashFieldsOpt.isEmpty || queryFilters.isEmpty) {
+      None
+    } else {
+      var matchedBuckets: BitSet = null
+      if (indexBucketHashFieldsOpt.get.size == 1) {
+        matchedBuckets = 
getBucketsBySingleHashFields(queryFilters.reduce(And), 
indexBucketHashFieldsOpt.get.get(0), bucketNumber)
+      } else {
+        matchedBuckets = getBucketsByMultipleHashFields(queryFilters,
+          
JavaConverters.asScalaBufferConverter(indexBucketHashFieldsOpt.get).asScala.toSet,
 bucketNumber)
+      }
+
+      val numBucketsSelected = matchedBuckets.cardinality()
+
+      // None means all the buckets need to be scanned
+      if (numBucketsSelected == bucketNumber) {
+        log.info("The query predicates do not include equality expressions for 
all the hashing fields, fall back to the other indices")
+        None
+      } else {
+        Some(matchedBuckets)
+      }
+    }
+  }
+
+  // multiple hash fields only support Equality expression by And
+  private def getBucketsByMultipleHashFields(queryFilters: Seq[Expression], 
indexBucketHashFields: Set[String], numBuckets: Int): BitSet = {
+    val hashValuePairs = queryFilters.map(expr => getEqualityFieldPair(expr, 
indexBucketHashFields)).filter(pair => pair != null)
+    val matchedBuckets = new BitSet(numBuckets)
+    if (hashValuePairs.size != indexBucketHashFields.size) {
+      matchedBuckets.setUntil(numBuckets)
+    } else {
+      val record = new GenericData.Record(avroSchema)
+      hashValuePairs.foreach(p => record.put(p.getKey, p.getValue))
+      val hoodieKey = keyGenerator.getKey(record)
+      matchedBuckets.set(BucketIdentifier.getBucketId(hoodieKey, 
indexBucketHashFieldsOpt.get, numBuckets))
+    }
+    matchedBuckets
+  }
+
+  private def getEqualityFieldPair(expr: Expression, equalityFields: 
Set[String]): Pair[String, Any] = {
+    expr match {
+      case expressions.Equality(a: Attribute, Literal(v, _)) if 
equalityFields.contains(a.name) =>
+        Pair.of(a.name, v)
+      case _ =>
+        null
+    }
+  }
+
+  private def getBucketsBySingleHashFields(expr: Expression, bucketColumnName: 
String, numBuckets: Int): BitSet = {
+
+    def getBucketNumber(attr: Attribute, v: Any): Int = {
+      val record = new GenericData.Record(avroSchema)
+      record.put(attr.name, v)
+      val hoodieKey = keyGenerator.getKey(record)
+      BucketIdentifier.getBucketId(hoodieKey, indexBucketHashFieldsOpt.get, 
numBuckets)
+    }
+
+    def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet 
= {
+      val matchedBuckets = new BitSet(numBuckets)
+      iter
+        .map(v => getBucketNumber(attr, v))
+        .foreach(bucketNum => matchedBuckets.set(bucketNum))
+      matchedBuckets
+    }
+
+    def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
+      val matchedBuckets = new BitSet(numBuckets)
+      matchedBuckets.set(getBucketNumber(attr, v))
+      matchedBuckets
+    }
+
+    expr match {
+      case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == 
bucketColumnName =>
+        getBucketSetFromValue(a, v)
+      case expressions.In(a: Attribute, list)
+        if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName 
=>
+        getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
+      case expressions.InSet(a: Attribute, hset) if a.name == bucketColumnName 
=>
+        getBucketSetFromIterable(a, hset)
+      case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
+        getBucketSetFromValue(a, null)
+      case expressions.IsNaN(a: Attribute)
+        if a.name == bucketColumnName && a.dataType == FloatType =>
+        getBucketSetFromValue(a, Float.NaN)
+      case expressions.IsNaN(a: Attribute)
+        if a.name == bucketColumnName && a.dataType == DoubleType =>
+        getBucketSetFromValue(a, Double.NaN)
+      case expressions.And(left, right) =>
+        getBucketsBySingleHashFields(left, bucketColumnName, numBuckets) &
+          getBucketsBySingleHashFields(right, bucketColumnName, numBuckets)
+      case expressions.Or(left, right) =>
+        getBucketsBySingleHashFields(left, bucketColumnName, numBuckets) |
+          getBucketsBySingleHashFields(right, bucketColumnName, numBuckets)
+      case _ =>
+        val matchedBuckets = new BitSet(numBuckets)
+        matchedBuckets.setUntil(numBuckets)
+        matchedBuckets
+    }
+  }
+
+  /**
+   * returns the configured bucket field for the table,
+   * will fall back to record key fields if the bucket fields are not set up.
+   */
+  private val indexBucketHashFieldsOpt: Option[java.util.List[String]] = {
+    val bucketHashFields = 
metadataConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD,
+      metadataConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS))
+    if (bucketHashFields == null || bucketHashFields.isEmpty) {
+      Option.apply(null)
+    } else {
+      
Option.apply(JavaConverters.seqAsJavaListConverter(bucketHashFields.split(",")).asJava)
+    }
+  }
+
+  def getKeyGenerator: KeyGenerator = {
+    keyGenerator
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index e92e6345e83..c59eb68182a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -102,6 +102,7 @@ case class HoodieFileIndex(spark: SparkSession,
    */
   @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = 
List(
     new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
+    new BucketIndexSupport(spark, metadataConfig, metaClient),
     new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
     new FunctionalIndexSupport(spark, metadataConfig, metaClient),
     new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
@@ -344,7 +345,7 @@ case class HoodieFileIndex(spark: SparkSession,
     //       and candidate files are obtained from these file slices.
 
     lazy val queryReferencedColumns = collectReferencedColumns(spark, 
queryFilters, schema)
-    if (isMetadataTableEnabled && isDataSkippingEnabled) {
+    if (isDataSkippingEnabled) {
       for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
         if (indexSupport.isIndexAvailable) {
           val prunedFileNames = indexSupport.computeCandidateFileNames(this, 
queryFilters, queryReferencedColumns,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
new file mode 100644
index 00000000000..8222030f8c6
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.BucketIndexSupport
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+
+@Tag("functional")
+class TestBucketIndexSupport extends HoodieSparkClientTestBase with 
PredicateHelper {
+
+  var spark: SparkSession = _
+
+  val avroSchemaStr = "{\"namespace\": \"example.avro\", \"type\": \"record\", 
" + "\"name\": \"logicalTypes\"," +
+    "\"fields\": [" +
+    "{\"name\": \"A\", \"type\": [\"null\", \"long\"], \"default\": null}," +
+    "{\"name\": \"B\", \"type\": [\"null\", \"string\"], \"default\": null}," +
+    "{\"name\": \"C\", \"type\": [\"null\", \"long\"], \"default\": null}," +
+    "{\"name\": \"D\", \"type\": [\"null\", \"string\"], \"default\": null}" +
+    "]}"
+
+  val structSchema: StructType =
+    StructType(
+      Seq(
+        StructField("A", LongType),
+        StructField("B", StringType),
+        StructField("C", LongType),
+        StructField("D", VarcharType(32))
+      )
+    )
+
+  val avroSchema = new Schema.Parser().parse(avroSchemaStr)
+
+  @BeforeEach
+  override def setUp() {
+    initPath()
+    initSparkContexts()
+
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach
+  override def tearDown() = {
+    cleanupSparkContexts()
+  }
+
+  @Test
+  def testSingleHashFieldsExpression: Unit = {
+    val bucketNumber = 19
+    val configProperties = new TypedProperties()
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, "A")
+    configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A")
+    configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
"A")
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, 
String.valueOf(bucketNumber))
+    metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(), 
avroSchemaStr)
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(ENABLE.key, true)).build()
+    val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig, 
metaClient)
+    val keyGenerator = bucketIndexSupport.getKeyGenerator
+    assert(keyGenerator.isInstanceOf[NonpartitionedKeyGenerator])
+
+    // init
+    val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties)
+    var record = new GenericData.Record(avroSchema)
+    record.put("A", "1")
+    val bucket1Id4 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "2")
+    val bucket2Id5 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "3")
+    val bucket3Id6 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "4")
+    val bucket4Id7 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "5")
+    val bucket5Id8 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+    assert(bucket1Id4 == 4 && bucket2Id5 == 5 && bucket3Id6 == 6 && bucket4Id7 
== 7 && bucket5Id8 == 8)
+
+    // fileIdStr
+    val token = FSUtils.makeWriteToken(1, 0, 1)
+    val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+
+    val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, 
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+
+    var equalTo = "A = 3"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket3Id6), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+    equalTo = "A = 3 And A = 4 and B = '6'"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.empty, fallback = 
false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.empty, 
allFileNames, fallback = false)
+    equalTo = "A = 5 And B = 'abc'"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName), allFileNames, fallback = false)
+    equalTo = "A = C and A = 1"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket1Id4), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket1Id4FileName), allFileNames, fallback = false)
+    equalTo = "A = 5 Or A = 2"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8, 
bucket2Id5), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames, fallback = 
false)
+    equalTo = "A = 5 Or A = 2 Or A = 8"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8, 
bucket2Id5), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames, fallback = 
false)
+    equalTo = "A = 5 Or (A = 2 and B = 'abc')"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8, 
bucket2Id5), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames, fallback = 
false)
+    equalTo = "A = 5 And (A = 2 Or B = 'abc')"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName), allFileNames, fallback = false)
+    equalTo = "A = 5 And (A = 2 Or B = 'abc')"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName), allFileNames, fallback = false)
+
+    var inExpr = "A in (3)"
+    exprBucketAnswerCheck(bucketIndexSupport, inExpr, List.apply(bucket3Id6), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, inExpr, 
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+    inExpr = "A in (3, 5)"
+    exprBucketAnswerCheck(bucketIndexSupport, inExpr, List.apply(bucket3Id6, 
bucket5Id8), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, inExpr, 
Set.apply(bucket3Id6FileName, bucket5Id8FileName), allFileNames, fallback = 
false)
+
+    var complexExpr = "A = 3 And A in (3)"
+    exprBucketAnswerCheck(bucketIndexSupport, complexExpr, 
List.apply(bucket3Id6), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, complexExpr, 
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+    complexExpr = "A = 3 Or A in (3, 5)"
+    exprBucketAnswerCheck(bucketIndexSupport, complexExpr, 
List.apply(bucket3Id6, bucket5Id8), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, complexExpr, 
Set.apply(bucket3Id6FileName, bucket5Id8FileName), allFileNames, fallback = 
false)
+    complexExpr = "A = 3 Or A in (5, 2)"
+    exprBucketAnswerCheck(bucketIndexSupport, complexExpr, 
List.apply(bucket3Id6, bucket5Id8, bucket2Id5), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, complexExpr, 
Set.apply(bucket3Id6FileName, bucket5Id8FileName, bucket2Id5FileName), 
allFileNames, fallback = false)
+    complexExpr = "A = 3 and C in (3, 5)"
+    exprBucketAnswerCheck(bucketIndexSupport, complexExpr, 
List.apply(bucket3Id6), fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, complexExpr, 
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+
+
+    // fall back other index
+    var fallBack = "B = 'abc'"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    fallBack = "A = 5 Or B = 'abc'"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    fallBack = "A = C"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    // todo optimize this scene
+    fallBack = "A = C and C = 1"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+
+    // in
+    fallBack = "C in (3)"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+
+    // complex
+    fallBack = "A = 3 Or C in (3, 5)"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+  }
+
+  @Test
+  def testMultipleHashFieldsExpress(): Unit = {
+    val bucketNumber = 19
+    val configProperties = new TypedProperties()
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, 
"A,B")
+    configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A,B")
+    configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
"A,B")
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, 
String.valueOf(bucketNumber))
+    metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(), 
avroSchemaStr)
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(ENABLE.key, true)).build()
+    val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig, 
metaClient)
+    val keyGenerator = bucketIndexSupport.getKeyGenerator
+    assert(keyGenerator.isInstanceOf[NonpartitionedKeyGenerator])
+
+    // init
+    val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties)
+    var record = new GenericData.Record(avroSchema)
+    record.put("A", "1")
+    record.put("B", "2")
+    val bucket1Id4 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "2")
+    record.put("B", "3")
+    val bucket2Id5 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "3")
+    record.put("B", "4")
+    val bucket3Id6 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "4")
+    record.put("B", "5")
+    val bucket4Id7 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "5")
+    record.put("B", "6")
+    val bucket5Id8 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    assert(bucket1Id4 == 3 && bucket2Id5 == 16 && bucket3Id6 == 10 && 
bucket4Id7 == 4 && bucket5Id8 == 17)
+
+    // fileIdStr
+    val token = FSUtils.makeWriteToken(1, 0, 1)
+    val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+    val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, 
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+
+    var equalTo = "A = 2 and B = '3'"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket2Id5), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket2Id5FileName), allFileNames, fallback = false)
+
+    equalTo = "A = 4 and B = '5'"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket4Id7), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket4Id7FileName), allFileNames, fallback = false)
+
+    // fall back other index
+    var fallBack = "B = '5'"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    fallBack = "A = 3"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    fallBack = "A = 3 or B = '4'"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+  }
+
+
+  def testMultipleHashFieldsExpressionWithComplexKey(): Unit = {
+    val bucketNumber = 19
+    val configProperties = new TypedProperties()
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, 
"A,B")
+    configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A,B")
+    configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
"A,B")
+    
configProperties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, 
"C")
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, 
String.valueOf(bucketNumber))
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(ENABLE.key, true)).build()
+    val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig, 
metaClient)
+    val keyGenerator = bucketIndexSupport.getKeyGenerator
+    assert(keyGenerator.isInstanceOf[ComplexKeyGenerator])
+
+    // init
+    val testKeyGenerator = new ComplexKeyGenerator(configProperties)
+    var record = new GenericData.Record(avroSchema)
+    record.put("A", "1")
+    record.put("B", "2")
+    val bucket1Id4 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "2")
+    record.put("B", "3")
+    val bucket2Id5 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "3")
+    record.put("B", "4")
+    val bucket3Id6 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "4")
+    record.put("B", "5")
+    val bucket4Id7 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "5")
+    record.put("B", "6")
+    val bucket5Id8 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B", 
bucketNumber)
+    assert(bucket1Id4 == 3 && bucket2Id5 == 16 && bucket3Id6 == 10 && 
bucket4Id7 == 4 && bucket5Id8 == 17)
+
+    // fileIdStr
+    val token = FSUtils.makeWriteToken(1, 0, 1)
+    val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + 
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+    val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, 
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+
+    var equalTo = "A = 2 and B = '3'"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket2Id5), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket2Id5FileName), allFileNames, fallback = false)
+
+    equalTo = "A = 4 and B = '5'"
+    exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket4Id7), 
fallback = false)
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket4Id7FileName), allFileNames, fallback = false)
+
+    // fall back other index
+    var fallBack = "B = '5'"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    fallBack = "A = 3"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+    fallBack = "A = 3 or B = '4'"
+    exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = 
true)
+  }
+
+  def exprBucketAnswerCheck(bucketIndexSupport: BucketIndexSupport, exprRaw: 
String, expectResult: List[Int], fallback: Boolean): Unit = {
+    val resolveExpr = HoodieCatalystExpressionUtils.resolveExpr(spark, 
exprRaw, structSchema)
+    val optimizerPlan = 
spark.sessionState.optimizer.execute(DummyExpressionHolder(Seq(resolveExpr)))
+    val optimizerExpr = 
optimizerPlan.asInstanceOf[DummyExpressionHolder].exprs.head
+
+    val bucketSet = 
bucketIndexSupport.filterQueriesWithBucketHashField(splitConjunctivePredicates(optimizerExpr))
+    if (fallback) {
+      // will match all file, set is None, fallback other index
+      assert(bucketSet.isEmpty)
+    }
+    if (expectResult.nonEmpty) {
+      assert(bucketSet.isDefined)
+      // can not check size, because Or can contain other file
+      //      assert(bucketSet.get.cardinality() == expectResult.size)
+      expectResult.foreach(expectId => assert(bucketSet.get.get(expectId)))
+    } else {
+      assert(bucketSet.isEmpty || bucketSet.get.cardinality() == 0)
+    }
+  }
+
+  def exprFilePathAnswerCheck(bucketIndexSupport: BucketIndexSupport, exprRaw: 
String, expectResult: Set[String],
+                              allFileStatus: Set[String], fallback: Boolean): 
Unit = {
+    val resolveExpr = HoodieCatalystExpressionUtils.resolveExpr(spark, 
exprRaw, structSchema)
+    val optimizerPlan = 
spark.sessionState.optimizer.execute(DummyExpressionHolder(Seq(resolveExpr)))
+    val optimizerExpr = 
optimizerPlan.asInstanceOf[DummyExpressionHolder].exprs.head
+
+    val bucketSet = 
bucketIndexSupport.filterQueriesWithBucketHashField(splitConjunctivePredicates(optimizerExpr))
+    if (fallback) {
+      // will match all file, set is None, fallback other index
+      assert(bucketSet.isEmpty)
+    }
+    if (expectResult.nonEmpty) {
+      assert(bucketSet.isDefined)
+      // can not check size, because Or can contain other file
+      //      assert(bucketSet.get.cardinality() == expectResult.size)
+      val candidateFiles = bucketIndexSupport.getCandidateFiles(allFileStatus, 
bucketSet.get)
+      assert(expectResult == candidateFiles)
+    } else {
+      assert(bucketSet.isEmpty || bucketSet.get.cardinality() == 0)
+    }
+  }
+
+  @Test
+  def testBucketQueryIsAvaliable(): Unit = {
+    val configProperties = new TypedProperties()
+    configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key(), "A")
+    configProperties.setProperty(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET")
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(ENABLE.key, false)).build()
+    val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig, 
metaClient)
+
+    assert(bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS.key(), "A,B")
+    assert(bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), 
"A")
+    assert(bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.INDEX_TYPE.key(), "SIMPLE")
+    assert(!bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET")
+    assert(bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE, 
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())
+    assert(!bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE, 
HoodieIndex.BucketIndexEngineType.SIMPLE.name())
+    assert(bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.BUCKET_QUERY_INDEX, "false")
+    assert(!bucketIndexSupport.isIndexAvailable)
+    metadataConfig.setValue(HoodieIndexConfig.BUCKET_QUERY_INDEX, "true")
+    assert(bucketIndexSupport.isIndexAvailable)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
index 23255b763ff..9ae8e3a2cdd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
@@ -113,4 +113,82 @@ class TestDataSkippingQuery extends HoodieSparkSqlTestBase 
{
       )
     }
   }
+
  test("bucket index query") {
    // Bucket-index table properties are not yet resolved from the table config at query
    // time, so the same settings must be supplied via the session conf for pruning to apply.
    withSQLConf("hoodie.enable.data.skipping" -> "true",
      "hoodie.bucket.index.hash.field" -> "id",
      "hoodie.bucket.index.num.buckets" -> "20",
      "hoodie.index.type" -> "BUCKET") {
      withTempDir { tmp =>
        val tableName = generateTableName
        // Create a partitioned table backed by a simple bucket index on `id`.
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  dt string,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | tblproperties (
             | primaryKey = 'id,name',
             | preCombineField = 'ts',
             | hoodie.index.type = 'BUCKET',
             | hoodie.bucket.index.hash.field = 'id',
             | hoodie.bucket.index.num.buckets = '20')
             | partitioned by (dt)
             | location '${tmp.getCanonicalPath}'
       """.stripMargin)

        // Values follow table column order with the partition column (dt) last.
        spark.sql(
          s"""
             | insert into $tableName values
             | (1, 'a1', 10, 1000, "2021-01-05"),
             | (2, 'a2', 20, 2000, "2021-01-06"),
             | (3, 'a3', 30, 3000, "2021-01-07")
              """.stripMargin)

        // With pruning enabled: equality / OR / IN predicates on the hash field (`id`)
        // must still return complete, correct results.
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 1")(
          Seq(1, "a1", 10.0, 1000, "2021-01-05")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 1 and name = 'a1'")(
          Seq(1, "a1", 10.0, 1000, "2021-01-05")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2 or id = 5")(
          Seq(2, "a2", 20.0, 2000, "2021-01-06")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id in (2, 3)")(
          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
          Seq(3, "a3", 30.0, 3000, "2021-01-07")
        )
        // NOTE(review): presumably `!=` is not a prunable predicate, so this exercises the
        // fallback full-scan path even with pruning on — confirm against BucketIndexSupport.
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id != 4")(
          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
          Seq(3, "a3", 30.0, 3000, "2021-01-07")
        )
        // Re-run the same queries with bucket pruning disabled: results must be identical.
        spark.sql("set hoodie.bucket.index.query.pruning = false")
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 1")(
          Seq(1, "a1", 10.0, 1000, "2021-01-05")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 1 and name = 'a1'")(
          Seq(1, "a1", 10.0, 1000, "2021-01-05")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2 or id = 5")(
          Seq(2, "a2", 20.0, 2000, "2021-01-06")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id in (2, 3)")(
          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
          Seq(3, "a3", 30.0, 3000, "2021-01-07")
        )
        checkAnswer(s"select id, name, price, ts, dt from $tableName where id != 4")(
          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
          Seq(3, "a3", 30.0, 3000, "2021-01-07")
        )
        // Restore the default so later tests sharing this session are unaffected.
        spark.sql("set hoodie.bucket.index.query.pruning = true")
      }
    }
  }
 }


Reply via email to