[ 
https://issues.apache.org/jira/browse/SPARK-13911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195821#comment-15195821
 ] 

Cheng Lian commented on SPARK-13911:
------------------------------------

Another problem related to the {{ResolveAggregateFunctions}} rule is that it 
invokes the analyzer recursively, which is pretty tricky to understand and 
maintain.  Here is a possible approach that both fixes the issue described 
above and removes the recursive invocation.

Since the having condition and the order by clause over an aggregation are 
tightly coupled with the aggregation itself during resolution, we probably 
shouldn't treat them as separate constructs while resolving them.  One 
possible fix for this issue is to introduce a new unresolved logical plan 
node, {{UnresolvedAggregate}}:

{code}
case class UnresolvedAggregate(
  child: LogicalPlan,
  groupingExpressions: Seq[Expression],
  aggregateExpressions: Seq[NamedExpression],
  havingCondition: Option[Expression] = None,
  order: Seq[SortOrder] = Nil
) extends UnaryLogicalPlan
{code}

The major difference between {{UnresolvedAggregate}} and {{Aggregate}} is that 
it also contains an optional having condition and a list of sort orders.  In 
other words, it's a filtered, ordered aggregate operator.  Then, we can have 
two simple rules to merge all adjacent {{Sort}} and {{Filter}} operators 
directly above an {{UnresolvedAggregate}}:

{code}
  object MergeHavingConditions extends Rule[LogicalPlan] {
    override def apply(tree: LogicalPlan): LogicalPlan = tree transformDown {
      case Filter(condition, (agg: UnresolvedAggregate)) =>
        // Combines all having conditions
        val combinedCondition =
          (agg.havingCondition.toSeq :+ condition).reduce(And)
        agg.copy(havingCondition = Some(combinedCondition))
    }
  }

  object MergeSortsOverAggregates extends Rule[LogicalPlan] {
    override def apply(tree: LogicalPlan): LogicalPlan = tree transformDown {
      case Sort(order, _, (agg: UnresolvedAggregate)) =>
        // Only preserves the last sort order
        agg.copy(order = order)
    }
  }
{code}

(Of course, we also need to make the Dataset API and the {{GlobalAggregates}} 
rule produce {{UnresolvedAggregate}} instead of {{Aggregate}}.)

Finally, we only need to resolve {{UnresolvedAggregate}} into an 
{{Aggregate}} with an optional {{Filter}} and {{Sort}} on top, which is 
relatively straightforward.  We then no longer need to invoke the analyzer 
recursively in {{ResolveAggregateFunctions}} to resolve aggregate functions 
appearing in having and order by clauses, since those clauses are already 
merged into {{UnresolvedAggregate}} and can be resolved together with the 
grouping expressions and aggregate expressions.
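
To make the overall flow concrete, here is a minimal, self-contained sketch of 
the merge-then-lower idea.  It is a toy model only: the plan classes below are 
stand-ins written for illustration (conditions and sort orders are plain 
strings), not Catalyst classes, and {{merge}} and {{lower}} are hypothetical 
helper names.  The part of real resolution that adds aggregate functions from 
the having and order by clauses to the aggregate expressions is deliberately 
left out.

```scala
// Toy model only: these types mimic the shape of Catalyst plans,
// but they are NOT Spark classes.
object ToyAggregateResolution {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Sort(order: Seq[String], child: Plan) extends Plan
  case class Aggregate(
      grouping: Seq[String],
      aggregates: Seq[String],
      child: Plan) extends Plan
  case class UnresolvedAggregate(
      child: Plan,
      grouping: Seq[String],
      aggregates: Seq[String],
      having: Option[String] = None,
      order: Seq[String] = Nil) extends Plan

  // Folds Filter and Sort nodes that sit directly above an
  // UnresolvedAggregate into it, working bottom-up.
  // For brevity this does not recurse below other node types.
  def merge(plan: Plan): Plan = plan match {
    case Filter(cond, child) =>
      merge(child) match {
        case agg: UnresolvedAggregate =>
          // Combine with any having condition merged earlier.
          val combined = agg.having.fold(cond)(h => s"($h) AND ($cond)")
          agg.copy(having = Some(combined))
        case other => Filter(cond, other)
      }
    case Sort(order, child) =>
      merge(child) match {
        // The outermost sort order overrides earlier ones.
        case agg: UnresolvedAggregate => agg.copy(order = order)
        case other => Sort(order, other)
      }
    case other => other
  }

  // Lowers an UnresolvedAggregate into Aggregate, wrapped in a
  // Filter and/or Sort only when those parts are present.
  def lower(plan: Plan): Plan = plan match {
    case UnresolvedAggregate(child, grouping, aggregates, having, order) =>
      val agg: Plan = Aggregate(grouping, aggregates, child)
      val filtered = having.fold(agg)(Filter(_, agg))
      if (order.isEmpty) filtered else Sort(order, filtered)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // Rough shape of: GROUP BY a HAVING COUNT(b) > 0 ORDER BY COUNT(b)
    val parsed = Sort(Seq("COUNT(b)"),
      Filter("COUNT(b) > 0",
        UnresolvedAggregate(Relation("t"), Seq("a"), Seq("a"))))
    val merged = merge(parsed)
    println(merged)
    println(lower(merged))
  }
}
```

Because the having condition and the sort order end up as fields of a single 
node, a resolution rule sees all aggregate-related expressions at once, which 
is the property the proposal relies on.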

cc yhuai

> Having condition and order by cannot both have aggregate functions
> ------------------------------------------------------------------
>
>                 Key: SPARK-13911
>                 URL: https://issues.apache.org/jira/browse/SPARK-13911
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1, 1.4.1, 1.5.2, 1.6.1, 2.0.0
>            Reporter: Cheng Lian
>
> Given the following temporary table:
> {code}
> sqlContext range 10 select ('id as 'a, 'id as 'b) registerTempTable "t"
> {code}
> The following SQL statement can't pass analysis:
> {noformat}
> scala> sqlContext sql "SELECT * FROM t GROUP BY a HAVING COUNT(b) > 0 ORDER BY COUNT(b)" show ()
> org.apache.spark.sql.AnalysisException: expression '`t`.`b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:36)
>   at org.apache.spark.sql.Dataset$.newDataFrame(Dataset.scala:58)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:784)
>   ... 49 elided
> {noformat}
> The reason is that analysis rule {{ResolveAggregateFunctions}} only handles 
> the first {{Filter}} _or_ {{Sort}} directly above an {{Aggregate}}.


