This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2036074 [SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
2036074 is described below
commit 2036074b996d3cdd6aaace31a79c825797a6997c
Author: Yuming Wang <[email protected]>
AuthorDate: Fri Mar 8 19:18:32 2019 +0800
[SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
## What changes were proposed in this pull request?
[SPARK-24638](https://issues.apache.org/jira/browse/SPARK-24638) adds
support for Parquet file `StartsWith` predicate push down.
`InMemoryTable` can also support this feature.
Here is an example of how it works. Imagine that the `id` column is stored as below:
Partition ID | lowerBound | upperBound
-- | -- | --
p1 | '1' | '9'
p2 | '10' | '19'
p3 | '20' | '29'
p4 | '30' | '39'
p5 | '40' | '49'
Given the filter `df.filter($"id".startsWith("2"))` or `id like '2%'`, we truncate `lowerBound` and `upperBound` to the length of the prefix:
Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2"))
-- | -- | --
p1 | '1' | '9'
p2 | '1' | '1'
p3 | '2' | '2'
p4 | '3' | '3'
p5 | '4' | '4'
We can see that only `p1` and `p3` satisfy truncated lowerBound <= '2' <= truncated upperBound, so we only need to read those two partitions.
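The pruning rule in the tables above can be sketched outside Spark as a plain bounds check on strings. This is a minimal sketch, assuming string bounds compared with Java's `String.compareTo` (which agrees with Spark's byte-wise `UTF8String` ordering for ASCII data such as these IDs); `PartitionStats`, `mightContainPrefix` and `partitionsToRead` are hypothetical names, not Spark APIs.

```scala
// Sketch of the StartsWith pruning check from this patch, on plain Scala strings.
// PartitionStats is a hypothetical stand-in for the per-batch lowerBound/upperBound
// statistics used by InMemoryTableScanExec; it is not a Spark class.
object StartsWithPruningSketch {
  final case class PartitionStats(id: String, lowerBound: String, upperBound: String)

  // Keep a partition unless the prefix falls outside the bounds truncated to its length.
  // take(n) plays the role of substr(0, Length(prefix)) and returns the whole string
  // when it is shorter than n.
  def mightContainPrefix(stats: PartitionStats, prefix: String): Boolean = {
    val n = prefix.length
    stats.lowerBound.take(n) <= prefix && prefix <= stats.upperBound.take(n)
  }

  // IDs of the partitions that still have to be scanned.
  def partitionsToRead(all: Seq[PartitionStats], prefix: String): Seq[String] =
    all.filter(mightContainPrefix(_, prefix)).map(_.id)

  // The five partitions from the example table above.
  val example: Seq[PartitionStats] = Seq(
    PartitionStats("p1", "1", "9"),
    PartitionStats("p2", "10", "19"),
    PartitionStats("p3", "20", "29"),
    PartitionStats("p4", "30", "39"),
    PartitionStats("p5", "40", "49")
  )

  def main(args: Array[String]): Unit = {
    println(partitionsToRead(example, "2")) // List(p1, p3)
    println(partitionsToRead(example, ""))  // List(p1, p2, p3, p4, p5)
  }
}
```

The check never drops a partition that could hold a match: truncating strings to their first n characters preserves lexicographic order, so any value between the bounds that starts with the prefix forces truncated lowerBound <= prefix <= truncated upperBound. An empty prefix therefore keeps every partition.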
## How was this patch tested?
Unit tests and benchmark tests.
Benchmark results:
```
================================================================================================
Pushdown benchmark for StringStartsWith
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz
StringStartsWith filter: (value like '10%'):    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized                           12068 / 14198          1.3         767.3       1.0X
InMemoryTable Vectorized (Pushdown)                  5457 / 8662          2.9         347.0       2.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz
StringStartsWith filter: (value like '1000%'):  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized                             5246 / 5355          3.0         333.5       1.0X
InMemoryTable Vectorized (Pushdown)                  2185 / 2346          7.2         138.9       2.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz
StringStartsWith filter: (value like '786432%'): Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized                             5112 / 5312          3.1         325.0       1.0X
InMemoryTable Vectorized (Pushdown)                  2292 / 2522          6.9         145.7       2.2X
```
Closes #23004 from wangyum/SPARK-26004.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/columnar/InMemoryTableScanExec.scala | 30 +++++++++++++++++++++-
.../columnar/PartitionBatchPruningSuite.scala | 9 +++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 8f8d801..b827878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -194,7 +194,7 @@ case class InMemoryTableScanExec(
}
// Returned filter predicate should return false iff it is impossible for the input expression
- // to evaluate to `true' based on statistics collected about this partition batch.
+ // to evaluate to `true` based on statistics collected about this partition batch.
@transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
@@ -237,6 +237,34 @@ case class InMemoryTableScanExec(
if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
+
+ // This is an example to explain how it works, imagine that the id column stored as follows:
+ // __________________________________________
+ // | Partition ID | lowerBound | upperBound |
+ // |--------------|------------|------------|
+ // | p1 | '1' | '9' |
+ // | p2 | '10' | '19' |
+ // | p3 | '20' | '29' |
+ // | p4 | '30' | '39' |
+ // | p5 | '40' | '49' |
+ // |______________|____________|____________|
+ //
+ // A filter: df.filter($"id".startsWith("2")).
+ // In this case it substr lowerBound and upperBound:
+ // ________________________________________________________________________________________
+ // | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
+ // |--------------|-----------------------------------|-----------------------------------|
+ // | p1 | '1' | '9' |
+ // | p2 | '1' | '1' |
+ // | p3 | '2' | '2' |
+ // | p4 | '3' | '3' |
+ // | p5 | '4' | '4' |
+ // |______________|___________________________________|___________________________________|
+ //
+ // We can see that we only need to read p1 and p3.
+ case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
+ statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
+ l <= statsFor(a).upperBound.substr(0, Length(l))
}
lazy val partitionFilters: Seq[Expression] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index af493e9..b3a5c68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -170,6 +170,15 @@ class PartitionBatchPruningSuite
}
}
+ // Support `StartsWith` predicate
+ checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '18%'", 1, 1)(
+ 180 to 189
+ )
+ checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '%'", 5, 11)(
+ 100 to 200
+ )
+ checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE '18%' like s", 5, 11)(Seq())
+
// With disable IN_MEMORY_PARTITION_PRUNING option
test("disable IN_MEMORY_PARTITION_PRUNING") {
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)
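The three new test cases hinge on whether a `LIKE` predicate reduces to a prefix match on the column. The sketch below is a hypothetical helper, loosely modeled on the `prefix%` case of Spark's `LikeSimplification` optimizer rule and ignoring escape characters, that extracts the literal prefix from a pattern: `'18%'` yields `18`, so only 1 partition and 1 batch are read, while a bare `'%'` has no literal prefix and all 5 partitions (11 batches) are read. In `'18%' like s` the column is the pattern rather than the input, so there is no column prefix to test against the statistics at all.

```scala
// Hypothetical helper: returns the literal prefix of a LIKE pattern of the form "abc%".
// Loosely modeled on the "prefix%" case of Spark's LikeSimplification rule; escape
// characters are deliberately not handled in this sketch.
object LikePrefixSketch {
  // One or more characters other than '_' and '%', then exactly one trailing '%'.
  private val PrefixPattern = "([^_%]+)%".r

  def prefixOf(pattern: String): Option[String] = pattern match {
    // Regex extractors only succeed when the whole string matches.
    case PrefixPattern(prefix) => Some(prefix)
    case _                     => None
  }

  def main(args: Array[String]): Unit = {
    println(prefixOf("18%")) // Some(18)
    println(prefixOf("%"))   // None: no literal prefix
    println(prefixOf("1_%")) // None: '_' is a wildcard
  }
}
```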