This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 74308cdbd24 [HUDI-6207] Spark bucket index query pruning (#10191)
74308cdbd24 is described below
commit 74308cdbd24bcf6c4836249b6abdf4d75ae492b8
Author: KnightChess <[email protected]>
AuthorDate: Mon May 20 16:01:14 2024 +0800
[HUDI-6207] Spark bucket index query pruning (#10191)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 5 +
.../java/org/apache/hudi/common/fs/FSUtils.java | 12 +
.../exception/InvalidHoodieFileNameException.java | 27 ++
.../scala/org/apache/hudi/BucketIndexSupport.scala | 219 +++++++++++
.../scala/org/apache/hudi/HoodieFileIndex.scala | 3 +-
.../hudi/functional/TestBucketIndexSupport.scala | 399 +++++++++++++++++++++
.../spark/sql/hudi/dml/TestDataSkippingQuery.scala | 78 ++++
7 files changed, 742 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index ffe902f7d4e..c80c5a2de8a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -334,6 +334,11 @@ public class HoodieIndexConfig extends HoodieConfig {
.withDocumentation("Only applies when #recordIndexUseCaching is set.
Determine what level of persistence is used to cache input RDDs. "
+ "Refer to org.apache.spark.storage.StorageLevel for different
values");
+ public static final ConfigProperty<Boolean> BUCKET_QUERY_INDEX =
ConfigProperty
+ .key("hoodie.bucket.index.query.pruning")
+ .defaultValue(true)
+ .withDocumentation("Control if table with bucket index use bucket query
or not");
+
/**
* Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 0829216216c..07230a9a69e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.exception.InvalidHoodieFileNameException;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
@@ -300,6 +301,17 @@ public class FSUtils {
return matcher.group(3);
}
+ public static String getFileIdFromFileName(String fileName) {
+ if (FSUtils.isLogFile(fileName)) {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+ if (!matcher.find()) {
+ throw new InvalidHoodieFileNameException(fileName, "LogFile");
+ }
+ return matcher.group(1);
+ }
+ return FSUtils.getFileId(fileName);
+ }
+
public static String getFileIdFromLogPath(StoragePath path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.find()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/InvalidHoodieFileNameException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidHoodieFileNameException.java
new file mode 100644
index 00000000000..14c5d51dcdc
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidHoodieFileNameException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class InvalidHoodieFileNameException extends HoodieException {
+
+ public InvalidHoodieFileNameException(String fileName, String type) {
+ super("Invalid fileName " + fileName + " of type " + type);
+ }
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
new file mode 100644
index 00000000000..e1b555efb58
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.HoodieIndex.IndexType
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.keygen.KeyGenerator
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EmptyRow,
Expression, Literal}
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
+import org.apache.spark.util.collection.BitSet
+import org.slf4j.LoggerFactory
+
+import scala.collection.{JavaConverters, mutable}
+
+class BucketIndexSupport(spark: SparkSession,
+ metadataConfig: HoodieMetadataConfig,
+ metaClient: HoodieTableMetaClient)
+ extends SparkBaseIndexSupport (spark, metadataConfig, metaClient){
+
+ private val log = LoggerFactory.getLogger(getClass)
+
+ private val keyGenerator =
+ HoodieSparkKeyGeneratorFactory.createKeyGenerator(metadataConfig.getProps)
+
+ private lazy val avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema(false)
+
+ override def getIndexName: String = "BUCKET"
+
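+ /**
+ * Returns true if bucket index pruning can be used for this table:
+ * - bucket hash fields are configured (or record key fields are available as a fallback)
+ * - the table's index type is BUCKET
+ * - the bucket engine is SIMPLE and bucket query pruning is enabled
+ */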
+ def isIndexAvailable: Boolean = {
+ indexBucketHashFieldsOpt.isDefined &&
+ metadataConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE,
"").equalsIgnoreCase(IndexType.BUCKET.name()) &&
+
metadataConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.SIMPLE.name())
&&
+ metadataConfig.getBooleanOrDefault(HoodieIndexConfig.BUCKET_QUERY_INDEX)
+ }
+
+ override def invalidateCaches(): Unit = {
+ // no caches for this index type, do nothing
+ }
+
+ override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+ queryFilters: Seq[Expression],
+ queryReferencedColumns: Seq[String],
+ prunedPartitionsAndFileSlices:
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+ shouldPushDownFilesFilter: Boolean):
Option[Set[String]] = {
+
+ val bucketIdsBitMapByFilter =
filterQueriesWithBucketHashField(queryFilters)
+
+ if (bucketIdsBitMapByFilter.isDefined &&
bucketIdsBitMapByFilter.get.cardinality() > 0) {
+ val allFilesName = getPrunedFileNames(prunedPartitionsAndFileSlices)
+ Option.apply(getCandidateFiles(allFilesName,
bucketIdsBitMapByFilter.get))
+ } else {
+ Option.empty
+ }
+ }
+
+ def getCandidateFiles(allFilesName: Set[String], bucketIds: BitSet):
Set[String] = {
+ val candidateFiles: mutable.Set[String] = mutable.Set.empty
+ for (fileName <- allFilesName) {
+ val fileId = FSUtils.getFileIdFromFileName(fileName)
+ val fileBucketId = BucketIdentifier.bucketIdFromFileId(fileId)
+ if (bucketIds.get(fileBucketId)) {
+ candidateFiles += fileName
+ }
+ }
+ candidateFiles.toSet
+ }
+
+ def filterQueriesWithBucketHashField(queryFilters: Seq[Expression]):
Option[BitSet] = {
+ val bucketNumber =
metadataConfig.getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS)
+ if (indexBucketHashFieldsOpt.isEmpty || queryFilters.isEmpty) {
+ None
+ } else {
+ var matchedBuckets: BitSet = null
+ if (indexBucketHashFieldsOpt.get.size == 1) {
+ matchedBuckets =
getBucketsBySingleHashFields(queryFilters.reduce(And),
indexBucketHashFieldsOpt.get.get(0), bucketNumber)
+ } else {
+ matchedBuckets = getBucketsByMultipleHashFields(queryFilters,
+
JavaConverters.asScalaBufferConverter(indexBucketHashFieldsOpt.get).asScala.toSet,
bucketNumber)
+ }
+
+ val numBucketsSelected = matchedBuckets.cardinality()
+
+ // None means all the buckets need to be scanned
+ if (numBucketsSelected == bucketNumber) {
+ log.info("The query predicates do not include equality expressions for
all the hashing fields, fall back to the other indices")
+ None
+ } else {
+ Some(matchedBuckets)
+ }
+ }
+ }
+
+ // With multiple hash fields, only equality predicates combined with AND are supported
+ private def getBucketsByMultipleHashFields(queryFilters: Seq[Expression],
indexBucketHashFields: Set[String], numBuckets: Int): BitSet = {
+ val hashValuePairs = queryFilters.map(expr => getEqualityFieldPair(expr,
indexBucketHashFields)).filter(pair => pair != null)
+ val matchedBuckets = new BitSet(numBuckets)
+ if (hashValuePairs.size != indexBucketHashFields.size) {
+ matchedBuckets.setUntil(numBuckets)
+ } else {
+ val record = new GenericData.Record(avroSchema)
+ hashValuePairs.foreach(p => record.put(p.getKey, p.getValue))
+ val hoodieKey = keyGenerator.getKey(record)
+ matchedBuckets.set(BucketIdentifier.getBucketId(hoodieKey,
indexBucketHashFieldsOpt.get, numBuckets))
+ }
+ matchedBuckets
+ }
+
+ private def getEqualityFieldPair(expr: Expression, equalityFields:
Set[String]): Pair[String, Any] = {
+ expr match {
+ case expressions.Equality(a: Attribute, Literal(v, _)) if
equalityFields.contains(a.name) =>
+ Pair.of(a.name, v)
+ case _ =>
+ null
+ }
+ }
+
+ private def getBucketsBySingleHashFields(expr: Expression, bucketColumnName:
String, numBuckets: Int): BitSet = {
+
+ def getBucketNumber(attr: Attribute, v: Any): Int = {
+ val record = new GenericData.Record(avroSchema)
+ record.put(attr.name, v)
+ val hoodieKey = keyGenerator.getKey(record)
+ BucketIdentifier.getBucketId(hoodieKey, indexBucketHashFieldsOpt.get,
numBuckets)
+ }
+
+ def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet
= {
+ val matchedBuckets = new BitSet(numBuckets)
+ iter
+ .map(v => getBucketNumber(attr, v))
+ .foreach(bucketNum => matchedBuckets.set(bucketNum))
+ matchedBuckets
+ }
+
+ def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
+ val matchedBuckets = new BitSet(numBuckets)
+ matchedBuckets.set(getBucketNumber(attr, v))
+ matchedBuckets
+ }
+
+ expr match {
+ case expressions.Equality(a: Attribute, Literal(v, _)) if a.name ==
bucketColumnName =>
+ getBucketSetFromValue(a, v)
+ case expressions.In(a: Attribute, list)
+ if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName
=>
+ getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
+ case expressions.InSet(a: Attribute, hset) if a.name == bucketColumnName
=>
+ getBucketSetFromIterable(a, hset)
+ case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
+ getBucketSetFromValue(a, null)
+ case expressions.IsNaN(a: Attribute)
+ if a.name == bucketColumnName && a.dataType == FloatType =>
+ getBucketSetFromValue(a, Float.NaN)
+ case expressions.IsNaN(a: Attribute)
+ if a.name == bucketColumnName && a.dataType == DoubleType =>
+ getBucketSetFromValue(a, Double.NaN)
+ case expressions.And(left, right) =>
+ getBucketsBySingleHashFields(left, bucketColumnName, numBuckets) &
+ getBucketsBySingleHashFields(right, bucketColumnName, numBuckets)
+ case expressions.Or(left, right) =>
+ getBucketsBySingleHashFields(left, bucketColumnName, numBuckets) |
+ getBucketsBySingleHashFields(right, bucketColumnName, numBuckets)
+ case _ =>
+ val matchedBuckets = new BitSet(numBuckets)
+ matchedBuckets.setUntil(numBuckets)
+ matchedBuckets
+ }
+ }
+
+ /**
+ * Returns the configured bucket hash fields for the table,
+ * falling back to the record key fields if no bucket hash fields are configured.
+ */
+ private val indexBucketHashFieldsOpt: Option[java.util.List[String]] = {
+ val bucketHashFields =
metadataConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD,
+ metadataConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS))
+ if (bucketHashFields == null || bucketHashFields.isEmpty) {
+ Option.apply(null)
+ } else {
+
Option.apply(JavaConverters.seqAsJavaListConverter(bucketHashFields.split(",")).asJava)
+ }
+ }
+
+ def getKeyGenerator: KeyGenerator = {
+ keyGenerator
+ }
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index e92e6345e83..c59eb68182a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -102,6 +102,7 @@ case class HoodieFileIndex(spark: SparkSession,
*/
@transient private lazy val indicesSupport: List[SparkBaseIndexSupport] =
List(
new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
+ new BucketIndexSupport(spark, metadataConfig, metaClient),
new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
new FunctionalIndexSupport(spark, metadataConfig, metaClient),
new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
@@ -344,7 +345,7 @@ case class HoodieFileIndex(spark: SparkSession,
// and candidate files are obtained from these file slices.
lazy val queryReferencedColumns = collectReferencedColumns(spark,
queryFilters, schema)
- if (isMetadataTableEnabled && isDataSkippingEnabled) {
+ if (isDataSkippingEnabled) {
for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
if (indexSupport.isIndexAvailable) {
val prunedFileNames = indexSupport.computeCandidateFileNames(this,
queryFilters, queryReferencedColumns,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
new file mode 100644
index 00000000000..8222030f8c6
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.BucketIndexSupport
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+
+@Tag("functional")
+class TestBucketIndexSupport extends HoodieSparkClientTestBase with
PredicateHelper {
+
+ var spark: SparkSession = _
+
+ val avroSchemaStr = "{\"namespace\": \"example.avro\", \"type\": \"record\",
" + "\"name\": \"logicalTypes\"," +
+ "\"fields\": [" +
+ "{\"name\": \"A\", \"type\": [\"null\", \"long\"], \"default\": null}," +
+ "{\"name\": \"B\", \"type\": [\"null\", \"string\"], \"default\": null}," +
+ "{\"name\": \"C\", \"type\": [\"null\", \"long\"], \"default\": null}," +
+ "{\"name\": \"D\", \"type\": [\"null\", \"string\"], \"default\": null}" +
+ "]}"
+
+ val structSchema: StructType =
+ StructType(
+ Seq(
+ StructField("A", LongType),
+ StructField("B", StringType),
+ StructField("C", LongType),
+ StructField("D", VarcharType(32))
+ )
+ )
+
+ val avroSchema = new Schema.Parser().parse(avroSchemaStr)
+
+ @BeforeEach
+ override def setUp() {
+ initPath()
+ initSparkContexts()
+
+ setTableName("hoodie_test")
+ initMetaClient()
+
+ spark = sqlContext.sparkSession
+ }
+
+ @AfterEach
+ override def tearDown() = {
+ cleanupSparkContexts()
+ }
+
+ @Test
+ def testSingleHashFieldsExpression: Unit = {
+ val bucketNumber = 19
+ val configProperties = new TypedProperties()
+
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, "A")
+ configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A")
+ configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key,
"A")
+
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key,
String.valueOf(bucketNumber))
+ metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(),
avroSchemaStr)
+ val metadataConfig = HoodieMetadataConfig.newBuilder
+ .fromProperties(configProperties)
+ .enable(configProperties.getBoolean(ENABLE.key, true)).build()
+ val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig,
metaClient)
+ val keyGenerator = bucketIndexSupport.getKeyGenerator
+ assert(keyGenerator.isInstanceOf[NonpartitionedKeyGenerator])
+
+ // init
+ val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties)
+ var record = new GenericData.Record(avroSchema)
+ record.put("A", "1")
+ val bucket1Id4 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "2")
+ val bucket2Id5 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "3")
+ val bucket3Id6 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "4")
+ val bucket4Id7 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "5")
+ val bucket5Id8 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A", bucketNumber)
+ assert(bucket1Id4 == 4 && bucket2Id5 == 5 && bucket3Id6 == 6 && bucket4Id7
== 7 && bucket5Id8 == 8)
+
+ // fileIdStr
+ val token = FSUtils.makeWriteToken(1, 0, 1)
+ val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+
+ val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName,
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+
+ var equalTo = "A = 3"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket3Id6),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+ equalTo = "A = 3 And A = 4 and B = '6'"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.empty, fallback =
false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.empty,
allFileNames, fallback = false)
+ equalTo = "A = 5 And B = 'abc'"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket5Id8FileName), allFileNames, fallback = false)
+ equalTo = "A = C and A = 1"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket1Id4),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket1Id4FileName), allFileNames, fallback = false)
+ equalTo = "A = 5 Or A = 2"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8,
bucket2Id5), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames, fallback =
false)
+ equalTo = "A = 5 Or A = 2 Or A = 8"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8,
bucket2Id5), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames, fallback =
false)
+ equalTo = "A = 5 Or (A = 2 and B = 'abc')"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8,
bucket2Id5), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames, fallback =
false)
+ equalTo = "A = 5 And (A = 2 Or B = 'abc')"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket5Id8FileName), allFileNames, fallback = false)
+ equalTo = "A = 5 And (A = 2 Or B = 'abc')"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket5Id8),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket5Id8FileName), allFileNames, fallback = false)
+
+ var inExpr = "A in (3)"
+ exprBucketAnswerCheck(bucketIndexSupport, inExpr, List.apply(bucket3Id6),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, inExpr,
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+ inExpr = "A in (3, 5)"
+ exprBucketAnswerCheck(bucketIndexSupport, inExpr, List.apply(bucket3Id6,
bucket5Id8), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, inExpr,
Set.apply(bucket3Id6FileName, bucket5Id8FileName), allFileNames, fallback =
false)
+
+ var complexExpr = "A = 3 And A in (3)"
+ exprBucketAnswerCheck(bucketIndexSupport, complexExpr,
List.apply(bucket3Id6), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, complexExpr,
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+ complexExpr = "A = 3 Or A in (3, 5)"
+ exprBucketAnswerCheck(bucketIndexSupport, complexExpr,
List.apply(bucket3Id6, bucket5Id8), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, complexExpr,
Set.apply(bucket3Id6FileName, bucket5Id8FileName), allFileNames, fallback =
false)
+ complexExpr = "A = 3 Or A in (5, 2)"
+ exprBucketAnswerCheck(bucketIndexSupport, complexExpr,
List.apply(bucket3Id6, bucket5Id8, bucket2Id5), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, complexExpr,
Set.apply(bucket3Id6FileName, bucket5Id8FileName, bucket2Id5FileName),
allFileNames, fallback = false)
+ complexExpr = "A = 3 and C in (3, 5)"
+ exprBucketAnswerCheck(bucketIndexSupport, complexExpr,
List.apply(bucket3Id6), fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, complexExpr,
Set.apply(bucket3Id6FileName), allFileNames, fallback = false)
+
+
+ // fall back to other indexes
+ var fallBack = "B = 'abc'"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ fallBack = "A = 5 Or B = 'abc'"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ fallBack = "A = C"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ // TODO: optimize this scenario
+ fallBack = "A = C and C = 1"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+
+ // in
+ fallBack = "C in (3)"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+
+ // complex
+ fallBack = "A = 3 Or C in (3, 5)"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ }
+
+ @Test
+ def testMultipleHashFieldsExpress(): Unit = {
+ val bucketNumber = 19
+ val configProperties = new TypedProperties()
+
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key,
"A,B")
+ configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A,B")
+ configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key,
"A,B")
+
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key,
String.valueOf(bucketNumber))
+ metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(),
avroSchemaStr)
+ val metadataConfig = HoodieMetadataConfig.newBuilder
+ .fromProperties(configProperties)
+ .enable(configProperties.getBoolean(ENABLE.key, true)).build()
+ val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig,
metaClient)
+ val keyGenerator = bucketIndexSupport.getKeyGenerator
+ assert(keyGenerator.isInstanceOf[NonpartitionedKeyGenerator])
+
+ // init
+ val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties)
+ var record = new GenericData.Record(avroSchema)
+ record.put("A", "1")
+ record.put("B", "2")
+ val bucket1Id4 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B",
bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "2")
+ record.put("B", "3")
+ val bucket2Id5 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B",
bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "3")
+ record.put("B", "4")
+ val bucket3Id6 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B",
bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "4")
+ record.put("B", "5")
+ val bucket4Id7 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B",
bucketNumber)
+ record = new GenericData.Record(avroSchema)
+ record.put("A", "5")
+ record.put("B", "6")
+ val bucket5Id8 =
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), "A,B",
bucketNumber)
+ assert(bucket1Id4 == 3 && bucket2Id5 == 16 && bucket3Id6 == 10 &&
bucket4Id7 == 4 && bucket5Id8 == 17)
+
+ // fileIdStr
+ val token = FSUtils.makeWriteToken(1, 0, 1)
+ val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+ val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000",
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) +
"-0",HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+ val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName,
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+
+ var equalTo = "A = 2 and B = '3'"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket2Id5),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket2Id5FileName), allFileNames, fallback = false)
+
+ equalTo = "A = 4 and B = '5'"
+ exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket4Id7),
fallback = false)
+ exprFilePathAnswerCheck(bucketIndexSupport, equalTo,
Set.apply(bucket4Id7FileName), allFileNames, fallback = false)
+
+ // fall back to other indexes
+ var fallBack = "B = '5'"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ fallBack = "A = 3"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ fallBack = "A = 3 or B = '4'"
+ exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback =
true)
+ }
+
+
+ /**
+  * Bucket pruning with two hash fields ("A,B") that are also the record key fields, for which
+  * the index support is asserted to use a ComplexKeyGenerator.
+  *
+  * Equality predicates covering both hash fields must select the expected bucket and base
+  * file; predicates that cover only one field, or that are joined with OR, must fall back.
+  *
+  * NOTE: this method previously carried no @Test annotation and was therefore never executed.
+  */
+ @Test
+ def testMultipleHashFieldsExpressionWithComplexKey(): Unit = {
+   val bucketNumber = 19
+   val hashFields = "A,B"
+   val configProperties = new TypedProperties()
+   configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, hashFields)
+   configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, hashFields)
+   configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, hashFields)
+   configProperties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, "C")
+   configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, String.valueOf(bucketNumber))
+   val metadataConfig = HoodieMetadataConfig.newBuilder
+     .fromProperties(configProperties)
+     .enable(configProperties.getBoolean(ENABLE.key, true)).build()
+   val bucketIndexSupport = new BucketIndexSupport(spark, metadataConfig, metaClient)
+   val keyGenerator = bucketIndexSupport.getKeyGenerator
+   assert(keyGenerator.isInstanceOf[ComplexKeyGenerator])
+
+   // init: expected bucket ids are derived with a separately constructed key generator
+   val testKeyGenerator = new ComplexKeyGenerator(configProperties)
+
+   // Bucket id of a record whose hash fields hold the given values.
+   def bucketIdOf(a: String, b: String): Int = {
+     val record = new GenericData.Record(avroSchema)
+     record.put("A", a)
+     record.put("B", b)
+     BucketIdentifier.getBucketId(testKeyGenerator.getKey(record), hashFields, bucketNumber)
+   }
+
+   val bucket1Id4 = bucketIdOf("1", "2")
+   val bucket2Id5 = bucketIdOf("2", "3")
+   val bucket3Id6 = bucketIdOf("3", "4")
+   val bucket4Id7 = bucketIdOf("4", "5")
+   val bucket5Id8 = bucketIdOf("5", "6")
+   // Pin the hash results so a change in the hashing scheme is noticed here first.
+   assert(bucket1Id4 == 3 && bucket2Id5 == 16 && bucket3Id6 == 10 && bucket4Id7 == 4 && bucket5Id8 == 17)
+
+   // fileIdStr
+   val token = FSUtils.makeWriteToken(1, 0, 1)
+   val baseFileExtension = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension
+
+   // Base file name whose file id starts with the bucket's file id prefix.
+   def baseFileNameOf(bucketId: Int): String =
+     FSUtils.makeBaseFileName("00000000000000000", token,
+       BucketIdentifier.newBucketFileIdPrefix(bucketId) + "-0", baseFileExtension)
+
+   val bucket1Id4FileName = baseFileNameOf(bucket1Id4)
+   val bucket2Id5FileName = baseFileNameOf(bucket2Id5)
+   val bucket3Id6FileName = baseFileNameOf(bucket3Id6)
+   val bucket4Id7FileName = baseFileNameOf(bucket4Id7)
+   val bucket5Id8FileName = baseFileNameOf(bucket5Id8)
+
+   val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, bucket3Id6FileName,
+     bucket4Id7FileName, bucket5Id8FileName)
+
+   var equalTo = "A = 2 and B = '3'"
+   exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket2Id5), fallback = false)
+   exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket2Id5FileName), allFileNames, fallback = false)
+
+   equalTo = "A = 4 and B = '5'"
+   exprBucketAnswerCheck(bucketIndexSupport, equalTo, List.apply(bucket4Id7), fallback = false)
+   exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket4Id7FileName), allFileNames, fallback = false)
+
+   // fall back to other indexes: not every hash field is bound by an equality predicate
+   var fallBack = "B = '5'"
+   exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = true)
+   fallBack = "A = 3"
+   exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = true)
+   fallBack = "A = 3 or B = '4'"
+   exprBucketAnswerCheck(bucketIndexSupport, fallBack, List.empty, fallback = true)
+ }
+
+ /**
+  * Resolves `exprRaw` against `structSchema`, runs it through the session optimizer, passes the
+  * split conjuncts to `filterQueriesWithBucketHashField` and asserts on the returned bucket set.
+  *
+  * @param bucketIndexSupport index support under test
+  * @param exprRaw            filter expression text
+  * @param expectResult       bucket ids that must be set in the result; when empty, the result
+  *                           must be absent or contain no set bit
+  * @param fallback           when true, no bucket set may be returned at all
+  */
+ def exprBucketAnswerCheck(bucketIndexSupport: BucketIndexSupport, exprRaw: String, expectResult: List[Int], fallback: Boolean): Unit = {
+   val holder = DummyExpressionHolder(Seq(HoodieCatalystExpressionUtils.resolveExpr(spark, exprRaw, structSchema)))
+   val optimizedFilter = spark.sessionState.optimizer.execute(holder)
+     .asInstanceOf[DummyExpressionHolder].exprs.head
+   val matchedBuckets = bucketIndexSupport.filterQueriesWithBucketHashField(splitConjunctivePredicates(optimizedFilter))
+
+   // On fallback every file matches: the result is None and another index takes over.
+   if (fallback) {
+     assert(matchedBuckets.isEmpty)
+   }
+   if (expectResult.isEmpty) {
+     assert(matchedBuckets.forall(_.cardinality() == 0))
+   } else {
+     assert(matchedBuckets.isDefined)
+     // Only membership is verified, not the cardinality: an OR predicate can select extra buckets.
+     val selected = matchedBuckets.get
+     expectResult.foreach(expectId => assert(selected.get(expectId)))
+   }
+ }
+
+ /**
+  * Same pipeline as the bucket-level check (resolve, optimize, split conjuncts, call
+  * `filterQueriesWithBucketHashField`), but asserts on the files returned by `getCandidateFiles`.
+  *
+  * @param bucketIndexSupport index support under test
+  * @param exprRaw            filter expression text
+  * @param expectResult       file names that `getCandidateFiles` must return; when empty, the
+  *                           bucket set must be absent or contain no set bit
+  * @param allFileStatus      all file names offered to `getCandidateFiles`
+  * @param fallback           when true, no bucket set may be returned at all
+  */
+ def exprFilePathAnswerCheck(bucketIndexSupport: BucketIndexSupport, exprRaw: String, expectResult: Set[String],
+                             allFileStatus: Set[String], fallback: Boolean): Unit = {
+   val holder = DummyExpressionHolder(Seq(HoodieCatalystExpressionUtils.resolveExpr(spark, exprRaw, structSchema)))
+   val optimizedFilter = spark.sessionState.optimizer.execute(holder)
+     .asInstanceOf[DummyExpressionHolder].exprs.head
+   val matchedBuckets = bucketIndexSupport.filterQueriesWithBucketHashField(splitConjunctivePredicates(optimizedFilter))
+
+   // On fallback every file matches: the result is None and another index takes over.
+   if (fallback) {
+     assert(matchedBuckets.isEmpty)
+   }
+   if (expectResult.isEmpty) {
+     assert(matchedBuckets.forall(_.cardinality() == 0))
+   } else {
+     assert(matchedBuckets.isDefined)
+     // The bucket count is not checked here; the candidate files must equal the expected set.
+     val candidateFiles = bucketIndexSupport.getCandidateFiles(allFileStatus, matchedBuckets.get)
+     assert(expectResult == candidateFiles)
+   }
+ }
+
+ /**
+  * Walks `isIndexAvailable` through a sequence of config changes applied to the metadata config
+  * the support object was built with. Each step depends on the state left by the previous one,
+  * so the order of the statements matters.
+  */
+ @Test
+ def testBucketQueryIsAvaliable(): Unit = {
+   val props = new TypedProperties()
+   props.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key(), "A")
+   props.setProperty(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET")
+   val metadataConfig = HoodieMetadataConfig.newBuilder
+     .fromProperties(props)
+     .enable(props.getBoolean(ENABLE.key, false)).build()
+   val support = new BucketIndexSupport(spark, metadataConfig, metaClient)
+
+   // Asserts the availability reported for the current config state.
+   def expectAvailable(expected: Boolean): Unit = assert(support.isIndexAvailable == expected)
+
+   // Bucket index with a single record key field.
+   expectAvailable(true)
+
+   // Several record key fields, with and without an explicit hash field.
+   metadataConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS.key(), "A,B")
+   expectAvailable(true)
+   metadataConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "A")
+   expectAvailable(true)
+
+   // Only the BUCKET index type qualifies.
+   metadataConfig.setValue(HoodieIndexConfig.INDEX_TYPE.key(), "SIMPLE")
+   expectAvailable(false)
+   metadataConfig.setValue(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET")
+   expectAvailable(true)
+
+   // Only the SIMPLE bucket engine qualifies, CONSISTENT_HASHING does not.
+   metadataConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE, HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())
+   expectAvailable(false)
+   metadataConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE, HoodieIndex.BucketIndexEngineType.SIMPLE.name())
+   expectAvailable(true)
+
+   // The pruning switch turns the feature off and on again.
+   metadataConfig.setValue(HoodieIndexConfig.BUCKET_QUERY_INDEX, "false")
+   expectAvailable(false)
+   metadataConfig.setValue(HoodieIndexConfig.BUCKET_QUERY_INDEX, "true")
+   expectAvailable(true)
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
index 23255b763ff..9ae8e3a2cdd 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
@@ -113,4 +113,82 @@ class TestDataSkippingQuery extends HoodieSparkSqlTestBase
{
)
}
}
+
+ test("bucket index query") {
+   // Table-level bucket properties cannot be read on the query path yet, so the bucket index
+   // settings also have to be supplied as session configs.
+   withSQLConf("hoodie.enable.data.skipping" -> "true",
+     "hoodie.bucket.index.hash.field" -> "id",
+     "hoodie.bucket.index.num.buckets" -> "20",
+     "hoodie.index.type" -> "BUCKET") {
+     withTempDir { tmp =>
+       val tableName = generateTableName
+       // Create a partitioned table
+       spark.sql(
+         s"""
+            |create table $tableName (
+            | id int,
+            | dt string,
+            | name string,
+            | price double,
+            | ts long
+            |) using hudi
+            | tblproperties (
+            | primaryKey = 'id,name',
+            | preCombineField = 'ts',
+            | hoodie.index.type = 'BUCKET',
+            | hoodie.bucket.index.hash.field = 'id',
+            | hoodie.bucket.index.num.buckets = '20')
+            | partitioned by (dt)
+            | location '${tmp.getCanonicalPath}'
+        """.stripMargin)
+
+       spark.sql(
+         s"""
+            | insert into $tableName values
+            | (1, 'a1', 10, 1000, "2021-01-05"),
+            | (2, 'a2', 20, 2000, "2021-01-06"),
+            | (3, 'a3', 30, 3000, "2021-01-07")
+        """.stripMargin)
+
+       val selectAll = s"select id, name, price, ts, dt from $tableName"
+       val row1: Seq[Any] = Seq(1, "a1", 10.0, 1000, "2021-01-05")
+       val row2: Seq[Any] = Seq(2, "a2", 20.0, 2000, "2021-01-06")
+       val row3: Seq[Any] = Seq(3, "a3", 30.0, 3000, "2021-01-07")
+
+       // Query results must be identical with bucket pruning enabled and disabled.
+       // withSQLConf restores the previous session value even when an assertion fails, so a
+       // failure here cannot leak "pruning = false" into other tests sharing the session.
+       Seq("true", "false").foreach { pruning =>
+         withSQLConf("hoodie.bucket.index.query.pruning" -> pruning) {
+           checkAnswer(s"$selectAll where id = 1")(row1)
+           checkAnswer(s"$selectAll where id = 1 and name = 'a1'")(row1)
+           checkAnswer(s"$selectAll where id = 2 or id = 5")(row2)
+           checkAnswer(s"$selectAll where id in (2, 3)")(row2, row3)
+           checkAnswer(s"$selectAll where id != 4")(row1, row2, row3)
+         }
+       }
+     }
+   }
+ }
}