[CARBONDATA-2550][CARBONDATA-2576][MV] Fix limit and average function issue in MV query
Problem: Limit is not working on MV queries, and the average function is also not working. Solution: Correct the limit queries, and handle average queries by treating them as full refresh queries. This closes #2480 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/118f4588 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/118f4588 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/118f4588 Branch: refs/heads/branch-1.4 Commit: 118f45883413b9e5c0a60f2e9a98cae15821e8f7 Parents: 9680fd4 Author: ravipesala <[email protected]> Authored: Sun Jul 15 17:37:09 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue Jul 31 00:10:41 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/mv/datamap/MVHelper.scala | 21 ++++++++++---- .../mv/rewrite/MVCreateTestCase.scala | 30 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/118f4588/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala index 8f79d64..389f5be 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -162,9 +162,15 @@ object MVHelper { private def isFullReload(logicalPlan: LogicalPlan): Boolean = { var isFullReload = false logicalPlan.transformAllExpressions { - case a: Alias => - a - case agg: AggregateExpression => agg + case a: Alias => a + case agg: AggregateExpression => + // If average function present then go for full refresh + var reload = 
agg.aggregateFunction match { + case avg: Average => true + case _ => false + } + isFullReload = reload || isFullReload + agg case c: Cast => isFullReload = c.child.find { case agg: AggregateExpression => false @@ -390,7 +396,8 @@ object MVHelper { val relation = g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select] val aliasMap = getAttributeMap(relation.outputList, g.outputList) - val updatedFlagSpec: Seq[Seq[ArrayBuffer[SortOrder]]] = updateSortOrder( + // Update the flagspec as per the mv table attributes. + val updatedFlagSpec: Seq[Seq[Any]] = updateFlagSpec( keepAlias = false, select, relation, @@ -434,10 +441,10 @@ object MVHelper { /** * Updates the flagspec of given select plan with attributes of relation select plan */ - private def updateSortOrder(keepAlias: Boolean, + private def updateFlagSpec(keepAlias: Boolean, select: Select, relation: Select, - aliasMap: Map[AttributeKey, NamedExpression]) = { + aliasMap: Map[AttributeKey, NamedExpression]): Seq[Seq[Any]] = { val updatedFlagSpec = select.flagSpec.map { f => f.map { case list: ArrayBuffer[SortOrder] => @@ -450,6 +457,8 @@ object MVHelper { keepAlias = false) SortOrder(expressions.head, s.direction, s.sameOrderExpressions) } + // In case of limit it goes to other. 
+ case other => other } } updatedFlagSpec http://git-wip-us.apache.org/repos/asf/carbondata/blob/118f4588/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 264eb96..222c9f0 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -784,6 +784,36 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { sql("""drop database if exists xy cascade""") } + test("jira carbondata-2550") { + + sql("drop table if exists mvtable1") + sql("drop datamap if exists map1") + sql("create table mvtable1(name string,age int,salary int) stored by 'carbondata'") + sql(" insert into mvtable1 select 'n1',12,12") + sql(" insert into mvtable1 select 'n1',12,12") + sql(" insert into mvtable1 select 'n3',12,12") + sql(" insert into mvtable1 select 'n4',12,12") + sql("create datamap map1 using 'mv' as select name,sum(salary) from mvtable1 group by name") + sql("rebuild datamap map1") + val frame = sql("select name,sum(salary) from mvtable1 group by name limit 1") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "map1")) + sql("drop datamap if exists map1") + sql("drop table if exists mvtable1") + } + + test("jira carbondata-2576") { + + sql("drop datamap if exists datamap_comp_maxsumminavg") + sql("create datamap datamap_comp_maxsumminavg using 'mv' as select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname") + sql("rebuild datamap datamap_comp_maxsumminavg") + val frame = sql( + "select 
empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_comp_maxsumminavg")) + sql("drop datamap if exists datamap_comp_maxsumminavg") + } + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { val tables = logicalPlan collect { case l: LogicalRelation => l.catalogTable.get
