This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6cdc32f  [SPARK-32622][SQL][TEST] Add case-sensitivity test for ORC predicate pushdown
6cdc32f is described below
commit 6cdc32fb33cf717dbc6ad6e674f8c942535683ba
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Aug 17 13:19:49 2020 -0700
[SPARK-32622][SQL][TEST] Add case-sensitivity test for ORC predicate pushdown
### What changes were proposed in this pull request?
While working on SPARK-25557, we found that ORC predicate pushdown has no case-sensitivity test. This PR adds a case-sensitivity test for ORC predicate pushdown.
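As a rough sketch of the behavior the new test pins down (illustrative only, not code from this commit; the output path and session setup below are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch, not part of this commit. Assumes a local session;
// "/tmp/spark_32622_demo" is a placeholder path.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Write ORC files whose schema has two fields differing only by case.
spark.conf.set("spark.sql.caseSensitive", "true")
spark.range(10).selectExpr("id - 1 as A", "id as a")
  .write.mode("overwrite").orc("/tmp/spark_32622_demo")

// Case-sensitive analysis: `A` and `a` are distinct columns, so a filter
// on `A` resolves unambiguously and can be pushed down to the ORC reader.
spark.read.orc("/tmp/spark_32622_demo").where("A < 0").show()

// Case-insensitive analysis: the same reference matches both fields, and
// analysis fails with "Reference 'a' is ambiguous".
spark.conf.set("spark.sql.caseSensitive", "false")
// spark.read.orc("/tmp/spark_32622_demo").where("a < 0")  // AnalysisException
```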
### Why are the changes needed?
Increasing test coverage for ORC predicate pushdown.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed Jenkins tests.
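To reproduce locally, an invocation along these lines should run the suite (the exact sbt command is an assumption about the dev setup, not part of this commit):

```
build/sbt "sql/testOnly org.apache.spark.sql.execution.datasources.orc.OrcFilterSuite"
```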
Closes #29427 from viirya/SPARK-25557-followup3.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b33066f42bd474f5f80b14221f97d09a76e0b398)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../execution/datasources/orc/OrcFilterSuite.scala | 97 +++++++++++++++++++++-
.../execution/datasources/orc/OrcFilterSuite.scala | 97 +++++++++++++++++++++-
2 files changed, 190 insertions(+), 4 deletions(-)
diff --git a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 88b4b24..beb7232 100644
--- a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -25,8 +25,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -469,5 +469,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       ).get.toString
     }
   }
+
+  test("SPARK-32622: case sensitivity in predicate pushdown") {
+    withTempPath { dir =>
+      val count = 10
+      val tableName = "spark_32622"
+      val tableDir1 = dir.getAbsoluteFile + "/table1"
+
+      // Physical ORC files have both `A` and `a` fields.
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
+          .write.mode("overwrite").orc(tableDir1)
+      }
+
+      // Metastore table has both `A` and `a` fields too.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))
+
+          val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual1.count() == 1)
+
+          val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
+          assert(actual2.count() == 0)
+        }
+
+        // Exception thrown for ambiguous case.
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          val e = intercept[AnalysisException] {
+            sql(s"select a from $tableName where a < 0").collect()
+          }
+          assert(e.getMessage.contains(
+            "Reference 'a' is ambiguous"))
+        }
+      }
+
+      // Metastore table has only `A` field.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          val e = intercept[SparkException] {
+            sql(s"select A from $tableName where A < 0").collect()
+          }
+          assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+            """Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
+        }
+      }
+
+      // Physical ORC files have only `A` field.
+      val tableDir2 = dir.getAbsoluteFile + "/table2"
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A")
+          .write.mode("overwrite").orc(tableDir2)
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
+          // TODO: ORC predicate pushdown should work under case-insensitive analysis.
+          // assert(actual.count() == 1)
+        }
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual.count() == 1)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 2263179..a3e450c 100644
--- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -25,8 +25,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -470,5 +470,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       ).get.toString
     }
   }
+
+  test("SPARK-32622: case sensitivity in predicate pushdown") {
+    withTempPath { dir =>
+      val count = 10
+      val tableName = "spark_32622"
+      val tableDir1 = dir.getAbsoluteFile + "/table1"
+
+      // Physical ORC files have both `A` and `a` fields.
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
+          .write.mode("overwrite").orc(tableDir1)
+      }
+
+      // Metastore table has both `A` and `a` fields too.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))
+
+          val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual1.count() == 1)
+
+          val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
+          assert(actual2.count() == 0)
+        }
+
+        // Exception thrown for ambiguous case.
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          val e = intercept[AnalysisException] {
+            sql(s"select a from $tableName where a < 0").collect()
+          }
+          assert(e.getMessage.contains(
+            "Reference 'a' is ambiguous"))
+        }
+      }
+
+      // Metastore table has only `A` field.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          val e = intercept[SparkException] {
+            sql(s"select A from $tableName where A < 0").collect()
+          }
+          assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+            """Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
+        }
+      }
+
+      // Physical ORC files have only `A` field.
+      val tableDir2 = dir.getAbsoluteFile + "/table2"
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A")
+          .write.mode("overwrite").orc(tableDir2)
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
+          // TODO: ORC predicate pushdown should work under case-insensitive analysis.
+          // assert(actual.count() == 1)
+        }
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual.count() == 1)
+        }
+      }
+    }
+  }
 }