Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153724030
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession:
SparkSession) extends Rule
}
}
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+
+ plan transform {
+
+ case aggregate@Aggregate(_, aExp, _) if
validateAggregateExpressions(aExp) =>
+ val newExpressions = aExp.flatMap {
+ case alias@Alias(attrExpression: AggregateExpression, _) =>
+ attrExpression.aggregateFunction match {
+ case Average(attr: AttributeReference) =>
+ Seq(Alias(attrExpression
+ .copy(aggregateFunction = Sum(attr),
+ resultId = NamedExpression.newExprId), attr.name +
"_sum")(),
+ Alias(attrExpression
+ .copy(aggregateFunction = Count(attr),
+ resultId = NamedExpression.newExprId), attr.name +
"_count")())
+ case Average(cast@Cast(attr: AttributeReference, _)) =>
+ Seq(Alias(attrExpression
+ .copy(aggregateFunction = Sum(cast),
+ resultId = NamedExpression.newExprId),
+ attr.name + "_sum")(),
+ Alias(attrExpression
+ .copy(aggregateFunction = Count(cast),
+ resultId = NamedExpression.newExprId), attr.name +
"_count")())
+ case _ => Seq(alias)
+ }
+ case namedExpr: NamedExpression => Seq(namedExpr)
+ }
+ aggregate.copy(aggregateExpressions =
newExpressions.asInstanceOf[Seq[NamedExpression]])
+ case plan: LogicalPlan => plan
+ }
+ }
+
+ /**
+ * Called by PreAggregateLoadingRules to validate whether the plan is valid for applying the rules.
+ * If the plan has PreAggLoad (i.e. the loading UDF) and does not have PreAgg (i.e. the query UDF),
+ * then it is valid.
+ *
+ * @param namedExpression
+ * @return
+ */
+ private def validateAggregateExpressions(namedExpression:
Seq[NamedExpression]): Boolean = {
+ val filteredExpressions =
namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
+ filteredExpressions
+ .exists {
+ expr => !expr.name.equalsIgnoreCase("PreAgg") &&
+ expr.name.equalsIgnoreCase("preAggLoad")
--- End diff --
Please format this code properly.
---