This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9996c67b92 [SPARK-41990][SQL] Use `FieldReference.column` instead of
`apply` in V1 to V2 filter conversion
d9996c67b92 is described below
commit d9996c67b92d7a6d591fcbe8a42d87e3d9b8fd79
Author: huaxingao <[email protected]>
AuthorDate: Sat Jan 14 22:52:49 2023 -0800
[SPARK-41990][SQL] Use `FieldReference.column` instead of `apply` in V1 to
V2 filter conversion
### What changes were proposed in this pull request?
Use `FieldReference.column` instead of `FieldReference.apply` in V1 to V2
filter conversion
### Why are the changes needed?
Previously, filtering by a composite field name (e.g. a column name containing spaces or dots) did not work
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes #39564 from huaxingao/field_reference.
Authored-by: huaxingao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/sources/filters.scala | 36 ++++++++++++++--------
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 19 ++++++++++++
2 files changed, 42 insertions(+), 13 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 66ec4a6c7b9..080d17b47fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.sources
import org.apache.spark.annotation.{Evolving, Stable}
import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.parser.ParseException
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
-import org.apache.spark.sql.connector.expressions.{FieldReference,
LiteralValue}
+import org.apache.spark.sql.connector.expressions.{FieldReference,
LiteralValue, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse =>
V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, Not => V2Not, Or =>
V2Or, Predicate}
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String
@@ -74,6 +75,15 @@ sealed abstract class Filter {
* Converts V1 filter to V2 filter
*/
private[sql] def toV2: Predicate
+
+ protected def toV2Column(attribute: String): NamedReference = {
+ try {
+ FieldReference(attribute)
+ } catch {
+ case _: ParseException =>
+ FieldReference.column(attribute)
+ }
+ }
}
/**
@@ -91,7 +101,7 @@ case class EqualTo(attribute: String, value: Any) extends
Filter {
override def toV2: Predicate = {
val literal = Literal(value)
new Predicate("=",
- Array(FieldReference(attribute), LiteralValue(literal.value,
literal.dataType)))
+ Array(toV2Column(attribute), LiteralValue(literal.value,
literal.dataType)))
}
}
@@ -111,7 +121,7 @@ case class EqualNullSafe(attribute: String, value: Any)
extends Filter {
override def toV2: Predicate = {
val literal = Literal(value)
new Predicate("<=>",
- Array(FieldReference(attribute), LiteralValue(literal.value,
literal.dataType)))
+ Array(toV2Column(attribute), LiteralValue(literal.value,
literal.dataType)))
}
}
@@ -130,7 +140,7 @@ case class GreaterThan(attribute: String, value: Any)
extends Filter {
override def toV2: Predicate = {
val literal = Literal(value)
new Predicate(">",
- Array(FieldReference(attribute), LiteralValue(literal.value,
literal.dataType)))
+ Array(toV2Column(attribute), LiteralValue(literal.value,
literal.dataType)))
}
}
@@ -149,7 +159,7 @@ case class GreaterThanOrEqual(attribute: String, value:
Any) extends Filter {
override def toV2: Predicate = {
val literal = Literal(value)
new Predicate(">=",
- Array(FieldReference(attribute), LiteralValue(literal.value,
literal.dataType)))
+ Array(toV2Column(attribute), LiteralValue(literal.value,
literal.dataType)))
}
}
@@ -168,7 +178,7 @@ case class LessThan(attribute: String, value: Any) extends
Filter {
override def toV2: Predicate = {
val literal = Literal(value)
new Predicate("<",
- Array(FieldReference(attribute), LiteralValue(literal.value,
literal.dataType)))
+ Array(toV2Column(attribute), LiteralValue(literal.value,
literal.dataType)))
}
}
@@ -187,7 +197,7 @@ case class LessThanOrEqual(attribute: String, value: Any)
extends Filter {
override def toV2: Predicate = {
val literal = Literal(value)
new Predicate("<=",
- Array(FieldReference(attribute), LiteralValue(literal.value,
literal.dataType)))
+ Array(toV2Column(attribute), LiteralValue(literal.value,
literal.dataType)))
}
}
@@ -230,7 +240,7 @@ case class In(attribute: String, values: Array[Any])
extends Filter {
val literal = Literal(value)
LiteralValue(literal.value, literal.dataType)
}
- new Predicate("IN", FieldReference(attribute) +: literals)
+ new Predicate("IN", toV2Column(attribute) +: literals)
}
}
@@ -245,7 +255,7 @@ case class In(attribute: String, values: Array[Any])
extends Filter {
@Stable
case class IsNull(attribute: String) extends Filter {
override def references: Array[String] = Array(attribute)
- override def toV2: Predicate = new Predicate("IS_NULL",
Array(FieldReference(attribute)))
+ override def toV2: Predicate = new Predicate("IS_NULL",
Array(toV2Column(attribute)))
}
/**
@@ -259,7 +269,7 @@ case class IsNull(attribute: String) extends Filter {
@Stable
case class IsNotNull(attribute: String) extends Filter {
override def references: Array[String] = Array(attribute)
- override def toV2: Predicate = new Predicate("IS_NOT_NULL",
Array(FieldReference(attribute)))
+ override def toV2: Predicate = new Predicate("IS_NOT_NULL",
Array(toV2Column(attribute)))
}
/**
@@ -308,7 +318,7 @@ case class Not(child: Filter) extends Filter {
case class StringStartsWith(attribute: String, value: String) extends Filter {
override def references: Array[String] = Array(attribute)
override def toV2: Predicate = new Predicate("STARTS_WITH",
- Array(FieldReference(attribute),
LiteralValue(UTF8String.fromString(value), StringType)))
+ Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value),
StringType)))
}
/**
@@ -324,7 +334,7 @@ case class StringStartsWith(attribute: String, value:
String) extends Filter {
case class StringEndsWith(attribute: String, value: String) extends Filter {
override def references: Array[String] = Array(attribute)
override def toV2: Predicate = new Predicate("ENDS_WITH",
- Array(FieldReference(attribute),
LiteralValue(UTF8String.fromString(value), StringType)))
+ Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value),
StringType)))
}
/**
@@ -340,7 +350,7 @@ case class StringEndsWith(attribute: String, value: String)
extends Filter {
case class StringContains(attribute: String, value: String) extends Filter {
override def references: Array[String] = Array(attribute)
override def toV2: Predicate = new Predicate("CONTAINS",
- Array(FieldReference(attribute),
LiteralValue(UTF8String.fromString(value), StringType)))
+ Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value),
StringType)))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b87fee6cec2..3e317dc9547 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -276,6 +276,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
"INSERT INTO test.datetime VALUES ('2018-07-12', '2018-07-12
09:51:15.0')").executeUpdate()
conn.commit()
+ conn.prepareStatement(
+ "CREATE TABLE test.composite_name (`last name` TEXT(32) NOT NULL, id
INTEGER NOT NULL)")
+ .executeUpdate()
+ conn.prepareStatement("INSERT INTO test.composite_name VALUES ('smith',
1)").executeUpdate()
+ conn.prepareStatement("INSERT INTO test.composite_name VALUES ('jones',
2)").executeUpdate()
+ conn.commit()
+
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW composite_name
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url', dbtable 'TEST.COMPOSITE_NAME', user 'testUser',
password 'testPass')
+ """.stripMargin.replaceAll("\n", " "))
+
// Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
}
@@ -1963,4 +1977,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession
{
}
}
}
+
+ test("SPARK-41990: Filter with composite name") {
+ val df = sql("SELECT * FROM composite_name WHERE `last name` = 'smith'")
+ assert(df.collect.toSet === Set(Row("smith", 1)))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]