[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-14654:
--------------------------------
    Description: 
This is a proposed new API that is substantially simpler than the existing
one, and also more powerful (e.g. at the very least, it allows computing averages).

{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
  def isRegistered: Boolean = ...
  def register(metadata: AccumulatorMetadata): Unit = ...
  def metadata: AccumulatorMetadata = ...
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: NewAccumulator[IN, OUT]): Unit
  def value: OUT
  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the
    // task closure. This is for external accumulators and internal ones that
    // do not represent task-level metrics, e.g. internal SQL metrics, which
    // are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulator()
  }
}
{code}

Metadata, provided by Spark after registration:

{code}
class AccumulatorMetadata(
  val id: Long,
  val name: Option[String],
  val countFailedValues: Boolean
) extends Serializable
{code}

and an implementation that also offers specialized getters and setters

{code}
class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
  private[this] var _sum = 0L

  override def add(v: jl.Long): Unit = {
    _sum += v
  }

  override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other 
match {
    case o: LongAccumulator => _sum += o.sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Long = _sum

  def sum: Long = _sum
}
{code}

and SparkContext...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator

  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, 
OUT]
  ...
}
{code}

To use it ...

{code}
val acc = sc.newLongAccumulator()

{code}

A work-in-progress prototype here: 
https://github.com/rxin/spark/tree/accumulator-refactor



  was:
This is a proposed new API that is substantially simpler than the existing
one, and also more powerful (e.g. at the very least, it allows computing averages).

{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
  def isRegistered: Boolean = ...
  def register(metadata: AccumulatorMetadata): Unit = ...
  def metadata: AccumulatorMetadata = ...
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: NewAccumulator[IN, OUT]): Unit
  def value: OUT
  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the
    // task closure. This is for external accumulators and internal ones that
    // do not represent task-level metrics, e.g. internal SQL metrics, which
    // are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulator()
  }
}
{code}

{code}
class AccumulatorMetadata(
  val id: Long,
  val name: Option[String],
  val countFailedValues: Boolean
) extends Serializable
{code}

and to create it ...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator

  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, 
OUT]
  ...
}
{code}

A work-in-progress prototype here: 
https://github.com/rxin/spark/tree/accumulator-refactor




> New accumulator API
> -------------------
>
>                 Key: SPARK-14654
>                 URL: https://issues.apache.org/jira/browse/SPARK-14654
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>
> This is a proposed new API that is substantially simpler than the existing
> one, and also more powerful (e.g. at the very least, it allows computing
> averages).
> {code}
> abstract class NewAccumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: NewAccumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with
>     // the task closure. This is for external accumulators and internal
>     // ones that do not represent task-level metrics, e.g. internal SQL
>     // metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = 
> Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulator()
>   }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other 
> match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): 
> Accumulator[IN, OUT]
>   ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> {code}
> A work-in-progress prototype here: 
> https://github.com/rxin/spark/tree/accumulator-refactor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to