[CARBONDATA-2045][PreAggregate] Fixed pre-aggregate failure when a specific segment is set

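Fixed issue: querying specific segments (set via the session parameters) had no
effect when a pre-aggregate table was present, because the query was still
rewritten to read from the pre-aggregate table. The pre-aggregate rewrite is now
skipped when specific segments are set for the parent table.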

This closes #1823


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/953efce4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/953efce4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/953efce4

Branch: refs/heads/carbonstore
Commit: 953efce4a9516704d72571a9300e1d67b1ca1880
Parents: b4dc866
Author: kumarvishal <[email protected]>
Authored: Wed Jan 17 17:57:21 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Sat Jan 20 00:22:47 2018 +0530

----------------------------------------------------------------------
 .../sql/hive/CarbonPreAggregateRules.scala      | 295 ++++++++++---------
 1 file changed, 155 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/953efce4/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index d629c2a..79cbe05 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -268,39 +268,43 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
         val carbonTable = getCarbonTable(l)
-        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
-        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
-        val isValidPlan = extractQueryColumnsFromAggExpression(
-          grExp,
-          aggExp,
-          carbonTable,
-          list,
-          aggregateExpressions)
-        if(isValidPlan) {
-          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
-            aggregateExpressions,
+        if(isSpecificSegmentNotPresent(carbonTable)) {
+          val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+          val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
+          val isValidPlan = extractQueryColumnsFromAggExpression(
+            grExp,
+            aggExp,
             carbonTable,
-            agg)
-          if(null != aggDataMapSchema && null!= childPlan) {
-            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
-            val (updatedGroupExp, updatedAggExp, newChild, None) =
-              getUpdatedExpressions(grExp,
-                aggExp,
-                child,
-                None,
-                aggDataMapSchema,
-                attributes,
-                childPlan,
-                carbonTable,
-                agg)
-            Aggregate(updatedGroupExp,
-              updatedAggExp,
-              CarbonReflectionUtils
-                .getSubqueryAlias(sparkSession,
-                  Some(alias1),
-                  CarbonReflectionUtils
-                    .getSubqueryAlias(sparkSession, Some(alias2), newChild, 
None),
-                  None))
+            list,
+            aggregateExpressions)
+          if (isValidPlan) {
+            val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
+              aggregateExpressions,
+              carbonTable,
+              agg)
+            if (null != aggDataMapSchema && null != childPlan) {
+              val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
+              val (updatedGroupExp, updatedAggExp, newChild, None) =
+                getUpdatedExpressions(grExp,
+                  aggExp,
+                  child,
+                  None,
+                  aggDataMapSchema,
+                  attributes,
+                  childPlan,
+                  carbonTable,
+                  agg)
+              Aggregate(updatedGroupExp,
+                updatedAggExp,
+                CarbonReflectionUtils
+                  .getSubqueryAlias(sparkSession,
+                    Some(alias1),
+                    CarbonReflectionUtils
+                      .getSubqueryAlias(sparkSession, Some(alias2), newChild, 
None),
+                    None))
+            } else {
+              agg
+            }
           } else {
             agg
           }
@@ -316,35 +320,39 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
         val carbonTable = getCarbonTable(l)
-        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
-        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
-        val isValidPlan = extractQueryColumnsFromAggExpression(
-          grExp,
-          aggExp,
-          carbonTable,
-          list,
-          aggregateExpressions)
-        if(isValidPlan) {
-          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
-            aggregateExpressions,
+        if(isSpecificSegmentNotPresent(carbonTable)) {
+          val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+          val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
+          val isValidPlan = extractQueryColumnsFromAggExpression(
+            grExp,
+            aggExp,
             carbonTable,
-            agg)
-          if(null != aggDataMapSchema && null!= childPlan) {
-            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
-            val (updatedGroupExp, updatedAggExp, newChild, None) =
-              getUpdatedExpressions(grExp,
-                aggExp,
-                child,
-                None,
-                aggDataMapSchema,
-                attributes,
-                childPlan,
-                carbonTable,
-                agg)
-            Aggregate(updatedGroupExp,
-              updatedAggExp,
-              CarbonReflectionUtils
-                .getSubqueryAlias(sparkSession, Some(alias), newChild, None))
+            list,
+            aggregateExpressions)
+          if (isValidPlan) {
+            val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
+              aggregateExpressions,
+              carbonTable,
+              agg)
+            if (null != aggDataMapSchema && null != childPlan) {
+              val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
+              val (updatedGroupExp, updatedAggExp, newChild, None) =
+                getUpdatedExpressions(grExp,
+                  aggExp,
+                  child,
+                  None,
+                  aggDataMapSchema,
+                  attributes,
+                  childPlan,
+                  carbonTable,
+                  agg)
+              Aggregate(updatedGroupExp,
+                updatedAggExp,
+                CarbonReflectionUtils
+                  .getSubqueryAlias(sparkSession, Some(alias), newChild, None))
+            } else {
+              agg
+            }
           } else {
             agg
           }
@@ -360,45 +368,47 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
         val carbonTable = getCarbonTable(l)
-        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
-        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
-        var isValidPlan = extractQueryColumnsFromAggExpression(
-          grExp,
-          aggExp,
-          carbonTable,
-          list,
-          aggregateExpressions)
-
-        if (isValidPlan) {
-          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
-        }
-
-        // getting the columns from filter expression
-        if (isValidPlan) {
-          extractColumnFromExpression(expression, list, carbonTable, true)
-        }
-        if (isValidPlan) {
-          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
-            aggregateExpressions,
+        if(isSpecificSegmentNotPresent(carbonTable)) {
+          val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+          val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
+          var isValidPlan = extractQueryColumnsFromAggExpression(
+            grExp,
+            aggExp,
             carbonTable,
-            agg)
-          if (null != aggDataMapSchema && null != childPlan) {
-            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
-            val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
-              getUpdatedExpressions(grExp,
-                aggExp,
-                child,
-                Some(expression),
-                aggDataMapSchema,
-                attributes,
-                childPlan,
-                carbonTable,
-                agg)
-            Aggregate(updatedGroupExp,
-              updatedAggExp,
-              Filter(updatedFilterExpression.get,
-                CarbonReflectionUtils
-                  .getSubqueryAlias(sparkSession, Some(alias), newChild, 
None)))
+            list,
+            aggregateExpressions)
+          if (isValidPlan) {
+            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(expression)
+          }
+          // getting the columns from filter expression
+          if (isValidPlan) {
+            extractColumnFromExpression(expression, list, carbonTable, true)
+          }
+          if (isValidPlan) {
+            val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
+              aggregateExpressions,
+              carbonTable,
+              agg)
+            if (null != aggDataMapSchema && null != childPlan) {
+              val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
+              val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
+                getUpdatedExpressions(grExp,
+                  aggExp,
+                  child,
+                  Some(expression),
+                  aggDataMapSchema,
+                  attributes,
+                  childPlan,
+                  carbonTable,
+                  agg)
+              Aggregate(updatedGroupExp,
+                updatedAggExp,
+                Filter(updatedFilterExpression.get,
+                  CarbonReflectionUtils
+                    .getSubqueryAlias(sparkSession, Some(alias), newChild, 
None)))
+            } else {
+              agg
+            }
           } else {
             agg
           }
@@ -415,49 +425,54 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
            
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
              metaData.hasAggregateDataMapSchema =>
         val carbonTable = getCarbonTable(l)
-        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
-        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
-        var isValidPlan = extractQueryColumnsFromAggExpression(
-          grExp,
-          aggExp,
-          carbonTable,
-          list,
-          aggregateExpressions)
+        if(isSpecificSegmentNotPresent(carbonTable)) {
+          val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+          val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
+          var isValidPlan = extractQueryColumnsFromAggExpression(
+            grExp,
+            aggExp,
+            carbonTable,
+            list,
+            aggregateExpressions)
 
-        if (isValidPlan) {
-          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
-        }
+          if (isValidPlan) {
+            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(expression)
+          }
 
-        // getting the columns from filter expression
-        if (isValidPlan) {
-          extractColumnFromExpression(expression, list, carbonTable, true)
-        }
-        if (isValidPlan) {
-          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
-            aggregateExpressions,
-            carbonTable,
-            agg)
-          if (null != aggDataMapSchema && null != childPlan) {
-            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
-            val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
-              getUpdatedExpressions(grExp,
-                aggExp,
-                child,
-                Some(expression),
-                aggDataMapSchema,
-                attributes,
-                childPlan,
-                carbonTable,
-                agg)
-            Aggregate(updatedGroupExp,
-              updatedAggExp,
-              Filter(updatedFilterExpression.get,
-                CarbonReflectionUtils
-                  .getSubqueryAlias(sparkSession,
-                    Some(alias1),
-                    CarbonReflectionUtils
-                      .getSubqueryAlias(sparkSession, Some(alias2), newChild, 
None),
-                    None)))
+          // getting the columns from filter expression
+          if (isValidPlan) {
+            extractColumnFromExpression(expression, list, carbonTable, true)
+          }
+
+          if (isValidPlan) {
+            val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
+              aggregateExpressions,
+              carbonTable,
+              agg)
+            if (null != aggDataMapSchema && null != childPlan) {
+              val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
+              val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
+                getUpdatedExpressions(grExp,
+                  aggExp,
+                  child,
+                  Some(expression),
+                  aggDataMapSchema,
+                  attributes,
+                  childPlan,
+                  carbonTable,
+                  agg)
+              Aggregate(updatedGroupExp,
+                updatedAggExp,
+                Filter(updatedFilterExpression.get,
+                  CarbonReflectionUtils
+                    .getSubqueryAlias(sparkSession,
+                      Some(alias1),
+                      CarbonReflectionUtils
+                        .getSubqueryAlias(sparkSession, Some(alias2), 
newChild, None),
+                      None)))
+            } else {
+              agg
+            }
           } else {
             agg
           }
@@ -652,7 +667,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
    * @param carbonTable parent table
    * @return is specific segment is present in session params
    */
-  def isSpecificSegmentPresent(carbonTable: CarbonTable) : Boolean = {
+  def isSpecificSegmentNotPresent(carbonTable: CarbonTable) : Boolean = {
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
       carbonSessionInfo.getSessionParams

Reply via email to