[ https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Reynold Xin updated SPARK-14654:
--------------------------------
Description:
The current accumulator API has a few problems:
1. Its type hierarchy is very complicated, with Accumulator, Accumulable, AccumulatorParam, AccumulableParam, etc.
2. The intermediate buffer type must be the same as the output type, so there is no way to define an accumulator that computes averages.
3. It is very difficult to specialize the methods, leading to excessive boxing and making accumulators bad for metrics that change for each record.
4. There is not a single coherent API that works for both Java and Scala.

This is a proposed new API that addresses all of the above. In this new API:
1. There is only a single user-facing class (Accumulator).
2. The intermediate value is stored in the accumulator itself and can be different from the output type.
3. Concrete implementations can provide their own specialized methods.
4. It is designed to work for both Java and Scala.
{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
  def isRegistered: Boolean = ...
  def register(metadata: AccumulatorMetadata): Unit = ...
  def metadata: AccumulatorMetadata = ...

  // The contract every concrete accumulator implements.
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: NewAccumulator[IN, OUT]): Unit
  def value: OUT
  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the task closure.
    // This is for external accumulators and internal ones that do not represent task-level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulatorOnExecutor()
  }
}
{code}
Metadata, provided by Spark after registration:
{code}
class AccumulatorMetadata(
    val id: Long,
    val name: Option[String],
    val countFailedValues: Boolean
) extends Serializable
{code}
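As a rough illustration of what the driver side of registration might look like (this is not user code; the id, name, and flag values are made up, and LongAccumulator is the implementation shown below):
{code}
// Hypothetical: Spark assigns an id and hands the metadata to the accumulator;
// after this, isRegistered returns true and metadata is available.
val acc = new LongAccumulator
acc.register(new AccumulatorMetadata(id = 1L, name = Some("records"), countFailedValues = false))
assert(acc.isRegistered)
{code}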
and an implementation that also offers specialized getters and setters (jl here is an alias for java.lang):
{code}
class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
  private[this] var _sum = 0L

  override def reset(): Unit = {
    _sum = 0L
  }

  override def add(v: jl.Long): Unit = {
    _sum += v
  }

  override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other match {
    case o: LongAccumulator => _sum += o.sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Long = _sum

  def sum: Long = _sum
}
{code}
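Because the intermediate buffer no longer has to match the output type (problem 2 above), an accumulator that computes an average becomes straightforward. The following is only a hypothetical sketch against the proposed API, not part of the prototype; AverageAccumulator and its sum/count getters are made-up names:
{code}
class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
  // Intermediate buffer (sum, count) differs from the output (the average).
  private[this] var _sum = 0.0
  private[this] var _count = 0L

  override def reset(): Unit = {
    _sum = 0.0
    _count = 0L
  }

  override def add(v: jl.Double): Unit = {
    _sum += v
    _count += 1
  }

  override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match {
    case o: AverageAccumulator =>
      _sum += o.sum
      _count += o.count
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // The output is computed from the buffer on demand.
  override def value: jl.Double = if (_count == 0) 0.0 else _sum / _count

  def sum: Double = _sum
  def count: Long = _count
}
{code}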
and SparkContext...
{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator
  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
  ...
}
{code}
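For a custom accumulator such as the AverageAccumulator sketched above, registration would presumably go through registerAccumulator rather than a dedicated factory method. A sketch, assuming the Accumulator type in the signature above is the NewAccumulator base class and that registerAccumulator returns the registered instance:
{code}
// Hypothetical: register a user-defined accumulator; Spark assigns its metadata.
val avg = sc.registerAccumulator(new AverageAccumulator)
{code}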
To use it ...
{code}
val acc = sc.newLongAccumulator()
sc.parallelize(1 to 1000).map { i =>
  acc.add(1L)
  i
}
{code}
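Note that map alone is lazy, so nothing runs (and the accumulator stays at zero) until an action is triggered. A small extension of the snippet above, with the usual caveat that re-executed tasks can apply their updates more than once:
{code}
val acc = sc.newLongAccumulator()
sc.parallelize(1 to 1000).map { i =>
  acc.add(1L)
  i
}.count()  // an action forces the job to run and the per-task updates to be merged

// Driver-side read of the merged result: 1000 here, barring task retries.
println(acc.value)
{code}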
A work-in-progress prototype is available here: https://github.com/rxin/spark/tree/accumulator-refactor
> New accumulator API
> -------------------
>
> Key: SPARK-14654
> URL: https://issues.apache.org/jira/browse/SPARK-14654
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Reynold Xin
> Assignee: Reynold Xin
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)