GitHub user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153995458
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
* @return
*/
private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
logicalPlan transform {
case aggregate@Aggregate(_, aExp, _) =>
- val newExpressions = aExp.flatMap {
- case alias@Alias(attrExpression: AggregateExpression, _) =>
- attrExpression.aggregateFunction match {
- case Average(attr: AttributeReference) =>
- Seq(Alias(attrExpression
- .copy(aggregateFunction = Sum(attr),
- resultId = NamedExpression.newExprId), attr.name + "_sum")(),
- Alias(attrExpression
- .copy(aggregateFunction = Count(attr),
- resultId = NamedExpression.newExprId), attr.name + "_count")())
- case Average(cast@MatchCast(attr: AttributeReference, _)) =>
- Seq(Alias(attrExpression
- .copy(aggregateFunction = Sum(cast),
- resultId = NamedExpression.newExprId),
- attr.name + "_sum")(),
- Alias(attrExpression
- .copy(aggregateFunction = Count(cast),
- resultId = NamedExpression.newExprId), attr.name + "_count")())
- case _ => Seq(alias)
- }
- case namedExpr: NamedExpression => Seq(namedExpr)
+ aExp.foreach {
+ case alias: Alias =>
+ validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
+ case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
}
- aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+ aggregate
+ .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
--- End diff --
ok
---