Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154670368
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -328,9 +462,145 @@ case class CarbonPreAggregateQueryRules(sparkSession:
SparkSession) extends Rule
Aggregate(updatedGroupExp,
updatedAggExp,
newChild)
+ // case for aggregation query with order by
+ case Project(_, Sort(sortOrders, global, Aggregate(groupingExp,
+ aggregateExp,
+ subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema
=>
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(groupingExp,
+ aggregateExp,
+ subQuery,
+ None,
+ aggDataMapSchema,
+ attributes,
+ childPlan)
+ val (updatedProjectList, updatedSortOrder) =
transformPlanForOrderBy(updatedAggExp,
+ sortOrders,
+ aggDataMapSchema,
+ attributes)
+ Project(updatedProjectList,
+ Sort(updatedSortOrder, global, Aggregate(updatedGroupExp,
updatedAggExp, newChild)))
+ // case for handling aggregation query with filter and order by
+ case Project(_, Sort(sortOrders, global, Aggregate(groupingExp,
+ aggregateExp,
+ Filter(expression, subQuery@CarbonSubqueryAlias(_, l:
LogicalRelation)))))
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema
=>
+ val (updatedGroupExp, updatedAggExp, newChild,
updatedFilterExpression) =
+ getUpdatedExpressions(groupingExp,
+ aggregateExp,
+ subQuery,
+ Some(expression),
+ aggDataMapSchema,
+ attributes,
+ childPlan)
+ val (updatedProjectList, updatedSortOrder) =
transformPlanForOrderBy(updatedAggExp,
+ sortOrders,
+ aggDataMapSchema,
+ attributes)
+ Project(updatedProjectList,
+ Sort(updatedSortOrder, global, Aggregate(updatedGroupExp,
updatedAggExp,
+ Filter(updatedFilterExpression.get, newChild))))
+ // case for handling aggregation with order by when only projection
column exists
+ case Sort(sortOrders, global, Aggregate(groupingExp,
+ aggregateExp,
+ subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
+ if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ .hasDataMapSchema =>
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(groupingExp,
+ aggregateExp,
+ subQuery,
+ None,
+ aggDataMapSchema,
+ attributes,
+ childPlan)
+ val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
+ sortOrders,
+ aggDataMapSchema,
+ attributes)
+ Sort(updatedSortOrder, global, Aggregate(updatedGroupExp,
updatedAggExp, newChild))
+ // case for handling aggregation with order by and filter when only
projection column exists
+ case Sort(sortOrders, global, Aggregate(groupingExp,
+ aggregateExp,
+ Filter(expression, subQuery@CarbonSubqueryAlias(_, l:
LogicalRelation))))
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema
=>
+ val (updatedGroupExp, updatedAggExp, newChild,
updatedFilterExpression) =
+ getUpdatedExpressions(groupingExp,
+ aggregateExp,
+ subQuery,
+ Some(expression),
+ aggDataMapSchema,
+ attributes,
+ childPlan)
+ val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
+ sortOrders,
+ aggDataMapSchema,
+ attributes)
+ Sort(updatedSortOrder, global, Aggregate(updatedGroupExp,
updatedAggExp, newChild))
}
}
+ /**
+ * Below method will be used to update the main table plan for an order by
query.
+ * In case of order by we need to update project list and sort order
attributes.
+ *
+ * @param aggregateExp
+ * child table aggregate expression
+ * @param sortOrders
+ * sort order expression in maintable plan
+ * @param aggDataMapSchema
+ * child data map schema
+ * @param attributes
+ * child attributes
+ * @return updated project list and updated sort order
+ */
+ def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression],
+ sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema,
+ attributes: Seq[AttributeReference]): (Seq[NamedExpression],
Seq[SortOrder]) = {
+ val updatedProjectList = new ArrayBuffer[NamedExpression]()
+ // getting the updated project list from aggregate expression
+ aggregateExp.foreach{f => f.transform {
+ // for projection column
+ case alias@Alias(attr: AttributeReference, name) =>
+ updatedProjectList += AttributeReference(name, attr.dataType,
attr.nullable)(alias.exprId,
+ alias.qualifier,
+ alias.isGenerated)
+ alias
+ // for aggregation column
+ case alias@Alias(attr: AggregateExpression, name) =>
+ updatedProjectList += AttributeReference(name, attr.dataType,
attr.nullable)(alias.exprId,
+ alias.qualifier,
+ alias.isGenerated)
+ alias
+ }
+ }
+ val updatedSortOrders = new ArrayBuffer[SortOrder]()
+ // getting the updated sort order
+ sortOrders.map {
+ order => order.child match {
--- End diff --
Move `order =>` up to the previous line, i.e. `sortOrders.map { order =>`.
---