[CARBONDATA-2046] Aggregate query failed when an unsupported aggregate function 
is present in the query

Root Cause: The isValidPlan variable was being overwritten by the result of 
CarbonReflectionUtils.hasPredicateSubquery(expression), even when it had 
already been set to false by the aggregate-expression check.

Solution: CarbonReflectionUtils.hasPredicateSubquery(expression) should be 
called only when isValidPlan is still true, so that an earlier false result 
is not overwritten.

This closes #1824


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/23bc051b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/23bc051b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/23bc051b

Branch: refs/heads/carbonstore
Commit: 23bc051b84c2d1f16147ba8f0df79bfc83c543f8
Parents: bef6af3
Author: BJangir <[email protected]>
Authored: Wed Jan 17 22:32:10 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Fri Jan 19 19:09:38 2018 +0530

----------------------------------------------------------------------
 .../TestPreAggregateTableSelection.scala        | 14 +++++
 .../sql/hive/CarbonPreAggregateRules.scala      | 64 ++++++++++++++------
 2 files changed, 60 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/23bc051b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 3fefbaf..17d95ef 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -285,6 +285,20 @@ test("test PreAggregate table selection with timeseries 
and normal together") {
     preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
 
   }
+
+  test("test table selection when unsupported aggregate function is present") {
+    sql("drop table if exists maintabletime")
+    sql(
+      "create table maintabletime(year int,month int,name string,salary 
int,dob string) stored" +
+      " by 'carbondata' 
tblproperties('sort_scope'='Global_sort','table_blocksize'='23'," +
+      "'sort_columns'='month,year,name')")
+    sql("insert into maintabletime select 10,11,'x',12,'2014-01-01 00:00:00'")
+    sql(
+      "create datamap agg0 on table maintabletime using 'preaggregate' as 
select name,sum(salary) from " +
+      "maintabletime group by name")
+
+    sql("select var_samp(name) from maintabletime  where name='Mikka' ")
+  }
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists lineitem")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/23bc051b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 5db4895..d629c2a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -263,7 +263,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
       case agg@Aggregate(
         grExp,
         aggExp,
-        CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: 
LogicalRelation)))
+        CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: 
LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
@@ -295,7 +295,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
                 agg)
             Aggregate(updatedGroupExp,
               updatedAggExp,
-              newChild)
+              CarbonReflectionUtils
+                .getSubqueryAlias(sparkSession,
+                  Some(alias1),
+                  CarbonReflectionUtils
+                    .getSubqueryAlias(sparkSession, Some(alias2), newChild, 
None),
+                  None))
           } else {
             agg
           }
@@ -306,7 +311,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
       case agg@Aggregate(
         grExp,
         aggExp,
-        child@CarbonSubqueryAlias(_, l: LogicalRelation))
+        child@CarbonSubqueryAlias(alias, l: LogicalRelation))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
@@ -338,7 +343,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
                 agg)
             Aggregate(updatedGroupExp,
               updatedAggExp,
-              newChild)
+              CarbonReflectionUtils
+                .getSubqueryAlias(sparkSession, Some(alias), newChild, None))
           } else {
             agg
           }
@@ -349,7 +355,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
       case agg@Aggregate(
         grExp,
         aggExp,
-        Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
+        Filter(expression, child@CarbonSubqueryAlias(alias, l: 
LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
@@ -362,17 +368,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
           carbonTable,
           list,
           aggregateExpressions)
+
+        if (isValidPlan) {
+          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
+        }
+
         // getting the columns from filter expression
-        isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
         if (isValidPlan) {
           extractColumnFromExpression(expression, list, carbonTable, true)
         }
-        if(isValidPlan) {
+        if (isValidPlan) {
           val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
             aggregateExpressions,
             carbonTable,
             agg)
-          if(null != aggDataMapSchema && null!= childPlan) {
+          if (null != aggDataMapSchema && null != childPlan) {
             val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
             val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
               getUpdatedExpressions(grExp,
@@ -387,7 +397,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
             Aggregate(updatedGroupExp,
               updatedAggExp,
               Filter(updatedFilterExpression.get,
-                newChild))
+                CarbonReflectionUtils
+                  .getSubqueryAlias(sparkSession, Some(alias), newChild, 
None)))
           } else {
             agg
           }
@@ -395,33 +406,44 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
           agg
         }
       case agg@Aggregate(
-      grExp,
-      aggExp,
-      Filter(expression, CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, 
l: LogicalRelation))))
+        grExp,
+        aggExp,
+        Filter(
+          expression,
+          CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: 
LogicalRelation))))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
         val carbonTable = getCarbonTable(l)
         val list = scala.collection.mutable.HashSet.empty[QueryColumn]
         val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
-        val isValidPlan = extractQueryColumnsFromAggExpression(
+        var isValidPlan = extractQueryColumnsFromAggExpression(
           grExp,
           aggExp,
           carbonTable,
           list,
           aggregateExpressions)
-        if(isValidPlan) {
+
+        if (isValidPlan) {
+          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
+        }
+
+        // getting the columns from filter expression
+        if (isValidPlan) {
+          extractColumnFromExpression(expression, list, carbonTable, true)
+        }
+        if (isValidPlan) {
           val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
             aggregateExpressions,
             carbonTable,
             agg)
-          if(null != aggDataMapSchema && null!= childPlan) {
+          if (null != aggDataMapSchema && null != childPlan) {
             val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
-            val (updatedGroupExp, updatedAggExp, newChild, None) =
+            val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
               getUpdatedExpressions(grExp,
                 aggExp,
                 child,
-                None,
+                Some(expression),
                 aggDataMapSchema,
                 attributes,
                 childPlan,
@@ -429,7 +451,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
                 agg)
             Aggregate(updatedGroupExp,
               updatedAggExp,
-              newChild)
+              Filter(updatedFilterExpression.get,
+                CarbonReflectionUtils
+                  .getSubqueryAlias(sparkSession,
+                    Some(alias1),
+                    CarbonReflectionUtils
+                      .getSubqueryAlias(sparkSession, Some(alias2), newChild, 
None),
+                    None)))
           } else {
             agg
           }

Reply via email to