[
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Reynold Xin updated SPARK-14654:
--------------------------------
Description:
This is a proposed new API -- that is substantially simpler than the existing
one, and also more powerful (e.g. at the very least allows computing averages).
{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
def isRegistered: Boolean = ...
def register(metadata: AccumulatorMetadata): Unit = ...
def metadata: AccumulatorMetadata = ...
def reset(): Unit
def add(v: IN): Unit
def merge(other: NewAccumulator[IN, OUT]): Unit
def value: OUT
def localValue: OUT = value
final def registerAccumulatorOnExecutor(): Unit = {
// Automatically register the accumulator when it is deserialized with the task closure.
// This is for external accumulators and internal ones that do not represent task level
// metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
registerAccumulatorOnExecutor()
}
}
{code}
Metadata, provided by Spark after registration:
{code}
class AccumulatorMetadata(
val id: Long,
val name: Option[String],
val countFailedValues: Boolean
) extends Serializable
{code}
and an implementation that also offers specialized getters and setters
{code}
class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
private[this] var _sum = 0L
override def add(v: jl.Long): Unit = {
_sum += v
}
override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other
match {
case o: LongAccumulator => _sum += o.sum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
override def value: jl.Long = _sum
def sum: Long = _sum
}
{code}
and SparkContext...
{code}
class SparkContext {
...
def newLongAccumulator(): LongAccumulator
def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN,
OUT]
...
}
{code}
To use it ...
{code}
val acc = sc.newLongAccumulator()
{code}
A work-in-progress prototype here:
https://github.com/rxin/spark/tree/accumulator-refactor
was:
This is a proposed new API -- that is substantially simpler than the existing
one, and also more powerful (e.g. at the very least allows computing averages).
{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
def isRegistered: Boolean = ...
def register(metadata: AccumulatorMetadata): Unit = ...
def metadata: AccumulatorMetadata = ...
def reset(): Unit
def add(v: IN): Unit
def merge(other: NewAccumulator[IN, OUT]): Unit
def value: OUT
def localValue: OUT = value
final def registerAccumulatorOnExecutor(): Unit = {
// Automatically register the accumulator when it is deserialized with the task closure.
// This is for external accumulators and internal ones that do not represent task level
// metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
registerAccumulatorOnExecutor()
}
}
{code}
{code}
class AccumulatorMetadata(
val id: Long,
val name: Option[String],
val countFailedValues: Boolean
) extends Serializable
{code}
and to create it ...
{code}
class SparkContext {
...
def newLongAccumulator(): LongAccumulator
def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN,
OUT]
...
}
{code}
A work-in-progress prototype here:
https://github.com/rxin/spark/tree/accumulator-refactor
> New accumulator API
> -------------------
>
> Key: SPARK-14654
> URL: https://issues.apache.org/jira/browse/SPARK-14654
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Reynold Xin
> Assignee: Reynold Xin
>
> This is a proposed new API -- that is substantially simpler than the existing
> one, and also more powerful (e.g. at the very least allows computing
> averages).
> {code}
> abstract class NewAccumulator[IN, OUT] extends Serializable {
> def isRegistered: Boolean = ...
> def register(metadata: AccumulatorMetadata): Unit = ...
> def metadata: AccumulatorMetadata = ...
> def reset(): Unit
> def add(v: IN): Unit
> def merge(other: NewAccumulator[IN, OUT]): Unit
> def value: OUT
> def localValue: OUT = value
> final def registerAccumulatorOnExecutor(): Unit = {
> // Automatically register the accumulator when it is deserialized with the task closure.
> // This is for external accumulators and internal ones that do not represent task level
> // metrics, e.g. internal SQL metrics, which are per-operator.
> val taskContext = TaskContext.get()
> if (taskContext != null) {
> taskContext.registerAccumulator(this)
> }
> }
> // Called by Java when deserializing an object
> private def readObject(in: ObjectInputStream): Unit =
> Utils.tryOrIOException {
> in.defaultReadObject()
> registerAccumulatorOnExecutor()
> }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
> val id: Long,
> val name: Option[String],
> val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
> private[this] var _sum = 0L
> override def add(v: jl.Long): Unit = {
> _sum += v
> }
> override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other
> match {
> case o: LongAccumulator => _sum += o.sum
> case _ => throw new UnsupportedOperationException(
> s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
> }
> override def value: jl.Long = _sum
> def sum: Long = _sum
> }
> {code}
> and SparkContext...
> {code}
> class SparkContext {
> ...
> def newLongAccumulator(): LongAccumulator
> def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]):
> Accumulator[IN, OUT]
> ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> {code}
> A work-in-progress prototype here:
> https://github.com/rxin/spark/tree/accumulator-refactor
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]