[CARBONDATA-2051] Added filter push-down support to CarbonData for LIKE queries with ends-with and contains patterns
Problem: Currently, only LIKE filters with a starts-with pattern are pushed down to CarbonData. For ends-with and contains LIKE filters, all the data is returned to Spark, and Spark then applies the filter itself. This is acceptable for queries that select only a few columns. However, as the number of columns and the volume of data grow, performance suffers because more data is sent to Spark, which increases IO. If the LIKE filter is pushed down, the filter column is read first and blocks are pruned. In that case, the data returned to Spark has already been filtered, and only the blocklets that match are fully read. This reduces IO and improves query performance. Solution: Modify the code to push down LIKE queries with ends-with and contains patterns as filters. This closes #1830 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/543a903c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/543a903c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/543a903c Branch: refs/heads/carbonstore Commit: 543a903c6993f553ada309ec3ae23e0385013422 Parents: 5ea538f Author: manishgupta88 <[email protected]> Authored: Thu Jan 18 14:23:17 2018 +0530 Committer: ravipesala <[email protected]> Committed: Thu Jan 18 22:55:08 2018 +0530 ---------------------------------------------------------------------- .../filterexpr/AllDataTypesTestCaseFilter.scala | 12 ++++++++++++ .../apache/spark/sql/CarbonBoundReference.scala | 8 ++++++++ .../strategy/CarbonLateDecodeStrategy.scala | 4 ++++ .../spark/sql/optimizer/CarbonFilters.scala | 18 ++++++++++++++++++ 4 files changed, 42 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala index e90ed3c..15ac1f4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala @@ -52,6 +52,18 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll { sql("select empno,empname from alldatatypestableFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"), sql("select empno,empname from alldatatypestableFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'")) } + + test("verify like query ends with filter push down") { + val df = sql("select * from alldatatypestableFilter where empname like '%nandh'").queryExecution + .sparkPlan + assert(df.metadata.get("PushedFilters").get.contains("CarbonEndsWith")) + } + + test("verify like query contains with filter push down") { + val df = sql("select * from alldatatypestableFilter where empname like '%nand%'").queryExecution + .sparkPlan + assert(df.metadata.get("PushedFilters").get.contains("CarbonContainsWith")) + } override def afterAll { sql("drop table alldatatypestableFilter") http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala index 21e56b0..a043342 
100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala @@ -48,3 +48,11 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu override def newInstance(): NamedExpression = throw new UnsupportedOperationException } + +case class CarbonEndsWith(expr: Expression) extends Filter { + override def references: Array[String] = null +} + +case class CarbonContainsWith(expr: Expression) extends Filter { + override def references: Array[String] = null +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 6ef3d47..4b1d11b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -612,6 +612,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { CastExpressionOptimization.checkIfCastCanBeRemove(c) case StartsWith(a: Attribute, Literal(v, t)) => Some(sources.StringStartsWith(a.name, v.toString)) + case c@EndsWith(a: Attribute, Literal(v, t)) => + Some(CarbonEndsWith(c)) + case c@Contains(a: Attribute, Literal(v, t)) => + Some(CarbonContainsWith(c)) case others => None } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/543a903c/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 341b513..4d91375 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.CastExpressionOptimization import org.apache.spark.sql.types._ +import org.apache.spark.sql.CarbonContainsWith +import org.apache.spark.sql.CarbonEndsWith import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.CarbonSessionCatalog @@ -119,6 +121,18 @@ object CarbonFilters { val r = new LessThanExpression( getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit)) Some(new AndExpression(l, r)) + case CarbonEndsWith(expr: Expression) => + Some(new SparkUnknownExpression(expr.transform { + case AttributeReference(name, dataType, _, _) => + CarbonBoundReference(new CarbonColumnExpression(name.toString, + CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable) + })) + case CarbonContainsWith(expr: Expression) => + Some(new SparkUnknownExpression(expr.transform { + case AttributeReference(name, dataType, _, _) => + CarbonBoundReference(new CarbonColumnExpression(name.toString, + CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable) + })) case CastExpr(expr: Expression) => Some(transformExpression(expr)) case _ => None @@ -249,6 +263,10 @@ object CarbonFilters { CastExpressionOptimization.checkIfCastCanBeRemove(c) case StartsWith(a: Attribute, Literal(v, t)) => Some(sources.StringStartsWith(a.name, v.toString)) + case c@EndsWith(a: Attribute, Literal(v, t)) => + Some(CarbonEndsWith(c)) + case 
c@Contains(a: Attribute, Literal(v, t)) => + Some(CarbonContainsWith(c)) case c@Cast(a: Attribute, _) => Some(CastExpr(c)) case others =>
