This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 744a73d [SPARK-34538][SQL] Hive Metastore support filter by not-in
744a73d is described below
commit 744a73df9eddffaaec1f3f1b6f4f3bf5ab19c4ec
Author: ulysses-you <[email protected]>
AuthorDate: Thu Mar 11 15:19:47 2021 +0000
[SPARK-34538][SQL] Hive Metastore support filter by not-in
### What changes were proposed in this pull request?
Add `Not(In)` and `Not(InSet)` patterns when converting partition filters to Hive Metastore predicates.
### Why are the changes needed?
`NOT IN` is a useful condition for partition pruning, so it is worth
supporting it.
We can convert `c NOT IN (x, y)` to `c != x AND c != y` and push that
conjunction to the metastore.
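To avoid overflowing the metastore, the conversion respects the config
`spark.sql.hive.metastorePartitionPruningInSetThreshold`: a `Not(InSet)` whose
number of values exceeds the threshold is not pushed to the metastore. A `NOT IN`
whose value list contains NULL is not pushed either, because
`c NOT IN (..., NULL)` can never evaluate to true.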
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests in `FiltersSuite` and `HivePartitionFilteringSuite`.
Closes #31646 from ulysses-you/SPARK-34538.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 4 +-
.../apache/spark/sql/hive/client/HiveShim.scala | 27 +++++++++
.../spark/sql/hive/client/FiltersSuite.scala | 49 +++++++++++++++
.../hive/client/HivePartitionFilteringSuite.scala | 70 ++++++++++++++++++++++
4 files changed, 149 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e225b3a..610f436 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -863,7 +863,9 @@ object SQLConf {
.doc("The threshold of set size for InSet predicate when pruning
partitions through Hive " +
"Metastore. When the set size exceeds the threshold, we rewrite the
InSet predicate " +
"to be greater than or equal to the minimum value in set and less than
or equal to the " +
- "maximum value in set. Larger values may cause Hive Metastore stack
overflow.")
+ "maximum value in set. Larger values may cause Hive Metastore stack
overflow. But for " +
+ "InSet inside Not with values exceeding the threshold, we won't push
it to Hive Metastore."
+ )
.version("3.1.0")
.internal()
.intConf
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index db67480..2f7fe96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
values.map(value => s"$name = $value").mkString("(", " or ", ")")
}
+ def convertNotInToAnd(name: String, values: Seq[String]): String = { // c NOT IN (x, y) => "(c != x and c != y)"
+ values.iterator.map(v => s"$name != $v").mkString("(", " and ", ")")
+ }
+
+ def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists { // true if any element is a NULL literal
+ case lit: Literal => lit.value == null
+ case _ => false
+ }
+
val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold
@@ -763,10 +772,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}
def convert(expr: Expression): Option[String] = expr match {
+ case Not(InSet(_, values)) if values.size > inSetThreshold =>
+ None
+
+ case Not(In(_, list)) if hasNullLiteral(list) => None
+ case Not(InSet(_, list)) if list.contains(null) => None
+
case In(ExtractAttribute(SupportedAttribute(name)),
ExtractableLiterals(values))
if useAdvanced =>
Some(convertInToOr(name, values))
+ case Not(In(ExtractAttribute(SupportedAttribute(name)),
ExtractableLiterals(values)))
+ if useAdvanced =>
+ Some(convertNotInToAnd(name, values))
+
case InSet(child, values) if useAdvanced && values.size > inSetThreshold
=>
val dataType = child.dataType
// Skip null here is safe, more details could see at
ExtractableLiterals.
@@ -779,10 +798,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
if useAdvanced && child.dataType == DateType =>
Some(convertInToOr(name, values))
+ case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
+ ExtractableDateValues(values))) if useAdvanced && child.dataType ==
DateType =>
+ Some(convertNotInToAnd(name, values))
+
case InSet(ExtractAttribute(SupportedAttribute(name)),
ExtractableValues(values))
if useAdvanced =>
Some(convertInToOr(name, values))
+ case Not(InSet(ExtractAttribute(SupportedAttribute(name)),
ExtractableValues(values)))
+ if useAdvanced =>
+ Some(convertNotInToAnd(name, values))
+
case op @ SpecialBinaryComparison(
ExtractAttribute(SupportedAttribute(name)),
ExtractableLiteral(value)) =>
Some(s"$name ${op.symbol} $value")
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 79b34bd..fcdc973 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -108,6 +108,47 @@ class FiltersSuite extends SparkFunSuite with Logging with
PlanTest {
(a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil,
"datecol != 2019-01-01")
+ filterTest("not-in, string filter", // NOT IN is pushed as a conjunction of != comparisons
+ (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil,
+ """(strcol != "a" and strcol != "b")""")
+
+ filterTest("not-in, string filter with null",
+ (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"),
Literal(null))))) :: Nil,
+ "") // a NULL in the list disables pushdown, so no filter string is produced
+
+ filterTest("not-in, date filter",
+ (Not(In(a("datecol", DateType),
+ Seq(Literal(Date.valueOf("2021-01-01")),
Literal(Date.valueOf("2021-01-02")))))) :: Nil,
+ """(datecol != 2021-01-01 and datecol != 2021-01-02)""")
+
+ filterTest("not-in, date filter with null",
+ (Not(In(a("datecol", DateType),
+ Seq(Literal(Date.valueOf("2021-01-01")),
Literal(Date.valueOf("2021-01-02")),
+ Literal(null))))) :: Nil,
+ "") // same NULL rule for date columns
+
+ filterTest("not-inset, string filter", // InSet holds Catalyst internal values, hence .eval()
+ (Not(InSet(a("strcol", StringType), Set(Literal("a").eval(),
Literal("b").eval())))) :: Nil,
+ """(strcol != "a" and strcol != "b")""")
+
+ filterTest("not-inset, string filter with null",
+ (Not(InSet(a("strcol", StringType),
+ Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval()))))
:: Nil,
+ "") // a null element in the set disables pushdown
+
+ filterTest("not-inset, date filter",
+ (Not(InSet(a("datecol", DateType),
+ Set(Literal(Date.valueOf("2020-01-01")).eval(),
+ Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil,
+ """(datecol != 2020-01-01 and datecol != 2020-01-02)""")
+
+ filterTest("not-inset, date filter with null",
+ (Not(InSet(a("datecol", DateType),
+ Set(Literal(Date.valueOf("2020-01-01")).eval(),
+ Literal(Date.valueOf("2020-01-02")).eval(),
+ Literal(null).eval())))) :: Nil,
+ "") // same NULL rule for InSet on date columns
+
// Applying the predicate `x IN (NULL)` should return an empty set, but
since this optimization
// will be applied by Catalyst, this filter converter does not need to
account for this.
filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE",
@@ -187,6 +228,14 @@ class FiltersSuite extends SparkFunSuite with Logging with
PlanTest {
}
}
+ test("SPARK-34538: Don't push not inset if its values exceed the threshold") {
+ withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key
-> "2") { // threshold of 2 values
+ val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3))) // 3 values > threshold of 2
+ val converted = shim.convertFilters(testTable, Seq(filter),
conf.sessionLocalTimeZone)
+ assert(converted.isEmpty) // empty filter string: nothing is pushed to the metastore
+ }
+ }
+
test("SPARK-34538: Skip InSet null value during push filter to Hive
metastore") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key
-> "3") {
val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2))
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index ebab105..16e1a41 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -418,6 +418,76 @@ class HivePartitionFilteringSuite(version: String)
dateStrValue)
}
+ test("getPartitionsByFilter: not in/inset string type") {
+ def check(condition: Expression, result: Seq[String]): Unit = { // result is passed in the chunk position
+ testMetastorePartitionFiltering(
+ condition,
+ dsValue,
+ hValue,
+ result,
+ dateValue,
+ dateStrValue
+ )
+ }
+
+ check(
+ Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))),
+ Seq("ba", "bb")
+ )
+ check( // a NULL in the list disables pushdown, so every chunk value is expected
+ Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab"), Literal(null)))),
+ chunkValue
+ )
+
+ check(
+ Not(InSet(attr("chunk"), Set(Literal("aa").eval(),
Literal("ab").eval()))),
+ Seq("ba", "bb")
+ )
+ check( // UTF8String values (as Catalyst produces) so that only the null can block pushdown
+ Not(InSet(attr("chunk"), Set(Literal("aa").eval(), Literal("ab").eval(), null))),
+ chunkValue
+ )
+ }
+
+ test("getPartitionsByFilter: not in/inset date type") {
+ def check(condition: Expression, result: Seq[String]): Unit = { // result is passed in the date ("d") position
+ testMetastorePartitionFiltering(
+ condition,
+ dsValue,
+ hValue,
+ chunkValue,
+ result,
+ dateStrValue
+ )
+ }
+
+ check( // NOT IN without NULL is pushed down and prunes the listed dates
+ Not(In(attr("d"),
+ Seq(Literal(Date.valueOf("2019-01-01")),
+ Literal(Date.valueOf("2019-01-02"))))),
+ Seq("2019-01-03")
+ )
+ check( // a NULL in the list disables pushdown, so every date value is expected
+ Not(In(attr("d"),
+ Seq(Literal(Date.valueOf("2019-01-01")),
+ Literal(Date.valueOf("2019-01-02")), Literal(null)))),
+ dateValue
+ )
+
+ check( // same cases via InSet, whose elements are Catalyst internal (eval'd) date values
+ Not(InSet(attr("d"),
+ Set(Literal(Date.valueOf("2019-01-01")).eval(),
+ Literal(Date.valueOf("2019-01-02")).eval()))),
+ Seq("2019-01-03")
+ )
+ check( // a null element in the set disables pushdown
+ Not(InSet(attr("d"),
+ Set(Literal(Date.valueOf("2019-01-01")).eval(),
+ Literal(Date.valueOf("2019-01-02")).eval(), null))),
+ dateValue
+ )
+ }
+
test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") {
testMetastorePartitionFiltering(
attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]