[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-14654:
--------------------------------
    Description: 
This is a proposed new API -- that is substantially simpler than the existing 
one, and also more powerful (e.g. at the very least allows computing averages).

{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
  def isRegistered: Boolean = ...
  def register(metadata: AccumulatorMetadata): Unit = ...
  def metadata: AccumulatorMetadata = ...
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: NewAccumulator[IN, OUT]): Unit
  def value: OUT
  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the 
task closure.
    // This is for external accumulators and internal ones that do not 
represent task level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulatorOnExecutor()
  }
}
{code}

{code}
class AccumulatorMetadata(
  val id: Long,
  val name: Option[String],
  val countFailedValues: Boolean
) extends Serializable
{code}

and to create it ...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator

  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, 
OUT]
  ...
}
{code}

A work-in-progress prototype here: 
https://github.com/rxin/spark/tree/accumulator-refactor



  was:
This is a proposed new API -- that is substantially simpler than the existing 
one, and also more powerful (e.g. at the very least allows computing averages).

{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {

  def isRegistered: Boolean = {
    ...
  }

  def register(metadata: AccumulatorMetadata): Unit = {
    ...
  }

  def metadata: AccumulatorMetadata = {
    ...
  }

  def reset(): Unit

  def add(v: IN): Unit

  def merge(other: NewAccumulator[IN, OUT]): Unit

  def value: OUT

  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the 
task closure.
    // This is for external accumulators and internal ones that do not 
represent task level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulatorOnExecutor()
  }
}
{code}

{code}
class AccumulatorMetadata(
  val id: Long,
  val name: Option[String],
  val countFailedValues: Boolean
) extends Serializable
{code}

and to create it ...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator

  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, 
OUT]
  ...
}
{code}

A work-in-progress prototype here: 
https://github.com/rxin/spark/tree/accumulator-refactor




> New accumulator API
> -------------------
>
>                 Key: SPARK-14654
>                 URL: https://issues.apache.org/jira/browse/SPARK-14654
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>
> This is a proposed new API -- that is substantially simpler than the existing 
> one, and also more powerful (e.g. at the very least allows computing 
> averages).
> {code}
> abstract class NewAccumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: NewAccumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with 
> the task closure.
>     // This is for external accumulators and internal ones that do not 
> represent task level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = 
> Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulatorOnExecutor()
>   }
> }
> {code}
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and to create it ...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): 
> Accumulator[IN, OUT]
>   ...
> }
> {code}
> A work-in-progress prototype here: 
> https://github.com/rxin/spark/tree/accumulator-refactor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to