Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443128
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -98,238 +143,499 @@ case class
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata
information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp),
child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation:
LogicalRelation)
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders,
carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation:
LogicalRelation)))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by
expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders,
carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to updated the named expression like
aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) :
Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier,
Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to updated condition expression
+ * @param conditionExp
+ * any condition expression join condition or filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]):
Option[Expression] = {
--- End diff --
Here, too, you can use the updateExpression method; there is no need for this separate method.
---