[CARBONDATA-1940][PreAgg] Fixed bug in creation of pre-aggregate table with group by clause
1. Refactored code to create pre-aggregate table with group by clause 2. Added related test cases This closes #1724 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/36b79826 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/36b79826 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/36b79826 Branch: refs/heads/fgdatamap Commit: 36b798262f271fe160c83498f6910944ca934962 Parents: 0adb32d Author: Geetika Gupta <[email protected]> Authored: Tue Dec 26 17:29:38 2017 +0530 Committer: kunal642 <[email protected]> Committed: Fri Jan 12 14:46:06 2018 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggCreateCommand.scala | 12 ++++++++++++ .../preaggregate/TestPreAggregateLoad.scala | 16 ++++++++++++++++ .../TestPreAggregateTableSelection.scala | 11 +++++++++++ .../command/preaaggregate/PreAggregateUtil.scala | 17 +++++++++++++++++ .../spark/sql/hive/CarbonPreAggregateRules.scala | 6 +++++- 5 files changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/36b79826/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index cb72732..5784bf2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -195,6 +195,18 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("drop datamap agg0 on table maintable") } + test("test pre agg create table 20") { + sql("create datamap agg0 on table mainTable using 'preaggregate' as select column3, sum(column3),column5, sum(column5) from maintable group by column3,column5,column2") + val df = sql("select * from maintable_agg0") + val carbontable = getCarbontable(df.queryExecution.analyzed) + assert(carbontable.getAllMeasures.size()==2) + assert(carbontable.getAllDimensions.size()==3) + carbontable.getAllDimensions.asScala.foreach{ f => + assert(!f.getEncoder.contains(Encoding.DICTIONARY)) + } + sql("drop datamap agg0 on table maintable") + } + def getCarbontable(plan: LogicalPlan) : CarbonTable ={ var carbonTable : CarbonTable = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/36b79826/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index d794f32..fb5f81d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -238,4 +238,20 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { sql("drop table if exists maintable") } + test("test load into preaggregate table having 
group by clause") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)") + sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)") + sql("set carbon.input.segments.default.maintable=0") + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id,name""" + .stripMargin) + checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52, "xyz")) + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/36b79826/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index 322827e..2e0dcc4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -235,6 +235,17 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2") } + test("test pre-agg table with group by condition") { + sql("drop table if exists grouptable") + sql("CREATE TABLE grouptable(id int, name string, city string, age string) STORED BY" + + " 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='name,age')") + 
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table grouptable") + sql("create datamap agg9 on table grouptable using 'preaggregate' as select sum(id) from grouptable group by city") + val df = sql("select sum(id) from grouptable group by city") + preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9") + checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7))) + } + override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists lineitem") http://git-wip-us.apache.org/repos/asf/carbondata/blob/36b79826/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 86d0c6a..153c1a4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -155,6 +155,23 @@ object PreAggregateUtil { throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ") } + groupByExp map { + case attr: AttributeReference => + val columnRelation = getColumnRelation( + attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + fieldToDataMapFieldMap += createField( + attr.name, + attr.dataType, + parentTableName = parentTableName, + columnTableRelationList = Seq(columnRelation)) + case _ => + throw new MalformedCarbonCommandException(s"Unsupported Function in select Statement:${ + selectStmt }") + } fieldToDataMapFieldMap } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/36b79826/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index bd8b4c6..8811a4e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -1008,7 +1008,7 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel] val namedExpressionList = scala.collection.mutable.LinkedHashSet.empty[NamedExpression] plan transform { - case aggregate@Aggregate(_, + case aggregate@Aggregate(groupingExpressions, aExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) if validateAggregateExpressions(aExp) && @@ -1066,6 +1066,10 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) case alias@Alias(_: Expression, _) => namedExpressionList += alias } + groupingExpressions foreach { + case namedExpr: NamedExpression => namedExpressionList += namedExpr + case _ => namedExpressionList + } aggregate.copy(aggregateExpressions = namedExpressionList.toSeq) case plan: LogicalPlan => plan }
