[CARBONDATA-2051] Added like query ends with and contains with filter push down 
support to carbondata

Problem
Currently, only like filters with a starts-with expression are pushed down to 
carbondata. For ends-with and contains like filters, all the data is given 
back to spark and then spark applies the filter on it.
This behavior is fine for queries which have a small number of queried 
columns. But as the number of columns and the amount of data increase, there is 
a performance impact because more data is sent to spark, thereby increasing the 
IO.
If the like filter is pushed down, then the filter column is read first and 
blocks are pruned. In this case the data returned to spark has already had the 
filter applied, and only blocklets matching the data are fully read. This 
reduces IO and increases the query performance.

Solution
Modify code to push down like queries with ends-with and contains filters

This closes #1830


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/543a903c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/543a903c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/543a903c

Branch: refs/heads/carbonstore
Commit: 543a903c6993f553ada309ec3ae23e0385013422
Parents: 5ea538f
Author: manishgupta88 <[email protected]>
Authored: Thu Jan 18 14:23:17 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Jan 18 22:55:08 2018 +0530

----------------------------------------------------------------------
 .../filterexpr/AllDataTypesTestCaseFilter.scala   | 12 ++++++++++++
 .../apache/spark/sql/CarbonBoundReference.scala   |  8 ++++++++
 .../strategy/CarbonLateDecodeStrategy.scala       |  4 ++++
 .../spark/sql/optimizer/CarbonFilters.scala       | 18 ++++++++++++++++++
 4 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index e90ed3c..15ac1f4 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -52,6 +52,18 @@ class AllDataTypesTestCaseFilter extends QueryTest with 
BeforeAndAfterAll {
       sql("select empno,empname from alldatatypestableFilter where 
regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"),
       sql("select empno,empname from alldatatypestableFilter_hive where 
regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"))
   }
+
+  test("verify like query ends with filter push down") {
+    val df = sql("select * from alldatatypestableFilter where empname like 
'%nandh'").queryExecution
+      .sparkPlan
+    assert(df.metadata.get("PushedFilters").get.contains("CarbonEndsWith"))
+  }
+
+  test("verify like query contains with filter push down") {
+    val df = sql("select * from alldatatypestableFilter where empname like 
'%nand%'").queryExecution
+      .sparkPlan
+    assert(df.metadata.get("PushedFilters").get.contains("CarbonContainsWith"))
+  }
   
   override def afterAll {
     sql("drop table alldatatypestableFilter")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index 21e56b0..a043342 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -48,3 +48,11 @@ case class CarbonBoundReference(colExp: ColumnExpression, 
dataType: DataType, nu
 
   override def newInstance(): NamedExpression = throw new 
UnsupportedOperationException
 }
+
+case class CarbonEndsWith(expr: Expression) extends Filter {
+  override def references: Array[String] = null
+}
+
+case class CarbonContainsWith(expr: Expression) extends Filter {
+  override def references: Array[String] = null
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 6ef3d47..4b1d11b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -612,6 +612,10 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case StartsWith(a: Attribute, Literal(v, t)) =>
         Some(sources.StringStartsWith(a.name, v.toString))
+      case c@EndsWith(a: Attribute, Literal(v, t)) =>
+        Some(CarbonEndsWith(c))
+      case c@Contains(a: Attribute, Literal(v, t)) =>
+        Some(CarbonContainsWith(c))
       case others => None
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 341b513..4d91375 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.CastExpressionOptimization
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.CarbonContainsWith
+import org.apache.spark.sql.CarbonEndsWith
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
@@ -119,6 +121,18 @@ object CarbonFilters {
           val r = new LessThanExpression(
             getCarbonExpression(name), getCarbonLiteralExpression(name, 
maxValueLimit))
           Some(new AndExpression(l, r))
+        case CarbonEndsWith(expr: Expression) =>
+          Some(new SparkUnknownExpression(expr.transform {
+            case AttributeReference(name, dataType, _, _) =>
+              CarbonBoundReference(new CarbonColumnExpression(name.toString,
+                CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), 
dataType, expr.nullable)
+          }))
+        case CarbonContainsWith(expr: Expression) =>
+          Some(new SparkUnknownExpression(expr.transform {
+            case AttributeReference(name, dataType, _, _) =>
+              CarbonBoundReference(new CarbonColumnExpression(name.toString,
+                CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), 
dataType, expr.nullable)
+          }))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
         case _ => None
@@ -249,6 +263,10 @@ object CarbonFilters {
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
         case StartsWith(a: Attribute, Literal(v, t)) =>
           Some(sources.StringStartsWith(a.name, v.toString))
+        case c@EndsWith(a: Attribute, Literal(v, t)) =>
+          Some(CarbonEndsWith(c))
+        case c@Contains(a: Attribute, Literal(v, t)) =>
+          Some(CarbonContainsWith(c))
         case c@Cast(a: Attribute, _) =>
           Some(CastExpr(c))
         case others =>

Reply via email to