This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 704527d76155e42cc02ac1b0c973d3c164245b54
Author: bhat-vinay <[email protected]>
AuthorDate: Tue Apr 9 19:14:42 2024 +0530

    [HUDI-7559] [1/n] Fix RecordLevelIndexSupport::filterQueryWithRecordKey 
(#10947)
    
    RecordLevelIndexSupport::filterQueryWithRecordKey() throws a NPE if the 
EqualTo
    query predicate is not of the form `AttributeReference = Literal`. This is 
because
RecordLevelIndexSupport::getAttributeLiteralTuple() returns null in such 
cases, which
    is then dereferenced unconditionally.
    
    This bug prevented the functional index from being used even when the 
query predicate
    contained the Spark functions on which the functional index is built. 
Hence the
    column-stats-based functional index was not pruning files.
    
    This PR makes the following minor changes.
    1. Move some methods in RecordLevelIndexSupport into an object to make it 
static (to aid in unit testing)
    2. Fix filterQueryWithRecordKey() by checking for null return values from 
the call to getAttributeLiteralTuple
    3. Add unit tests in TestRecordLevelIndexSupport.scala
    
    Co-authored-by: Vinaykumar Bhat <[email protected]>
---
 .../org/apache/hudi/RecordLevelIndexSupport.scala  | 106 +++++++++++----------
 .../apache/hudi/TestRecordLevelIndexSupport.scala  |  88 +++++++++++++++++
 2 files changed, 145 insertions(+), 49 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 3580e7ccfe8..3a0e3f78e9b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -76,46 +76,6 @@ class RecordLevelIndexSupport(spark: SparkSession,
     Option.apply(recordKeyOpt.orElse(null))
   }
 
-  /**
-   * Matches the configured simple record key with the input attribute name.
-   * @param attributeName The attribute name provided in the query
-   * @return true if input attribute name matches the configured simple record 
key
-   */
-  private def attributeMatchesRecordKey(attributeName: String): Boolean = {
-    val recordKeyOpt = getRecordKeyConfig
-    if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) {
-      true
-    } else {
-      HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == 
recordKeyOpt.get
-    }
-  }
-
-  /**
-   * Returns the attribute and literal pair given the operands of a binary 
operator. The pair is returned only if one of
-   * the operand is an attribute and other is literal. In other cases it 
returns an empty Option.
-   * @param expression1 - Left operand of the binary operator
-   * @param expression2 - Right operand of the binary operator
-   * @return Attribute and literal pair
-   */
-  private def getAttributeLiteralTuple(expression1: Expression, expression2: 
Expression): Option[(AttributeReference, Literal)] = {
-    expression1 match {
-      case attr: AttributeReference => expression2 match {
-        case literal: Literal =>
-          Option.apply(attr, literal)
-        case _ =>
-          Option.empty
-      }
-      case literal: Literal => expression2 match {
-        case attr: AttributeReference =>
-          Option.apply(attr, literal)
-        case _ =>
-          Option.empty
-      }
-      case _ => Option.empty
-    }
-
-  }
-
   /**
    * Given query filters, it filters the EqualTo and IN queries on simple 
record key columns and returns a tuple of
    * list of such queries and list of record key literals present in the query.
@@ -130,7 +90,8 @@ class RecordLevelIndexSupport(spark: SparkSession,
       var recordKeyQueries: List[Expression] = List.empty
       var recordKeys: List[String] = List.empty
       for (query <- queryFilters) {
-        filterQueryWithRecordKey(query).foreach({
+        val recordKeyOpt = getRecordKeyConfig
+        RecordLevelIndexSupport.filterQueryWithRecordKey(query, 
recordKeyOpt).foreach({
           case (exp: Expression, recKeys: List[String]) =>
             recordKeys = recordKeys ++ recKeys
             recordKeyQueries = recordKeyQueries :+ exp
@@ -141,6 +102,15 @@ class RecordLevelIndexSupport(spark: SparkSession,
     }
   }
 
+  /**
+   * Return true if metadata table is enabled and record index metadata 
partition is available.
+   */
+  def isIndexAvailable: Boolean = {
+    metadataConfig.enabled && 
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+  }
+}
+
+object RecordLevelIndexSupport {
   /**
    * If the input query is an EqualTo or IN query on simple record key 
columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query 
otherwise returns an empty option.
@@ -148,20 +118,27 @@ class RecordLevelIndexSupport(spark: SparkSession,
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need 
to be matched
    */
-  private def filterQueryWithRecordKey(queryFilter: Expression): 
Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: 
Option[String]): Option[(Expression, List[String])] = {
     queryFilter match {
       case equalToQuery: EqualTo =>
-        val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, 
equalToQuery.right).orNull
-        if (attribute != null && attribute.name != null && 
attributeMatchesRecordKey(attribute.name)) {
-          Option.apply(equalToQuery, List.apply(literal.value.toString))
+        val attributeLiteralTuple = 
getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
+        if (attributeLiteralTuple != null) {
+          val attribute = attributeLiteralTuple._1
+          val literal = attributeLiteralTuple._2
+          if (attribute != null && attribute.name != null && 
attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+            Option.apply(equalToQuery, List.apply(literal.value.toString))
+          } else {
+            Option.empty
+          }
         } else {
           Option.empty
         }
+
       case inQuery: In =>
         var validINQuery = true
         inQuery.value match {
           case attribute: AttributeReference =>
-            if (!attributeMatchesRecordKey(attribute.name)) {
+            if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
               validINQuery = false
             }
           case _ => validINQuery = false
@@ -181,9 +158,40 @@ class RecordLevelIndexSupport(spark: SparkSession,
   }
 
   /**
-   * Return true if metadata table is enabled and record index metadata 
partition is available.
+   * Returns the attribute and literal pair given the operands of a binary 
operator. The pair is returned only if one of
+   * the operand is an attribute and other is literal. In other cases it 
returns an empty Option.
+   * @param expression1 - Left operand of the binary operator
+   * @param expression2 - Right operand of the binary operator
+   * @return Attribute and literal pair
    */
-  def isIndexAvailable: Boolean = {
-    metadataConfig.enabled && 
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+  private def getAttributeLiteralTuple(expression1: Expression, expression2: 
Expression): Option[(AttributeReference, Literal)] = {
+    expression1 match {
+      case attr: AttributeReference => expression2 match {
+        case literal: Literal =>
+          Option.apply(attr, literal)
+        case _ =>
+          Option.empty
+      }
+      case literal: Literal => expression2 match {
+        case attr: AttributeReference =>
+          Option.apply(attr, literal)
+        case _ =>
+          Option.empty
+      }
+      case _ => Option.empty
+    }
+  }
+
+  /**
+   * Matches the configured simple record key with the input attribute name.
+   * @param attributeName The attribute name provided in the query
+   * @return true if input attribute name matches the configured simple record 
key
+   */
+  private def attributeMatchesRecordKey(attributeName: String, recordKeyOpt: 
Option[String]): Boolean = {
+    if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) {
+      true
+    } else {
+      HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == 
recordKeyOpt.get
+    }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala
new file mode 100644
index 00000000000..d52af12880f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, FromUnixTime, GreaterThan, In, Literal, Not}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.util.TimeZone
+
+class TestRecordLevelIndexSupport {
+  @Test
+  def testFilterQueryWithRecordKey(): Unit = {
+    // Case 1: EqualTo filters not on simple AttributeReference and 
non-Literal should return empty result
+    val fmt = "yyyy-MM-dd HH:mm:ss"
+    val fromUnixTime = FromUnixTime(Literal(0L), Literal(fmt), 
Some(TimeZone.getDefault.getID))
+    var testFilter: Expression = EqualTo(fromUnixTime, Literal("2020-01-01 
00:10:20"))
+    var result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.empty)
+    assertTrue(result.isEmpty)
+
+    // Case 2: EqualTo filters not on Literal and not on simple 
AttributeReference should return empty result
+    testFilter = EqualTo(Literal("2020-01-01 00:10:20"), fromUnixTime)
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.empty)
+    assertTrue(result.isEmpty)
+
+    // Case 3: EqualTo filters on simple AttributeReference and non-Literal 
should return empty result
+    testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = 
true)(), fromUnixTime)
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.empty)
+    assertTrue(result.isEmpty)
+
+    // Case 4: EqualTo filters on simple AttributeReference and Literal which 
should return non-empty result
+    testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = 
true)(), Literal("row1"))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isDefined)
+    assertEquals(result, Option.apply(testFilter, List.apply("row1")))
+
+    // case 5: EqualTo on fields other than record key should return empty 
result
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply("blah"))
+    assertTrue(result.isEmpty)
+
+    // Case 6: In filter on fields other than record key should return empty 
result
+    testFilter = In(AttributeReference("_row_key", StringType, nullable = 
true)(), List.apply(Literal("xyz"), Literal("abc")))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply("blah"))
+    assertTrue(result.isEmpty)
+
+    // Case 7: In filter on record key should return non-empty result
+    testFilter = In(AttributeReference("_row_key", StringType, nullable = 
true)(), List.apply(Literal("xyz"), Literal("abc")))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isDefined)
+
+    // Case 8: In filter on simple AttributeReference(on record-key) and 
non-Literal should return empty result
+    testFilter = In(AttributeReference("_row_key", StringType, nullable = 
true)(), List.apply(fromUnixTime))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+
+    // Case 9: Anything other than EqualTo and In predicate is not supported. 
Hence it returns empty result
+    testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = 
true)(), List.apply(Literal("xyz"), Literal("abc"))))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+
+    testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = 
true)(), List.apply(fromUnixTime)))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+
+    testFilter = GreaterThan(AttributeReference("_row_key", StringType, 
nullable = true)(), Literal("row1"))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, 
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+  }
+}

Reply via email to