Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153724030
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       }
     }
     
    +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +
    +    plan transform {
    +
    +      case aggregate@Aggregate(_, aExp, _) if 
validateAggregateExpressions(aExp) =>
    +        val newExpressions = aExp.flatMap {
    +          case alias@Alias(attrExpression: AggregateExpression, _) =>
    +            attrExpression.aggregateFunction match {
    +              case Average(attr: AttributeReference) =>
    +                Seq(Alias(attrExpression
    +                  .copy(aggregateFunction = Sum(attr),
    +                    resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
    +                  Alias(attrExpression
    +                    .copy(aggregateFunction = Count(attr),
    +                      resultId = NamedExpression.newExprId), attr.name + 
"_count")())
    +              case Average(cast@Cast(attr: AttributeReference, _)) =>
    +                Seq(Alias(attrExpression
    +                  .copy(aggregateFunction = Sum(cast),
    +                    resultId = NamedExpression.newExprId),
    +                  attr.name + "_sum")(),
    +                  Alias(attrExpression
    +                    .copy(aggregateFunction = Count(cast),
    +                      resultId = NamedExpression.newExprId), attr.name + 
"_count")())
    +              case _ => Seq(alias)
    +            }
    +          case namedExpr: NamedExpression => Seq(namedExpr)
    +        }
    +        aggregate.copy(aggregateExpressions = 
newExpressions.asInstanceOf[Seq[NamedExpression]])
    +      case plan: LogicalPlan => plan
    +    }
    +  }
    +
    +  /**
    +   * Called by PreAggregateLoadingRules to validate if plan is valid for 
applying rules or not.
    +   * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg 
i.e Query UDF then it is
    +   * valid.
    +   *
    +   * @param namedExpression
    +   * @return
    +   */
    +  private def validateAggregateExpressions(namedExpression: 
Seq[NamedExpression]): Boolean = {
    +    val filteredExpressions = 
namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
    +    filteredExpressions
    +      .exists {
    +        expr => !expr.name.equalsIgnoreCase("PreAgg") &&
    +                        expr.name.equalsIgnoreCase("preAggLoad")
    --- End diff --
    
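    Please format this properly: the continuation line of the `exists` predicate is over-indented. Put `expr =>` on the same line as `.exists {`, and align the second `equalsIgnoreCase` condition with the first.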


---

Reply via email to