GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443569
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -98,238 +143,499 @@ case class
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata
information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Rewrites the expressions of an (already transformed) plan so that they
+ * reference the child (aggregate) table attributes instead of the parent
+ * table attributes.
+ * `transform` is applied to the plan and rewrites these node types:
+ * - Aggregate: grouping expressions and aggregate expressions
+ * - Filter: the filter condition
+ * - Project: the project list
+ * - Sort: the sort orders
+ * - Join: the (optional) join condition
+ * Other node types are not rewritten themselves.
+ * The Filter case calls `.get` on the result of updateConditionExpression.
+ * That is safe because the argument is always a `Some`.
+ * @param plan
+ * plan whose expressions should be rewritten
+ * @return plan with the same node types and children, but with rewritten expressions
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp),
child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
+ /**
+ * Rewrites the sort orders of a query so that sort keys which are parent
+ * table attributes point to the matching child (aggregate) table attribute.
+ * `updatedExpression` is defined outside this excerpt. It is presumably a
+ * collection of (parent attribute, child attribute) pairs; verify.
+ * For a bare AttributeReference sort key with a `sameRef` match:
+ * - a new AttributeReference is built from the child attribute's name,
+ *   data type, nullability, metadata and exprId;
+ * - the original qualifier and isGenerated flag are kept.
+ * An unmatched attribute is reused as the sort key.
+ * In both cases a new SortOrder is created.
+ * NOTE(review): only an AttributeReference sort key is matched. Any other
+ * sort key expression has no matching case and fails with scala.MatchError.
+ * NOTE(review): the new SortOrder only receives the child and `order.direction`.
+ * Any other SortOrder fields are not taken from `order`. Confirm this is
+ * intended; `order.copy(child = ...)` would keep them.
+ * @param sortExp
+ * sort order expressions of the query
+ * @return sort orders referencing child table attributes where a mapping exists
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation:
LogicalRelation)
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders,
carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation:
LogicalRelation)))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Rewrites plain expressions, such as group by expressions, so that they
+ * reference child (aggregate) table attributes.
+ * Each expression is rewritten with `transform`, so AttributeReferences
+ * nested anywhere inside it are visited.
+ * For an AttributeReference that has a `sameRef` match in `updatedExpression`:
+ * - a new AttributeReference is built from the matched child attribute's
+ *   name, data type, nullability, metadata and exprId;
+ * - the original qualifier and isGenerated flag are kept.
+ * Unmatched attributes are left unchanged.
+ * `updatedExpression` is defined outside this excerpt. It presumably holds
+ * (parent attribute, child attribute) pairs; verify.
+ * NOTE(review): the same AttributeReference rebuild is repeated in
+ * updateSortExpression and updateNamedExpression. A shared helper would keep
+ * them from drifting apart.
+ * @param expressions
+ * sequence of expressions, e.g. group by expressions
+ * @return rewritten expressions, in the same order as the input
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders,
carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Rewrites named expressions, such as aggregate expressions or a project
+ * list, so that they reference child (aggregate) table attributes.
+ * - AttributeReference with a `sameRef` match in `updatedExpression`:
+ *   replaced by a new AttributeReference built from the child attribute's
+ *   name, data type, nullability, metadata and exprId. The original
+ *   qualifier and isGenerated flag are kept. Unmatched attributes are
+ *   returned unchanged.
+ * - Alias: the aliased expression is rewritten the same way, visiting every
+ *   nested AttributeReference. The Alias is then rebuilt with its original
+ *   name, exprId, qualifier, metadata and isGenerated flag.
+ * `updatedExpression` is defined outside this excerpt. It presumably holds
+ * (parent attribute, child attribute) pairs; verify.
+ * NOTE(review): only AttributeReference and Alias are matched. Any other
+ * NamedExpression has no matching case and fails with scala.MatchError.
+ * @param namedExpression
+ * named expressions, e.g. aggregate expressions
+ * @return rewritten named expressions, in the same order as the input
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) :
Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier,
Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Rewrites an optional condition, such as a filter or join condition, so
+ * that it references child (aggregate) table attributes.
+ * - None is returned unchanged.
+ * - Otherwise the condition is rewritten with `transform`. Every
+ *   AttributeReference with a `sameRef` match in `updatedExpression` is
+ *   replaced directly by the matched child attribute. Unmatched attributes
+ *   are left unchanged.
+ * `updatedExpression` is defined outside this excerpt. It presumably holds
+ * (parent attribute, child attribute) pairs; verify.
+ * NOTE(review): unlike updateExpression and updateNamedExpression, this does
+ * not rebuild the attribute with the original qualifier and isGenerated flag.
+ * The child attribute is substituted as-is. Confirm this difference is intended.
+ * @param conditionExp
+ * optional condition expression (join condition or filter condition)
+ * @return the rewritten condition, or None when no condition was given
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]):
Option[Expression] = {
+ if (conditionExp.isDefined) {
+ val filterExp = conditionExp.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ childExp.get._2
+ } else {
+ attr
}
- carbonTable
- // case for handling aggregation with order by when only
projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ })
+ } else {
+ conditionExp
+ }
+ }
+
+ /**
+ * Below method will be used to validate and transform the main table
plan to child table plan
+ * rules for transforming is as below.
+ * 1. Grouping expression rules
+ * 1.1 Change the parent attribute reference for of group expression
+ * to child attribute reference
+ *
+ * 2. Aggregate expression rules
+ * 2.1 Change the parent attribute reference for of group expression
to
+ * child attribute reference
+ * 2.2 Change the count AggregateExpression to Sum as count
+ * is already calculated so in case of aggregate table
+ * we need to apply sum to get the count
+ * 2.2 In case of average aggregate function select 2 columns from
aggregate table with
+ * aggregation sum and count. Then add divide(sum(column with sum),
sum(column with count)).
+ * Note: During aggregate table creation for average table will be
created with two columns
+ * one for sum(column) and count(column) to support rollup
+ * 3. Filter Expression rules.
+ * 3.1 Updated filter expression attributes with child table
attributes
+ * 4. Update the Parent Logical relation with child Logical relation
+ * 5. timeseries function
+ * 5.1 validate parent table has timeseries datamap
+ * 5.2 timeseries function is valid function or not
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ // case for aggregation query
+ case agg@Aggregate(grExp,
+ aggExp,
+ child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
--- End diff --
The indentation of the `case agg@Aggregate(...)` pattern above is wrong.
---