[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-14654:
--------------------------------
    Description: 
The current accumulator API has a few problems:

1. Its type hierarchy is very complicated, with Accumulator, Accumulable, 
AccumulatorParam, AccumulableParam, etc.
2. The intermediate buffer type must be the same as the output type, so there 
is no way to define an accumulator that computes averages.
3. It is very difficult to specialize the methods, leading to excessive boxing 
and making accumulators bad for metrics that change for each record.
4. There is not a single coherent API that works for both Java and Scala.

This is a proposed new API that addresses all of the above. In this new API:

1. There is only a single class (Accumulator) that is user facing
2. The intermediate value is stored in the accumulator itself and can be 
different from the output type.
3. Concrete implementations can provide its own specialized methods.
4. Designed to work for both Java and Scala.

{code}
abstract class Accumulator[IN, OUT] extends Serializable {
  def isRegistered: Boolean = ...
  def register(metadata: AccumulatorMetadata): Unit = ...
  def metadata: AccumulatorMetadata = ...
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: Accumulator[IN, OUT]): Unit
  def value: OUT
  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the 
task closure.
    // This is for external accumulators and internal ones that do not 
represent task level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulator()
  }
}
{code}

Metadata, provided by Spark after registration:

{code}
class AccumulatorMetadata(
  val id: Long,
  val name: Option[String],
  val countFailedValues: Boolean
) extends Serializable
{code}

and an implementation that also offers specialized getters and setters

{code}
class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
  private[this] var _sum = 0L

  override def reset(): Unit = _sum = 0L

  override def add(v: jl.Long): Unit = {
    _sum += v
  }

  override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
    case o: LongAccumulator => _sum += o.sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Long = _sum

  def sum: Long = _sum
}
{code}

and SparkContext...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator
  def newLongAccumulator(name: Long): LongAccumulator
  def newLongAccumulator(name: Long, dedup: Boolean): LongAccumulator
  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, 
OUT]
  ...
}
{code}

To use it ...

{code}
val acc = sc.newLongAccumulator()
sc.parallelize(1 to 1000).map { i =>
  acc.add(1)
  i
}
{code}

A work-in-progress prototype here: 
https://github.com/rxin/spark/tree/accumulator-refactor



  was:
The current accumulator API has a few problems:

1. Its type hierarchy is very complicated, with Accumulator, Accumulable, 
AccumulatorParam, AccumulableParam, etc.
2. The intermediate buffer type must be the same as the output type, so there 
is no way to define an accumulator that computes averages.
3. It is very difficult to specialize the methods, leading to excessive boxing 
and making accumulators bad for metrics that change for each record.
4. There is not a single coherent API that works for both Java and Scala.

This is a proposed new API that addresses all of the above. In this new API:

1. There is only a single class (Accumulator) that is user facing
2. The intermediate value is stored in the accumulator itself and can be 
different from the output type.
3. Concrete implementations can provide its own specialized methods.
4. Designed to work for both Java and Scala.

{code}
abstract class Accumulator[IN, OUT] extends Serializable {
  def isRegistered: Boolean = ...
  def register(metadata: AccumulatorMetadata): Unit = ...
  def metadata: AccumulatorMetadata = ...
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: Accumulator[IN, OUT]): Unit
  def value: OUT
  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the 
task closure.
    // This is for external accumulators and internal ones that do not 
represent task level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulator()
  }
}
{code}

Metadata, provided by Spark after registration:

{code}
class AccumulatorMetadata(
  val id: Long,
  val name: Option[String],
  val countFailedValues: Boolean
) extends Serializable
{code}

and an implementation that also offers specialized getters and setters

{code}
class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
  private[this] var _sum = 0L

  override def reset(): Unit = _sum = 0L

  override def add(v: jl.Long): Unit = {
    _sum += v
  }

  override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
    case o: LongAccumulator => _sum += o.sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Long = _sum

  def sum: Long = _sum
}
{code}

and SparkContext...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator
  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, 
OUT]
  ...
}
{code}

To use it ...

{code}
val acc = sc.newLongAccumulator()
sc.parallelize(1 to 1000).map { i =>
  acc.add(1)
  i
}
{code}

A work-in-progress prototype here: 
https://github.com/rxin/spark/tree/accumulator-refactor




> New accumulator API
> -------------------
>
>                 Key: SPARK-14654
>                 URL: https://issues.apache.org/jira/browse/SPARK-14654
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>
> The current accumulator API has a few problems:
> 1. Its type hierarchy is very complicated, with Accumulator, Accumulable, 
> AccumulatorParam, AccumulableParam, etc.
> 2. The intermediate buffer type must be the same as the output type, so there 
> is no way to define an accumulator that computes averages.
> 3. It is very difficult to specialize the methods, leading to excessive 
> boxing and making accumulators bad for metrics that change for each record.
> 4. There is not a single coherent API that works for both Java and Scala.
> This is a proposed new API that addresses all of the above. In this new API:
> 1. There is only a single class (Accumulator) that is user facing
> 2. The intermediate value is stored in the accumulator itself and can be 
> different from the output type.
> 3. Concrete implementations can provide its own specialized methods.
> 4. Designed to work for both Java and Scala.
> {code}
> abstract class Accumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: Accumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with 
> the task closure.
>     // This is for external accumulators and internal ones that do not 
> represent task level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = 
> Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulator()
>   }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def reset(): Unit = _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other 
> match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def newLongAccumulator(name: Long): LongAccumulator
>   def newLongAccumulator(name: Long, dedup: Boolean): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): 
> Accumulator[IN, OUT]
>   ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1)
>   i
> }
> {code}
> A work-in-progress prototype here: 
> https://github.com/rxin/spark/tree/accumulator-refactor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to