Repository: carbondata Updated Branches: refs/heads/master 68e5b52c4 -> 9a75ce53b
[CARBONDATA-2539][MV] Fix predicate subquery which uses leftsemi join Problem: References to the top plan are not resolved correctly when a predicate subquery is present. Solution: Correct the references. This closes #2477 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9a75ce53 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9a75ce53 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9a75ce53 Branch: refs/heads/master Commit: 9a75ce53bb062d25fb2b2b4ba7fd4cb611a256a7 Parents: 68e5b52 Author: ravipesala <ravi.pes...@gmail.com> Authored: Sun Jul 15 17:41:30 2018 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Jul 25 14:28:37 2018 +0800 ---------------------------------------------------------------------- .../apache/carbondata/mv/datamap/MVHelper.scala | 42 +++++++++++++++++++- .../mv/rewrite/DefaultMatchMaker.scala | 5 +-- .../mv/rewrite/MVCreateTestCase.scala | 12 ++++++ 3 files changed, 54 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a75ce53/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala index a52e4c9..8c5fa7b 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -459,9 +459,31 @@ object MVHelper { aliasMap = relation.aliasMap, flagSpec = updatedFlagSpec).setRewritten() } else { - val outputSel = - updateOutPutList(select.outputList, relation, aliasMap, keepAlias = false) + // First find the indices from the child outlist. 
+ val indices = select.outputList.map{c => + g.outputList.indexWhere{ + case al : Alias if c.isInstanceOf[Alias] => + al.child.semanticEquals(c.asInstanceOf[Alias].child) + case al: Alias if al.child.semanticEquals(c) => true + case other if c.isInstanceOf[Alias] => + other.semanticEquals(c.asInstanceOf[Alias].child) + case other => + other.semanticEquals(c) || other.toAttribute.semanticEquals(c) + } + } val child = updateDataMap(g, rewrite).asInstanceOf[Matchable] + // Get the outList from converted child outList using already selected indices + val outputSel = + indices.map(child.outputList(_)).zip(select.outputList).map { case (l, r) => + l match { + case a: Alias if r.isInstanceOf[Alias] => + Alias(a.child, r.name)(exprId = r.exprId) + case a: Alias => a + case other if r.isInstanceOf[Alias] => + Alias(other, r.name)(exprId = r.exprId) + case other => other + } + } // TODO Remove the unnecessary columns from selection. // Only keep columns which are required by parent. val inputSel = child.outputList @@ -518,6 +540,22 @@ object MVHelper { } } + // Create the aliases using two plan outputs mappings. + def createAliases(mappings: Seq[(NamedExpression, NamedExpression)]): Seq[NamedExpression] = { + mappings.map{ case (o1, o2) => + o2 match { + case al: Alias if o1.name == o2.name && o1.exprId != o2.exprId => + Alias(al.child, o1.name)(exprId = o1.exprId) + case other => + if (o1.name != o2.name || o1.exprId != o2.exprId) { + Alias(o2, o1.name)(exprId = o1.exprId) + } else { + o2 + } + } + } + } + /** * Rewrite the updated mv query with corresponding MV table. 
*/ http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a75ce53/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala index 3a0fa10..f1c48ad 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter} +import org.apache.carbondata.mv.datamap.MVHelper import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _} import org.apache.carbondata.mv.plans.modular import org.apache.carbondata.mv.plans.modular.Flags._ @@ -689,9 +690,7 @@ object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateH }.getOrElse(gb_2c.outputList(index))) } - val oList = for ((o1, o2) <- mappings) yield { - if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2 - } + val oList = MVHelper.createAliases(mappings) val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2)) val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a75ce53/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 
f0f9c82..6adb14e 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -873,6 +873,18 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { sql("drop datamap if exists datamap46") } + test("jira carbondata-2539") { + + sql("drop datamap if exists datamap_subqry") + sql("create datamap datamap_subqry using 'mv' as select empname, min(salary) from fact_table1 group by empname") + sql("rebuild datamap datamap_subqry") + val frame = sql( + "SELECT max(utilization) FROM fact_table1 WHERE salary IN (select min(salary) from fact_table1 group by empname ) group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_subqry")) + sql("drop datamap if exists datamap_subqry") + } + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { val tables = logicalPlan collect { case l: LogicalRelation => l.catalogTable.get