This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 704527d76155e42cc02ac1b0c973d3c164245b54 Author: bhat-vinay <[email protected]> AuthorDate: Tue Apr 9 19:14:42 2024 +0530 [HUDI-7559] [1/n] Fix RecordLevelIndexSupport::filterQueryWithRecordKey (#10947) RecordLevelIndexSupport::filterQueryWithRecordKey() throws an NPE if the EqualTo query predicate is not of the form `AttributeReference = Literal`. This is because RecordLevelIndexSupport::getAttributeLiteralTuple() returns null in such cases, which is then dereferenced unconditionally. This bug prevented the functional index from being used even when the query predicate contained the Spark functions on which the functional index is built. Hence this column-stats-based functional index was not pruning files. This PR makes the following minor changes. 1. Move some methods in RecordLevelIndexSupport into an object to make it static (to aid in unit testing) 2. Fix filterQueryWithRecordKey() by checking for null return values from the call to getAttributeLiteralTuple 3. Add unit tests in TestRecordLevelIndexSupport.scala Co-authored-by: Vinaykumar Bhat <[email protected]> --- .../org/apache/hudi/RecordLevelIndexSupport.scala | 106 +++++++++++---------- .../apache/hudi/TestRecordLevelIndexSupport.scala | 88 +++++++++++++++++ 2 files changed, 145 insertions(+), 49 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala index 3580e7ccfe8..3a0e3f78e9b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala @@ -76,46 +76,6 @@ class RecordLevelIndexSupport(spark: SparkSession, Option.apply(recordKeyOpt.orElse(null)) } - /** - * Matches the configured simple record key with the input attribute name. 
- * @param attributeName The attribute name provided in the query - * @return true if input attribute name matches the configured simple record key - */ - private def attributeMatchesRecordKey(attributeName: String): Boolean = { - val recordKeyOpt = getRecordKeyConfig - if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) { - true - } else { - HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get - } - } - - /** - * Returns the attribute and literal pair given the operands of a binary operator. The pair is returned only if one of - * the operand is an attribute and other is literal. In other cases it returns an empty Option. - * @param expression1 - Left operand of the binary operator - * @param expression2 - Right operand of the binary operator - * @return Attribute and literal pair - */ - private def getAttributeLiteralTuple(expression1: Expression, expression2: Expression): Option[(AttributeReference, Literal)] = { - expression1 match { - case attr: AttributeReference => expression2 match { - case literal: Literal => - Option.apply(attr, literal) - case _ => - Option.empty - } - case literal: Literal => expression2 match { - case attr: AttributeReference => - Option.apply(attr, literal) - case _ => - Option.empty - } - case _ => Option.empty - } - - } - /** * Given query filters, it filters the EqualTo and IN queries on simple record key columns and returns a tuple of * list of such queries and list of record key literals present in the query. 
@@ -130,7 +90,8 @@ class RecordLevelIndexSupport(spark: SparkSession, var recordKeyQueries: List[Expression] = List.empty var recordKeys: List[String] = List.empty for (query <- queryFilters) { - filterQueryWithRecordKey(query).foreach({ + val recordKeyOpt = getRecordKeyConfig + RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({ case (exp: Expression, recKeys: List[String]) => recordKeys = recordKeys ++ recKeys recordKeyQueries = recordKeyQueries :+ exp @@ -141,6 +102,15 @@ class RecordLevelIndexSupport(spark: SparkSession, } } + /** + * Return true if metadata table is enabled and record index metadata partition is available. + */ + def isIndexAvailable: Boolean = { + metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) + } +} + +object RecordLevelIndexSupport { /** * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of * list of the query and list of record key literals present in the query otherwise returns an empty option. @@ -148,20 +118,27 @@ class RecordLevelIndexSupport(spark: SparkSession, * @param queryFilter The query that need to be filtered. 
* @return Tuple of filtered query and list of record key literals that need to be matched */ - private def filterQueryWithRecordKey(queryFilter: Expression): Option[(Expression, List[String])] = { + def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = { queryFilter match { case equalToQuery: EqualTo => - val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull - if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name)) { - Option.apply(equalToQuery, List.apply(literal.value.toString)) + val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull + if (attributeLiteralTuple != null) { + val attribute = attributeLiteralTuple._1 + val literal = attributeLiteralTuple._2 + if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) { + Option.apply(equalToQuery, List.apply(literal.value.toString)) + } else { + Option.empty + } } else { Option.empty } + case inQuery: In => var validINQuery = true inQuery.value match { case attribute: AttributeReference => - if (!attributeMatchesRecordKey(attribute.name)) { + if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) { validINQuery = false } case _ => validINQuery = false @@ -181,9 +158,40 @@ class RecordLevelIndexSupport(spark: SparkSession, } /** - * Return true if metadata table is enabled and record index metadata partition is available. + * Returns the attribute and literal pair given the operands of a binary operator. The pair is returned only if one of + * the operand is an attribute and other is literal. In other cases it returns an empty Option. 
+ * @param expression1 - Left operand of the binary operator + * @param expression2 - Right operand of the binary operator + * @return Attribute and literal pair */ - def isIndexAvailable: Boolean = { - metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) + private def getAttributeLiteralTuple(expression1: Expression, expression2: Expression): Option[(AttributeReference, Literal)] = { + expression1 match { + case attr: AttributeReference => expression2 match { + case literal: Literal => + Option.apply(attr, literal) + case _ => + Option.empty + } + case literal: Literal => expression2 match { + case attr: AttributeReference => + Option.apply(attr, literal) + case _ => + Option.empty + } + case _ => Option.empty + } + } + + /** + * Matches the configured simple record key with the input attribute name. + * @param attributeName The attribute name provided in the query + * @return true if input attribute name matches the configured simple record key + */ + private def attributeMatchesRecordKey(attributeName: String, recordKeyOpt: Option[String]): Boolean = { + if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) { + true + } else { + HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get + } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala new file mode 100644 index 00000000000..d52af12880f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, FromUnixTime, GreaterThan, In, Literal, Not} +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Test + +import java.util.TimeZone + +class TestRecordLevelIndexSupport { + @Test + def testFilterQueryWithRecordKey(): Unit = { + // Case 1: EqualTo filters not on simple AttributeReference and non-Literal should return empty result + val fmt = "yyyy-MM-dd HH:mm:ss" + val fromUnixTime = FromUnixTime(Literal(0L), Literal(fmt), Some(TimeZone.getDefault.getID)) + var testFilter: Expression = EqualTo(fromUnixTime, Literal("2020-01-01 00:10:20")) + var result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty) + assertTrue(result.isEmpty) + + // Case 2: EqualTo filters not on Literal and not on simple AttributeReference should return empty result + testFilter = EqualTo(Literal("2020-01-01 00:10:20"), fromUnixTime) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty) + assertTrue(result.isEmpty) + + // Case 3: EqualTo filters on simple AttributeReference and non-Literal should return empty result + testFilter = EqualTo(AttributeReference("_row_key", 
StringType, nullable = true)(), fromUnixTime) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty) + assertTrue(result.isEmpty) + + // Case 4: EqualTo filters on simple AttributeReference and Literal which should return non-empty result + testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1")) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName)) + assertTrue(result.isDefined) + assertEquals(result, Option.apply(testFilter, List.apply("row1"))) + + // case 5: EqualTo on fields other than record key should return empty result + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply("blah")) + assertTrue(result.isEmpty) + + // Case 6: In filter on fields other than record key should return empty result + testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc"))) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply("blah")) + assertTrue(result.isEmpty) + + // Case 7: In filter on record key should return non-empty result + testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc"))) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName)) + assertTrue(result.isDefined) + + // Case 8: In filter on simple AttributeReference(on record-key) and non-Literal should return empty result + testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(fromUnixTime)) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName)) + assertTrue(result.isEmpty) + + // Case 9: Anything other than EqualTo and In predicate is not supported. 
Hence it returns empty result + testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc")))) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName)) + assertTrue(result.isEmpty) + + testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(fromUnixTime))) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName)) + assertTrue(result.isEmpty) + + testFilter = GreaterThan(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1")) + result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName)) + assertTrue(result.isEmpty) + } +}
