[CARBONDATA-2022] Fixed table alias issue in PreAggregate. **Problem:** A query that uses a table alias does not hit the pre-aggregate table. **Solution:** For a query with a table alias, the logical plan arrives as SubqueryAlias(alias, SubqueryAlias), and this case was not handled in the transform query plan for pre-aggregate; matching cases for this nested plan shape (with and without a Filter) are added.
This closes #1794 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/913837fb Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/913837fb Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/913837fb Branch: refs/heads/fgdatamap Commit: 913837fbcf6d51f21b4f1130158ea3281b7a89b3 Parents: 36b7982 Author: BJangir <[email protected]> Authored: Thu Jan 11 15:30:59 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Jan 12 19:07:29 2018 +0530 ---------------------------------------------------------------------- .../TestPreAggregateTableSelection.scala | 18 ++++- .../sql/hive/CarbonPreAggregateRules.scala | 85 ++++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/913837fb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index 2e0dcc4..9bbba3a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -240,12 +240,28 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { sql("CREATE TABLE grouptable(id int, name string, city string, age string) STORED BY" + " 
'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='name,age')") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table grouptable") - sql("create datamap agg9 on table grouptable using 'preaggregate' as select sum(id) from grouptable group by city") + sql( + "create datamap agg9 on table grouptable using 'preaggregate' as select sum(id) from grouptable group by city") val df = sql("select sum(id) from grouptable group by city") preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9") checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7))) } + test("test PreAggregate table selection 30") { + val df = sql("select a.name from mainTable a group by a.name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + test("test PreAggregate table selection 31") { + val df = sql("select a.name as newName from mainTable a group by a.name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + test("test PreAggregate table selection 32") { + val df = sql("select a.name as newName from mainTable a where a.name='vishal' group by a.name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists lineitem") http://git-wip-us.apache.org/repos/asf/carbondata/blob/913837fb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 8811a4e..de554c5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -260,6 +260,48 @@ case class 
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule */ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { val updatedPlan = logicalPlan.transform { + case agg@Aggregate( + grExp, + aggExp, + CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: LogicalRelation))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(l) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, + carbonTable, + agg) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + } else { + agg + } + } else { + agg + } // case for aggregation query case agg@Aggregate( grExp, @@ -352,6 +394,49 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } else { agg } + case agg@Aggregate( + grExp, + aggExp, + Filter(expression, CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(l) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, + carbonTable, + agg) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + } else { + agg + } + } else { + agg + } + } updatedPlan }
