[ 
https://issues.apache.org/jira/browse/SPARK-8641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hovell updated SPARK-8641:
-------------------------------------
    Description: 
*Rationale*
The window operator currently uses Hive UDAFs for all aggregation operations. 
This is fine in terms of performance and functionality. However, they limit 
extensibility, and they are quite opaque in terms of processing and memory 
usage. The latter blocks advanced optimizations such as code generation and 
Tungsten-style (advanced) memory management.

*Requirements*
We want to address this by replacing the Hive UDAFs with native Spark SQL UDAFs. 
A redesign of the Spark UDAFs is currently underway; see SPARK-4366. The new 
window UDAFs should use this new standard, in order to make them as 
future-proof as possible. Although we are replacing the standard Hive UDAFs, 
other existing Hive UDAFs should still be supported.

The new window UDAFs should, at least, cover all existing Hive standard window 
UDAFs:
# FIRST_VALUE
# LAST_VALUE
# LEAD
# LAG
# ROW_NUMBER
# RANK
# DENSE_RANK
# PERCENT_RANK
# NTILE
# CUME_DIST

All these functions imply a row order; this means that in order to use these 
functions properly, an ORDER BY clause must be defined.

The first and last value UDAFs are already present in Spark SQL. The only thing 
that needs to be added is skip-NULL functionality.
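
To make the missing behaviour concrete, here is a minimal Python sketch of 
skip-NULL semantics (illustrative only; the `first_value`/`last_value` helpers 
are hypothetical and not Spark code):

```python
def first_value(frame, ignore_nulls=False):
    """Return the first value in the frame, optionally skipping NULLs (None)."""
    for v in frame:
        if v is not None or not ignore_nulls:
            return v
    return None  # the frame contained only NULLs

def last_value(frame, ignore_nulls=False):
    """Return the last value in the frame, optionally skipping NULLs (None)."""
    return first_value(list(reversed(frame)), ignore_nulls)

frame = [None, "a", "b", None]
# without skip-NULL the first value is NULL; with skip-NULL it is "a"
```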

LEAD and LAG are not aggregates. These expressions return the value of an 
expression a number of rows before (LAG) or ahead (LEAD) of the current row. 
These expressions put a constraint on the window frame in which they are 
executed: this can only be a row frame with equal offsets.
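
The semantics can be sketched in a few lines of Python (an illustration over an 
in-memory ordered partition, not Spark code; out-of-range offsets fall back to 
the default):

```python
def lead(rows, i, offset, default=None):
    """Value of the row `offset` positions ahead of row i, or `default`."""
    j = i + offset
    return rows[j] if 0 <= j < len(rows) else default

def lag(rows, i, offset, default=None):
    """Value of the row `offset` positions behind row i, or `default`."""
    return lead(rows, i, -offset, default)

rows = [10, 20, 30]
# lead(rows, 0, 1) -> 20; lag(rows, 0, 1) -> None (no preceding row)
```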

The ROW_NUMBER() function can be seen as a count in a running row frame (ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
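
Sketched in Python (illustrative, not Spark code), ROW_NUMBER() is exactly that 
running count:

```python
def row_numbers(partition):
    """ROW_NUMBER(): for each row, count the rows in the frame
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW."""
    return [len(partition[: i + 1]) for i in range(len(partition))]

# which is simply the sequence 1, 2, ..., len(partition)
```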

RANK(), DENSE_RANK(), PERCENT_RANK(), NTILE(..) & CUME_DIST() depend on the 
actual values of the ORDER BY expressions. The ORDER BY expression(s) must 
therefore be made available before these functions are evaluated. All these 
functions will have a fixed frame, but which frame depends on the 
implementation (probably a running row frame).
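
The dependence on the ORDER BY values can be illustrated with a small Python 
sketch (assuming the partition is already sorted by the ORDER BY expressions; 
this helper is not Spark code):

```python
def rank_and_dense_rank(order_values):
    """RANK and DENSE_RANK over a partition sorted by the ORDER BY values.
    Both advance only when the ORDER BY value changes; RANK counts rows
    (leaving gaps after ties), DENSE_RANK counts distinct values."""
    ranks, dense = [], []
    for i, v in enumerate(order_values):
        if i > 0 and v == order_values[i - 1]:
            ranks.append(ranks[-1])   # tie: repeat the previous rank
            dense.append(dense[-1])
        else:
            ranks.append(i + 1)
            dense.append((dense[-1] if dense else 0) + 1)
    return ranks, dense

# for ORDER BY values [10, 10, 20]: RANK -> [1, 1, 3], DENSE_RANK -> [1, 1, 2]
```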

PERCENT_RANK(), NTILE(..) & CUME_DIST() also depend on the size of the 
partition being evaluated. The partition size must either be made available 
during evaluation (this is perfectly feasible in the current implementation), 
or the expression must be split over two window expressions and a merging 
expression; for instance, PERCENT_RANK() would look like this:
{noformat}
(RANK() OVER (PARTITION BY x ORDER BY y) - 1) / (COUNT(*) OVER (PARTITION BY x) - 1)
{noformat}
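
A Python sketch of the two partition-size-dependent computations (illustrative 
only; assumes the whole partition fits in memory):

```python
def percent_rank(ranks, partition_size):
    """PERCENT_RANK() as the merge of two windows: (rank - 1) / (count - 1).
    (A real implementation must special-case single-row partitions.)"""
    return [(r - 1) / (partition_size - 1) for r in ranks]

def cume_dist(order_values):
    """CUME_DIST(): fraction of rows with an ORDER BY value <= the current one."""
    n = len(order_values)
    return [sum(1 for w in order_values if w <= v) / n for v in order_values]

# ranks [1, 1, 3] in a 3-row partition -> PERCENT_RANK [0.0, 0.0, 1.0]
```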

*Design*
The old WindowFunction interface will be replaced by the following 
(initial/very early) design (including sub-classes):
{noformat}
/**
 * A window function is a function that can only be evaluated in the context
 * of a window operator.
 */
trait WindowFunction {
  self: Expression =>

  /**
   * Define the frame in which the window operator must be executed.
   */
  def frame: WindowFrame = UnspecifiedFrame
}

/**
 * Base class for LEAD/LAG offset window functions.
 *
 * These are ordinary expressions; the idea is that the Window operator will
 * process these in a separate (specialized) window frame.
 */
abstract class OffsetWindowFunction(val child: Expression, val offset: Int, val default: Expression) {
  override def deterministic: Boolean = false
  ...
}

case class Lead(child: Expression, offset: Int, default: Expression) extends
    OffsetWindowFunction(child, offset, default) {
  // LEAD looks ahead of the current row, so the frame offsets follow it.
  override val frame = SpecifiedWindowFrame(RowFrame, ValueFollowing(offset), ValueFollowing(offset))

  ...
}

case class Lag(child: Expression, offset: Int, default: Expression) extends
    OffsetWindowFunction(child, offset, default) {
  // LAG looks behind the current row, so the frame offsets precede it.
  override val frame = SpecifiedWindowFrame(RowFrame, ValuePreceding(offset), ValuePreceding(offset))

  ...
}

case class RowNumber() extends AlgebraicAggregate with WindowFunction {
  override def deterministic: Boolean = false
  override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
  ...
}

abstract class RankLike(val order: Seq[Expression] = Nil) extends AlgebraicAggregate with WindowFunction {
  override def deterministic: Boolean = true

  // This can be injected by either the Planner or the Window operator.
  def withOrderSpec(orderSpec: Seq[Expression]): AggregateWindowFunction

  // This will be injected by the Window operator.
  // Only needed by: PERCENT_RANK(), NTILE(..) & CUME_DIST(). Maybe put this in a subclass.
  def withPartitionSize(size: MutableLiteral): AggregateWindowFunction

  // We can do this as long as partition size is available before execution...
  override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
  ...
}

case class Rank(order: Seq[Expression] = Nil) extends RankLike(order) {
  ...
}

case class DenseRank(order: Seq[Expression] = Nil) extends RankLike(order) {
  ...
}

case class PercentRank(order: Seq[Expression] = Nil) extends RankLike(order) {
  ...
}

// buckets comes first so that order can keep its default value.
case class NTile(buckets: Int, order: Seq[Expression] = Nil) extends RankLike(order) {
  override def deterministic: Boolean = false
  ...
}

case class CumeDist(order: Seq[Expression] = Nil) extends RankLike(order) {
  ...
}
{noformat}

This change will also impact quite a few other classes:
* org.apache.spark.sql.catalyst.expressions.WindowExpression
* org.apache.spark.sql.catalyst.analysis.FunctionRegistry (add functions)
* org.apache.spark.sql.execution.Window (add another window frame processor, 
add support for the new UDAFs)
* org.apache.spark.sql.expressions.Window (remove Hive-only code)
* org.apache.spark.sql.hive.HiveQl (use a regular UnresolvedFunction)
* org.apache.spark.sql.hive.HiveWindowFunction (remove most of this)
* org.apache.spark.sql.hive.ResolveHiveWindowFunction (remove most of this)

*Unknowns & Challenges*
There are still a few unknowns and challenges, mainly because the work on 
SPARK-4366 is still in full swing:
* How will we retain Hive UDAF functionality?
* What will a WindowExpression containing an AggregateFunction look like? Will 
there be an intermediate AggregateExpression2? Or is this only present when 
distinct values and/or a non-Complete processing mode is requested?
* The new implementation moves the responsibility of distinct processing to the 
operator. It also adds two aggregate evaluation paths: AggregateFunction2 & 
AlgebraicAggregate (it is assumed that the current AggregateFunction doesn't 
require a third). Are there possibilities of code reuse, or do we have to 
implement everything from scratch?

  was:
The current Window implementation uses Hive UDAFs for all aggregation 
operations. In this ticket we will move this functionality to native Spark 
expressions. The rationale for this is that although Hive UDAFs are very well 
written, they remain opaque in processing and memory management; this makes 
them hard to optimize.

This ticket and its PR will build on the work being done in SPARK-4366.


> Native Spark Window Functions
> -----------------------------
>
>                 Key: SPARK-8641
>                 URL: https://issues.apache.org/jira/browse/SPARK-8641
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Herman van Hovell
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
