Reynold Xin created SPARK-15598:
-----------------------------------

             Summary: Change Aggregator.zero to Aggregator.init
                 Key: SPARK-15598
                 URL: https://issues.apache.org/jira/browse/SPARK-15598
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
            Reporter: Reynold Xin


org.apache.spark.sql.expressions.Aggregator currently requires defining a zero 
value for every aggregator. This is a limitation that makes it difficult to 
implement APIs such as reduce. In reduce (or reduceByKey), the user specifies a 
single associative and commutative reduce function, and no zero value is 
defined.
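To see why a zero value gets in the way, here is a minimal sketch against a simplified zero-based contract. `ZeroAggregator` is a hypothetical stand-in that keeps only zero, reduce, merge, and finish; Spark's actual class may declare further members, which are omitted here. Given only `f: (T, T) => T` there is no value of `T` to return from `zero`, so one workaround (shown here, not taken from the issue) is to widen the buffer to `Option[T]` with `None` playing the role of zero.

```scala
// Hypothetical stand-in for the current, zero-based contract. Spark's real
// Aggregator may declare further members; only these four are kept here.
abstract class ZeroAggregator[-IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// Given only f: (T, T) => T, there is no T to return from zero, so this
// workaround widens the buffer to Option[T] and uses None as the zero.
def reduceWithZero[T](f: (T, T) => T): ZeroAggregator[T, Option[T], T] =
  new ZeroAggregator[T, Option[T], T] {
    def zero: Option[T] = None
    def reduce(b: Option[T], a: T): Option[T] = Some(b.fold(a)(f(_, a)))
    def merge(b1: Option[T], b2: Option[T]): Option[T] = (b1, b2) match {
      case (Some(x), Some(y)) => Some(f(x, y))
      case _                  => b1.orElse(b2)
    }
    // An all-empty input leaves None, which has no meaningful T result.
    def finish(reduction: Option[T]): T =
      reduction.getOrElse(throw new NoSuchElementException("reduce of empty input"))
  }

val sumAgg = reduceWithZero[Int](_ + _)
val left   = Seq(1, 2).foldLeft(sumAgg.zero)((b, a) => sumAgg.reduce(b, a))
val right  = Seq(3).foldLeft(sumAgg.zero)((b, a) => sumAgg.reduce(b, a))
val total  = sumAgg.finish(sumAgg.merge(left, right)) // 6
```

The workaround compiles, but every buffer now carries an extra `Option` wrapper, and `finish` has to decide what to do with `None`.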

A small tweak to the API is to replace zero with init, which builds the initial 
buffer from an input value, similar to the following:

{code}
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}
{code}
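The proposed contract can be exercised locally. This sketch assumes (the issue does not say so) that a framework calls `init` on the first row of each non-empty partition, folds the remaining rows with `reduce`, combines partition buffers with `merge`, and calls `finish` once. The `runLocally` driver and the `average` aggregator are hypothetical names for illustration; the abstract class is the one proposed above, redefined so the block compiles standalone.

```scala
// Proposed API from this issue, redefined locally so this sketch compiles
// standalone. It is NOT Spark's shipped Aggregator, which uses zero.
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// Hypothetical driver: seeds each non-empty partition with init(first row)
// instead of a zero value, folds the remaining rows with reduce, merges the
// partial buffers, and finishes once. Empty input yields None.
def runLocally[IN, BUF, OUT](agg: Aggregator[IN, BUF, OUT],
                             partitions: Seq[Seq[IN]]): Option[OUT] =
  partitions.filter(_.nonEmpty)
    .map(p => p.tail.foldLeft(agg.init(p.head))((b, a) => agg.reduce(b, a)))
    .reduceOption((b1, b2) => agg.merge(b1, b2))
    .map(b => agg.finish(b))

// Hypothetical example: an average whose buffer is (sum, count).
// init turns the first row into (row, 1), so no zero buffer is required.
val average = new Aggregator[Double, (Double, Long), Double] {
  def init(a: Double): (Double, Long) = (a, 1L)
  def reduce(b: (Double, Long), a: Double): (Double, Long) = (b._1 + a, b._2 + 1)
  def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
}

val result = runLocally(average, Seq(Seq(1.0, 2.0), Seq.empty[Double], Seq(3.0, 6.0)))
// result == Some(3.0)
```

The empty partition is skipped rather than contributing a zero buffer. Returning `Option` is a design choice of this sketch: without a zero there is no result for an entirely empty input.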

Then reduce can be implemented on top of it, given a user-supplied function f:

{code}
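// f is the user's associative and commutative reduce function;
// reduceAggregator is an illustrative name
def reduceAggregator[T](f: (T, T) => T): Aggregator[T, T, T] =
  new Aggregator[T, T, T] {
    // The input passed to init becomes the buffer, so no zero value is needed
    override def init(a: T): T = a
    override def reduce(b: T, a: T): T = f(b, a)
    override def merge(b1: T, b2: T): T = f(b1, b2)
    // The buffer already holds the final result
    override def finish(reduction: T): T = reduction
  }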
{code}
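As a usage sketch, the reduce-based aggregator described above can be run over a few in-memory partitions. The `reduceAggregator` wrapper and the `aggregate` driver are hypothetical names, and the driver's calling order (init on each partition's first row, reduce on the rest, then merge and finish) is an assumption about how a framework would use the proposed API. Whenever `f` is associative and commutative, the result should match a plain `reduce` over the flattened data.

```scala
// The abstract class proposed in this issue, redefined locally so the sketch
// compiles on its own; it is not Spark's shipped Aggregator.
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// Hypothetical name: wraps a reduce function f as an Aggregator.
def reduceAggregator[T](f: (T, T) => T): Aggregator[T, T, T] =
  new Aggregator[T, T, T] {
    override def init(a: T): T = a
    override def reduce(b: T, a: T): T = f(b, a)
    override def merge(b1: T, b2: T): T = f(b1, b2)
    override def finish(reduction: T): T = reduction
  }

// Hypothetical local driver: init on the head of each non-empty partition,
// reduce over the rest, merge the partial buffers, then finish.
def aggregate[T](agg: Aggregator[T, T, T], partitions: Seq[Seq[T]]): Option[T] =
  partitions.filter(_.nonEmpty)
    .map(p => p.tail.foldLeft(agg.init(p.head))((b, a) => agg.reduce(b, a)))
    .reduceOption((b1, b2) => agg.merge(b1, b2))
    .map(b => agg.finish(b))

val parts     = Seq(Seq(3, 9, 4), Seq.empty[Int], Seq(7, 1))
val maxResult = aggregate(reduceAggregator[Int]((x, y) => math.max(x, y)), parts)
val sumResult = aggregate(reduceAggregator[Int](_ + _), parts)
// maxResult == Some(9), sumResult == Some(24)
```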




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)