[ https://issues.apache.org/jira/browse/SPARK-8641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012000#comment-15012000 ]
Apache Spark commented on SPARK-8641:
-------------------------------------
User 'hvanhovell' has created a pull request for this issue:
https://github.com/apache/spark/pull/9819
> Native Spark Window Functions
> -----------------------------
>
> Key: SPARK-8641
> URL: https://issues.apache.org/jira/browse/SPARK-8641
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 1.5.0
> Reporter: Herman van Hovell
>
> *Rationale*
> The window operator currently uses Hive UDAFs for all aggregation operations.
> This is fine in terms of performance and functionality. However, they limit
> extensibility, and they are quite opaque in terms of processing and memory
> usage. The latter blocks advanced optimizations such as code generation and
> Tungsten-style (advanced) memory management.
> *Requirements*
> We want to address this by replacing the Hive UDAFs with native Spark SQL
> UDAFs. A redesign of the Spark UDAFs is currently underway, see SPARK-4366.
> The new window UDAFs should use this new standard, in order to make them as
> future-proof as possible. Although we are replacing the standard Hive UDAFs,
> other existing Hive UDAFs should still be supported.
> The new window UDAFs should, at least, cover all existing Hive standard
> window UDAFs:
> # FIRST_VALUE
> # LAST_VALUE
> # LEAD
> # LAG
> # ROW_NUMBER
> # RANK
> # DENSE_RANK
> # PERCENT_RANK
> # NTILE
> # CUME_DIST
> All these functions imply a row order; this means that an ORDER BY clause
> must be defined in order to use these functions properly.
> The first and last value UDAFs are already present in Spark SQL. The only
> thing that needs to be added is skip-NULL functionality.
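> For reference, the skip-NULL behaviour being asked for is illustrated by the
> hedged sketch below. It uses the ignoreNulls flag that the DataFrame API
> exposes in later Spark releases; the DataFrame and its grp/ord/value columns
> are assumptions made purely for illustration.
> {noformat}
> // Illustrative only: skip-NULL semantics for a windowed first value.
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> // Returns the first non-NULL `value` per group instead of a leading NULL.
> def firstNonNull(df: DataFrame): DataFrame = {
>   val w = Window.partitionBy("grp").orderBy("ord")
>   df.select(first(col("value"), ignoreNulls = true).over(w).as("first_value"))
> }
> {noformat}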
> LEAD and LAG are not aggregates. These expressions return the value of an
> expression a number of rows before (LAG) or ahead (LEAD) of the current row.
> These expressions put a constraint on the Window frame in which they are
> executed: this can only be a Row frame with equal offsets.
> The ROW_NUMBER() function can be seen as a count in a running row frame (ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
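> As an illustration of that equivalence, the hedged sketch below compares the
> two against the DataFrame window API (function names follow later Spark
> releases; the DataFrame and its x/y columns are assumptions):
> {noformat}
> // Illustrative only: ROW_NUMBER() equals COUNT(*) over a running row frame
> // (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> def rowNumberAsRunningCount(df: DataFrame): DataFrame = {
>   val ordered = Window.partitionBy("x").orderBy("y")
>   val running = ordered.rowsBetween(Window.unboundedPreceding, Window.currentRow)
>   df.select(
>     row_number().over(ordered).as("rn"),
>     count(lit(1)).over(running).as("running_count")) // equal to rn on every row
> }
> {noformat}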
> RANK(), DENSE_RANK(), PERCENT_RANK(), NTILE(..) & CUME_DIST() are dependent
> on the actual values of the expressions in the ORDER BY clause. The ORDER BY
> expression(s) must be made available before these functions are evaluated.
> All these functions will have a fixed frame, but this will be dependent on
> the implementation (probably a running row frame).
> PERCENT_RANK(), NTILE(..) & CUME_DIST() are also dependent on the size of the
> partition being evaluated. The partition size must either be made available
> during evaluation (this is perfectly feasible in the current implementation)
> or the expression must be split over two window expressions and a merging
> expression; for instance, PERCENT_RANK() would then look like this:
> {noformat}
> (RANK() OVER (PARTITION BY x ORDER BY y) - 1) / (COUNT(*) OVER (PARTITION BY x) - 1)
> {noformat}
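> The same rewrite can be expressed against the DataFrame window API; the
> hedged sketch below is purely illustrative (function names follow later Spark
> releases, the DataFrame and its x/y columns are assumptions, and single-row
> partitions would still need a guard against division by zero):
> {noformat}
> // Illustrative only: PERCENT_RANK() decomposed into RANK() over the ordered
> // window and COUNT(*) over the whole partition.
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> def percentRankRewrite(df: DataFrame): DataFrame = {
>   val ordered   = Window.partitionBy("x").orderBy("y")
>   val partition = Window.partitionBy("x")
>   df.withColumn("pr_builtin", percent_rank().over(ordered))
>     .withColumn("pr_rewritten",
>       (rank().over(ordered) - 1).cast("double") /
>         (count(lit(1)).over(partition) - 1))
> }
> {noformat}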
> *Design*
> The old WindowFunction interface will be replaced by the following
> (initial/very early) design (including sub-classes):
> {noformat}
> /**
>  * A window function is a function that can only be evaluated in the context of a
>  * window operator.
>  */
> trait WindowFunction {
>   self: Expression =>
>
>   /**
>    * Define the frame in which the window operator must evaluate the function.
>    */
>   def frame: WindowFrame = UnspecifiedFrame
> }
>
> /**
>  * Base class for LEAD/LAG offset window functions.
>  *
>  * These are ordinary expressions; the idea is that the Window operator will
>  * process them in a separate (specialized) window frame.
>  */
> abstract class OffsetWindowFunction(val child: Expression, val offset: Int, val default: Expression)
>   extends Expression with WindowFunction {
>   override def deterministic: Boolean = false
>   ...
> }
>
> case class Lead(child: Expression, offset: Int, default: Expression)
>   extends OffsetWindowFunction(child, offset, default) {
>   // LEAD looks 'offset' rows ahead of the current row.
>   override val frame = SpecifiedWindowFrame(RowFrame, ValueFollowing(offset), ValueFollowing(offset))
>   ...
> }
>
> case class Lag(child: Expression, offset: Int, default: Expression)
>   extends OffsetWindowFunction(child, offset, default) {
>   // LAG looks 'offset' rows behind the current row.
>   override val frame = SpecifiedWindowFrame(RowFrame, ValuePreceding(offset), ValuePreceding(offset))
>   ...
> }
>
> case class RowNumber() extends AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = false
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
>   ...
> }
>
> abstract class RankLike(val order: Seq[Expression] = Nil)
>   extends AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = true
>
>   // This can be injected by either the Planner or the Window operator.
>   def withOrderSpec(orderSpec: Seq[Expression]): AggregateWindowFunction
>
>   // This will be injected by the Window operator.
>   // Only needed by PERCENT_RANK(), NTILE(..) & CUME_DIST(). Maybe put this in a subclass.
>   def withPartitionSize(size: MutableLiteral): AggregateWindowFunction
>
>   // We can do this as long as the partition size is available before execution...
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
>   ...
> }
>
> case class Rank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
>
> case class DenseRank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
>
> case class PercentRank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
>
> case class NTile(order: Seq[Expression] = Nil, buckets: Int) extends RankLike(order) {
>   override def deterministic: Boolean = false
>   ...
> }
>
> case class CumeDist(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> {noformat}
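> To make the AlgebraicAggregate path a little more concrete, a hypothetical
> body for RowNumber could look like the sketch below. The member names
> (bufferAttributes, initialValues, updateExpressions, evaluateExpression) are
> placeholders that depend on how the SPARK-4366 interface finally lands.
> {noformat}
> // Hypothetical sketch only: ROW_NUMBER() as an expression-based aggregate.
> // Member names are placeholders pending SPARK-4366.
> case class RowNumber() extends AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = false
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
>
>   // Single buffer slot holding the running count of rows seen in the frame.
>   private val rowNumber = AttributeReference("rowNumber", IntegerType, nullable = false)()
>   override val bufferAttributes = rowNumber :: Nil
>   override val initialValues = Literal(0) :: Nil
>   override val updateExpressions = Add(rowNumber, Literal(1)) :: Nil
>   // Merging partial buffers is not meaningful for a running row frame.
>   override val mergeExpressions = rowNumber :: Nil
>   override val evaluateExpression = rowNumber
> }
> {noformat}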
> This change will also have an impact on quite a few other classes:
> * org.apache.spark.sql.catalyst.expressions.WindowExpression
> * org.apache.spark.sql.catalyst.analysis.FunctionRegistry (Add Functions)
> * org.apache.spark.sql.execution.Window (Add another window frame processor,
> Add support for new UDAFs)
> * org.apache.spark.sql.expressions.Window (Remove Hive-Only stuff)
> * org.apache.spark.sql.hive.HiveQl (Use regular UnresolvedFunction)
> * org.apache.spark.sql.hive.HiveWindowFunction (Remove Most of this)
> * org.apache.spark.sql.hive.ResolveHiveWindowFunction (Remove Most of this)
> *Unknowns & Challenges*
> There are still a few unknowns and challenges, mainly because the work on
> SPARK-4366 is still in full swing:
> * How will we retain Hive UDAF functionality?
> * What will a WindowExpression containing an AggregateFunction look like?
> Will there be an intermediate AggregateExpression2? Or is this only present
> when distinct values and/or a non-Complete processing mode is requested?
> * The new implementation moves the responsibility of distinct processing to
> the operator. It also adds two aggregate evaluation paths: AggregateFunction2
> & AlgebraicAggregate (it is assumed that the current AggregateFunction
> doesn't require a third). Are there possibilities of code reuse? Or do we
> have to implement everything from scratch?