Repository: spark
Updated Branches:
refs/heads/branch-2.4 77156f8c8 -> 144cb949d
[SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates
## What changes were proposed in this pull request?
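This PR aims to fix an ORC performance regression in the Spark 2.4.0 RCs relative to
Spark 2.3.2. Currently, pushed-down predicates are ignored for column names containing `.`,
because since ORC 1.5.0 (ORC-323) a `.` in a predicate column name is interpreted as a
nested-field separator unless the name is quoted with backticks.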
**Test Data**
```scala
scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot")
scala> df.write.mode("overwrite").orc("/tmp/orc")
```
**Spark 2.3.2**
```scala
scala> spark.sql("set spark.sql.orc.impl=native")
scala> spark.sql("set spark.sql.orc.filterPushdown=true")
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
| 5|
| 7|
| 8|
+------------+
Time taken: 1542 ms
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
| 5|
| 7|
| 8|
+------------+
Time taken: 152 ms
```
**Spark 2.4.0 RC3**
```scala
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
| 5|
| 7|
| 8|
+------------+
Time taken: 4074 ms
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
| 5|
| 7|
| 8|
+------------+
Time taken: 1771 ms
```
## How was this patch tested?
Passes Jenkins with a newly added test case (`SPARK-25579 ORC PPD should support column names with dot`).
Closes #22597 from dongjoon-hyun/SPARK-25579.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit 2c664edc060a41340eb374fd44b5d32c3c06a15c)
Signed-off-by: hyukjinkwon <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144cb949
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144cb949
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144cb949
Branch: refs/heads/branch-2.4
Commit: 144cb949d597e6cd0e662f2320e983cb6903ecfb
Parents: 77156f8
Author: Dongjoon Hyun <[email protected]>
Authored: Tue Oct 16 20:30:23 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Tue Oct 16 20:30:40 2018 +0800
----------------------------------------------------------------------
.../execution/datasources/orc/OrcFilters.scala | 37 +++++++++++++++-----
.../datasources/orc/OrcQuerySuite.scala | 28 +++++----------
.../sql/execution/datasources/orc/OrcTest.scala | 10 ++++++
3 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index dbafc46..5b93a60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -67,6 +67,16 @@ private[sql] object OrcFilters {
}
}
+ // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.`
characters
+ // in order to distinguish predicate pushdown for nested columns.
+ private def quoteAttributeNameIfNeeded(name: String) : String = {
+ if (!name.contains("`") && name.contains(".")) {
+ s"`$name`"
+ } else {
+ name
+ }
+ }
+
/**
* Create ORC filter as a SearchArgument instance.
*/
@@ -178,38 +188,47 @@ private[sql] object OrcFilters {
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
case EqualTo(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().equals(attribute, getType(attribute),
castedValue).end())
+ Some(builder.startAnd().equals(quotedName, getType(attribute),
castedValue).end())
case EqualNullSafe(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute),
castedValue).end())
+ Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute),
castedValue).end())
case LessThan(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().lessThan(attribute, getType(attribute),
castedValue).end())
+ Some(builder.startAnd().lessThan(quotedName, getType(attribute),
castedValue).end())
case LessThanOrEqual(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().lessThanEquals(attribute, getType(attribute),
castedValue).end())
+ Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute),
castedValue).end())
case GreaterThan(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startNot().lessThanEquals(attribute, getType(attribute),
castedValue).end())
+ Some(builder.startNot().lessThanEquals(quotedName, getType(attribute),
castedValue).end())
case GreaterThanOrEqual(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startNot().lessThan(attribute, getType(attribute),
castedValue).end())
+ Some(builder.startNot().lessThan(quotedName, getType(attribute),
castedValue).end())
case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().isNull(attribute, getType(attribute)).end())
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startNot().isNull(attribute, getType(attribute)).end())
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValues = values.map(v => castLiteralValue(v,
dataTypeMap(attribute)))
- Some(builder.startAnd().in(attribute, getType(attribute),
+ Some(builder.startAnd().in(quotedName, getType(attribute),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
case _ => None
http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e9dccbf..998b7b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -445,16 +445,7 @@ abstract class OrcQueryTest extends OrcTest {
test("Support for pushing down filters for decimal types") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
- withTempPath { file =>
- // It needs to repartition data so that we can have several ORC files
- // in order to skip stripes in ORC.
- spark.createDataFrame(data).toDF("a").repartition(10)
- .write.orc(file.getCanonicalPath)
- val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
- val actual = stripSparkFilter(df).count()
-
- assert(actual < 10)
- }
+ checkPredicatePushDown(spark.createDataFrame(data).toDF("a"), 10, "a ==
2")
}
}
@@ -465,16 +456,7 @@ abstract class OrcQueryTest extends OrcTest {
val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
Tuple1(new Timestamp(milliseconds))
}
- withTempPath { file =>
- // It needs to repartition data so that we can have several ORC files
- // in order to skip stripes in ORC.
- spark.createDataFrame(data).toDF("a").repartition(10)
- .write.orc(file.getCanonicalPath)
- val df = spark.read.orc(file.getCanonicalPath).where(s"a ==
'$timeString'")
- val actual = stripSparkFilter(df).count()
-
- assert(actual < 10)
- }
+ checkPredicatePushDown(spark.createDataFrame(data).toDF("a"), 10, s"a ==
'$timeString'")
}
}
@@ -674,6 +656,12 @@ class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
}
}
+ test("SPARK-25579 ORC PPD should support column names with dot") {
+ withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ checkPredicatePushDown(spark.range(10).toDF("col.dot"), 10, "`col.dot`
== 2")
+ }
+ }
+
test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and
sql/core") {
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
val e = intercept[AnalysisException] {
http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 38b34a0..a35c536 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -106,4 +106,14 @@ abstract class OrcTest extends QueryTest with SQLTestUtils with BeforeAndAfterAl
df: DataFrame, path: File): Unit = {
df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}
+
+ protected def checkPredicatePushDown(df: DataFrame, numRows: Int, predicate:
String): Unit = {
+ withTempPath { file =>
+ // It needs to repartition data so that we can have several ORC files
+ // in order to skip stripes in ORC.
+ df.repartition(numRows).write.orc(file.getCanonicalPath)
+ val actual =
stripSparkFilter(spark.read.orc(file.getCanonicalPath).where(predicate)).count()
+ assert(actual < numRows)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]