[
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Reynold Xin updated SPARK-14654:
--------------------------------
Description:
This is a proposed new API -- that is substantially simpler than the existing
one, and also more powerful (e.g. at the very least allows computing averages).
{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
def isRegistered: Boolean = ...
def register(metadata: AccumulatorMetadata): Unit = ...
def metadata: AccumulatorMetadata = ...
def reset(): Unit
def add(v: IN): Unit
def merge(other: NewAccumulator[IN, OUT]): Unit
def value: OUT
def localValue: OUT = value
final def registerAccumulatorOnExecutor(): Unit = {
// Automatically register the accumulator when it is deserialized with the
// task closure. This is for external accumulators and internal ones that do
// not represent task level metrics, e.g. internal SQL metrics, which are
// per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
registerAccumulator()
}
}
{code}
{code}
class AccumulatorMetadata(
val id: Long,
val name: Option[String],
val countFailedValues: Boolean
) extends Serializable
{code}
and to create it ...
{code}
class SparkContext {
...
def newLongAccumulator(): LongAccumulator
def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
...
}
{code}
A work-in-progress prototype here:
https://github.com/rxin/spark/tree/accumulator-refactor
was:
This is a proposed new API -- that is substantially simpler than the existing
one, and also more powerful (e.g. at the very least allows computing averages).
{code}
abstract class NewAccumulator[IN, OUT] extends Serializable {
def isRegistered: Boolean = {
...
}
def register(metadata: AccumulatorMetadata): Unit = {
...
}
def metadata: AccumulatorMetadata = {
...
}
def reset(): Unit
def add(v: IN): Unit
def merge(other: NewAccumulator[IN, OUT]): Unit
def value: OUT
def localValue: OUT = value
final def registerAccumulatorOnExecutor(): Unit = {
// Automatically register the accumulator when it is deserialized with the
// task closure. This is for external accumulators and internal ones that do
// not represent task level metrics, e.g. internal SQL metrics, which are
// per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
registerAccumulator()
}
}
{code}
{code}
class AccumulatorMetadata(
val id: Long,
val name: Option[String],
val countFailedValues: Boolean
) extends Serializable
{code}
and to create it ...
{code}
class SparkContext {
...
def newLongAccumulator(): LongAccumulator
def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
...
}
{code}
A work-in-progress prototype here:
https://github.com/rxin/spark/tree/accumulator-refactor
> New accumulator API
> -------------------
>
> Key: SPARK-14654
> URL: https://issues.apache.org/jira/browse/SPARK-14654
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Reynold Xin
> Assignee: Reynold Xin
>
> This is a proposed new API -- that is substantially simpler than the existing
> one, and also more powerful (e.g. at the very least allows computing
> averages).
> {code}
> abstract class NewAccumulator[IN, OUT] extends Serializable {
> def isRegistered: Boolean = ...
> def register(metadata: AccumulatorMetadata): Unit = ...
> def metadata: AccumulatorMetadata = ...
> def reset(): Unit
> def add(v: IN): Unit
> def merge(other: NewAccumulator[IN, OUT]): Unit
> def value: OUT
> def localValue: OUT = value
> final def registerAccumulatorOnExecutor(): Unit = {
> // Automatically register the accumulator when it is deserialized with the
> // task closure. This is for external accumulators and internal ones that do
> // not represent task level metrics, e.g. internal SQL metrics, which are
> // per-operator.
> val taskContext = TaskContext.get()
> if (taskContext != null) {
> taskContext.registerAccumulator(this)
> }
> }
> // Called by Java when deserializing an object
> private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
> in.defaultReadObject()
> registerAccumulator()
> }
> }
> {code}
> {code}
> class AccumulatorMetadata(
> val id: Long,
> val name: Option[String],
> val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and to create it ...
> {code}
> class SparkContext {
> ...
> def newLongAccumulator(): LongAccumulator
> def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
> ...
> }
> {code}
> A work-in-progress prototype here:
> https://github.com/rxin/spark/tree/accumulator-refactor
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]