[CARBONDATA-2046] Aggregate query failed when an unsupported aggregate function is present in the query
Root Cause: the isValidPlan variable was being overwritten by the result of CarbonReflectionUtils.hasPredicateSubquery(expression). Solution: CarbonReflectionUtils.hasPredicateSubquery(expression) should be called only when isValidPlan is still true, so that an earlier false result is not overwritten. This closes #1824 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/23bc051b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/23bc051b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/23bc051b Branch: refs/heads/carbonstore Commit: 23bc051b84c2d1f16147ba8f0df79bfc83c543f8 Parents: bef6af3 Author: BJangir <[email protected]> Authored: Wed Jan 17 22:32:10 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Jan 19 19:09:38 2018 +0530 ---------------------------------------------------------------------- .../TestPreAggregateTableSelection.scala | 14 +++++ .../sql/hive/CarbonPreAggregateRules.scala | 64 ++++++++++++++------ 2 files changed, 60 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/23bc051b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index 3fefbaf..17d95ef 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -285,6 +285,20 @@ test("test PreAggregate table selection with timeseries and normal together") { preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year") } + + test("test table selection when unsupported aggregate function is present") { + sql("drop table if exists maintabletime") + sql( + "create table maintabletime(year int,month int,name string,salary int,dob string) stored" + + " by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23'," + + "'sort_columns'='month,year,name')") + sql("insert into maintabletime select 10,11,'x',12,'2014-01-01 00:00:00'") + sql( + "create datamap agg0 on table maintabletime using 'preaggregate' as select name,sum(salary) from " + + "maintabletime group by name") + + sql("select var_samp(name) from maintabletime where name='Mikka' ") + } override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists lineitem") http://git-wip-us.apache.org/repos/asf/carbondata/blob/23bc051b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 5db4895..d629c2a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -263,7 +263,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case agg@Aggregate( grExp, aggExp, - CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: LogicalRelation))) + CarbonSubqueryAlias(alias1, 
child@CarbonSubqueryAlias(alias2, l: LogicalRelation))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. metaData.hasAggregateDataMapSchema => @@ -295,7 +295,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule agg) Aggregate(updatedGroupExp, updatedAggExp, - newChild) + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, + Some(alias1), + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias2), newChild, None), + None)) } else { agg } @@ -306,7 +311,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case agg@Aggregate( grExp, aggExp, - child@CarbonSubqueryAlias(_, l: LogicalRelation)) + child@CarbonSubqueryAlias(alias, l: LogicalRelation)) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. metaData.hasAggregateDataMapSchema => @@ -338,7 +343,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule agg) Aggregate(updatedGroupExp, updatedAggExp, - newChild) + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias), newChild, None)) } else { agg } @@ -349,7 +355,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case agg@Aggregate( grExp, aggExp, - Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation))) + Filter(expression, child@CarbonSubqueryAlias(alias, l: LogicalRelation))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
metaData.hasAggregateDataMapSchema => @@ -362,17 +368,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule carbonTable, list, aggregateExpressions) + + if (isValidPlan) { + isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) + } + // getting the columns from filter expression - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) if (isValidPlan) { extractColumnFromExpression(expression, list, carbonTable, true) } - if(isValidPlan) { + if (isValidPlan) { val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, aggregateExpressions, carbonTable, agg) - if(null != aggDataMapSchema && null!= childPlan) { + if (null != aggDataMapSchema && null != childPlan) { val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = getUpdatedExpressions(grExp, @@ -387,7 +397,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Aggregate(updatedGroupExp, updatedAggExp, Filter(updatedFilterExpression.get, - newChild)) + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias), newChild, None))) } else { agg } @@ -395,33 +406,44 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule agg } case agg@Aggregate( - grExp, - aggExp, - Filter(expression, CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: LogicalRelation)))) + grExp, + aggExp, + Filter( + expression, + CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
metaData.hasAggregateDataMapSchema => val carbonTable = getCarbonTable(l) val list = scala.collection.mutable.HashSet.empty[QueryColumn] val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] - val isValidPlan = extractQueryColumnsFromAggExpression( + var isValidPlan = extractQueryColumnsFromAggExpression( grExp, aggExp, carbonTable, list, aggregateExpressions) - if(isValidPlan) { + + if (isValidPlan) { + isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) + } + + // getting the columns from filter expression + if (isValidPlan) { + extractColumnFromExpression(expression, list, carbonTable, true) + } + if (isValidPlan) { val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, aggregateExpressions, carbonTable, agg) - if(null != aggDataMapSchema && null!= childPlan) { + if (null != aggDataMapSchema && null != childPlan) { val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] - val (updatedGroupExp, updatedAggExp, newChild, None) = + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = getUpdatedExpressions(grExp, aggExp, child, - None, + Some(expression), aggDataMapSchema, attributes, childPlan, @@ -429,7 +451,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule agg) Aggregate(updatedGroupExp, updatedAggExp, - newChild) + Filter(updatedFilterExpression.get, + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, + Some(alias1), + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias2), newChild, None), + None))) } else { agg }
