[CARBONDATA-2045][PreAggregate] Fixed pre-aggregate failure when a specific segment is set
Fixed issue : Query from segment set is not effective when pre-aggregate table is present This closes #1823 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/953efce4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/953efce4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/953efce4 Branch: refs/heads/carbonstore Commit: 953efce4a9516704d72571a9300e1d67b1ca1880 Parents: b4dc866 Author: kumarvishal <[email protected]> Authored: Wed Jan 17 17:57:21 2018 +0530 Committer: ravipesala <[email protected]> Committed: Sat Jan 20 00:22:47 2018 +0530 ---------------------------------------------------------------------- .../sql/hive/CarbonPreAggregateRules.scala | 295 ++++++++++--------- 1 file changed, 155 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/953efce4/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index d629c2a..79cbe05 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -268,39 +268,43 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
metaData.hasAggregateDataMapSchema => val carbonTable = getCarbonTable(l) - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] - val isValidPlan = extractQueryColumnsFromAggExpression( - grExp, - aggExp, - carbonTable, - list, - aggregateExpressions) - if(isValidPlan) { - val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, - aggregateExpressions, + if(isSpecificSegmentNotPresent(carbonTable)) { + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, carbonTable, - agg) - if(null != aggDataMapSchema && null!= childPlan) { - val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] - val (updatedGroupExp, updatedAggExp, newChild, None) = - getUpdatedExpressions(grExp, - aggExp, - child, - None, - aggDataMapSchema, - attributes, - childPlan, - carbonTable, - agg) - Aggregate(updatedGroupExp, - updatedAggExp, - CarbonReflectionUtils - .getSubqueryAlias(sparkSession, - Some(alias1), - CarbonReflectionUtils - .getSubqueryAlias(sparkSession, Some(alias2), newChild, None), - None)) + list, + aggregateExpressions) + if (isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, + carbonTable, + agg) + if (null != aggDataMapSchema && null != childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, + Some(alias1), + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias2), newChild, None), 
+ None)) + } else { + agg + } } else { agg } @@ -316,35 +320,39 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. metaData.hasAggregateDataMapSchema => val carbonTable = getCarbonTable(l) - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] - val isValidPlan = extractQueryColumnsFromAggExpression( - grExp, - aggExp, - carbonTable, - list, - aggregateExpressions) - if(isValidPlan) { - val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, - aggregateExpressions, + if(isSpecificSegmentNotPresent(carbonTable)) { + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, carbonTable, - agg) - if(null != aggDataMapSchema && null!= childPlan) { - val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] - val (updatedGroupExp, updatedAggExp, newChild, None) = - getUpdatedExpressions(grExp, - aggExp, - child, - None, - aggDataMapSchema, - attributes, - childPlan, - carbonTable, - agg) - Aggregate(updatedGroupExp, - updatedAggExp, - CarbonReflectionUtils - .getSubqueryAlias(sparkSession, Some(alias), newChild, None)) + list, + aggregateExpressions) + if (isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, + carbonTable, + agg) + if (null != aggDataMapSchema && null != childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + CarbonReflectionUtils + 
.getSubqueryAlias(sparkSession, Some(alias), newChild, None)) + } else { + agg + } } else { agg } @@ -360,45 +368,47 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. metaData.hasAggregateDataMapSchema => val carbonTable = getCarbonTable(l) - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] - var isValidPlan = extractQueryColumnsFromAggExpression( - grExp, - aggExp, - carbonTable, - list, - aggregateExpressions) - - if (isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) - } - - // getting the columns from filter expression - if (isValidPlan) { - extractColumnFromExpression(expression, list, carbonTable, true) - } - if (isValidPlan) { - val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, - aggregateExpressions, + if(isSpecificSegmentNotPresent(carbonTable)) { + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + var isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, carbonTable, - agg) - if (null != aggDataMapSchema && null != childPlan) { - val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] - val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = - getUpdatedExpressions(grExp, - aggExp, - child, - Some(expression), - aggDataMapSchema, - attributes, - childPlan, - carbonTable, - agg) - Aggregate(updatedGroupExp, - updatedAggExp, - Filter(updatedFilterExpression.get, - CarbonReflectionUtils - .getSubqueryAlias(sparkSession, Some(alias), newChild, None))) + list, + aggregateExpressions) + if (isValidPlan) { + isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) + } + // getting the columns from filter expression + if 
(isValidPlan) { + extractColumnFromExpression(expression, list, carbonTable, true) + } + if (isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, + carbonTable, + agg) + if (null != aggDataMapSchema && null != childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(grExp, + aggExp, + child, + Some(expression), + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + Filter(updatedFilterExpression.get, + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias), newChild, None))) + } else { + agg + } } else { agg } @@ -415,49 +425,54 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. metaData.hasAggregateDataMapSchema => val carbonTable = getCarbonTable(l) - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] - var isValidPlan = extractQueryColumnsFromAggExpression( - grExp, - aggExp, - carbonTable, - list, - aggregateExpressions) + if(isSpecificSegmentNotPresent(carbonTable)) { + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + var isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) - if (isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) - } + if (isValidPlan) { + isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) + } - // getting the columns from filter expression - if (isValidPlan) { - extractColumnFromExpression(expression, list, carbonTable, true) - } - if (isValidPlan) { - val 
(aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, - aggregateExpressions, - carbonTable, - agg) - if (null != aggDataMapSchema && null != childPlan) { - val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] - val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = - getUpdatedExpressions(grExp, - aggExp, - child, - Some(expression), - aggDataMapSchema, - attributes, - childPlan, - carbonTable, - agg) - Aggregate(updatedGroupExp, - updatedAggExp, - Filter(updatedFilterExpression.get, - CarbonReflectionUtils - .getSubqueryAlias(sparkSession, - Some(alias1), - CarbonReflectionUtils - .getSubqueryAlias(sparkSession, Some(alias2), newChild, None), - None))) + // getting the columns from filter expression + if (isValidPlan) { + extractColumnFromExpression(expression, list, carbonTable, true) + } + + if (isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, + carbonTable, + agg) + if (null != aggDataMapSchema && null != childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(grExp, + aggExp, + child, + Some(expression), + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + Filter(updatedFilterExpression.get, + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, + Some(alias1), + CarbonReflectionUtils + .getSubqueryAlias(sparkSession, Some(alias2), newChild, None), + None))) + } else { + agg + } } else { agg } @@ -652,7 +667,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * @param carbonTable parent table * @return is specific segment is present in session params */ - def isSpecificSegmentPresent(carbonTable: CarbonTable) : Boolean = { + def isSpecificSegmentNotPresent(carbonTable: CarbonTable) : Boolean = { val 
carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (carbonSessionInfo != null) { carbonSessionInfo.getSessionParams
