Repository: spark
Updated Branches:
  refs/heads/branch-2.0 705172202 -> 7d63c36e1
[SPARK-15049] Rename NewAccumulator to AccumulatorV2 ## What changes were proposed in this pull request? NewAccumulator isn't the best name if we ever come up with v3 of the API. ## How was this patch tested? Updated tests to reflect the change. Author: Reynold Xin <r...@databricks.com> Closes #12827 from rxin/SPARK-15049. (cherry picked from commit 44da8d8eabeccc12bfed0d43b37d54e0da845c66) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d63c36e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d63c36e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d63c36e Branch: refs/heads/branch-2.0 Commit: 7d63c36e1efe8baec96cdc16a997249728e204fd Parents: 7051722 Author: Reynold Xin <r...@databricks.com> Authored: Sun May 1 20:21:02 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sun May 1 20:21:11 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/AccumulatorV2.scala | 394 +++++++++++++++++++ .../scala/org/apache/spark/ContextCleaner.scala | 2 +- .../org/apache/spark/HeartbeatReceiver.scala | 2 +- .../scala/org/apache/spark/NewAccumulator.scala | 393 ------------------ .../scala/org/apache/spark/SparkContext.scala | 4 +- .../scala/org/apache/spark/TaskContext.scala | 2 +- .../org/apache/spark/TaskContextImpl.scala | 2 +- .../scala/org/apache/spark/TaskEndReason.scala | 4 +- .../org/apache/spark/executor/Executor.scala | 4 +- .../org/apache/spark/executor/TaskMetrics.scala | 18 +- .../apache/spark/scheduler/DAGScheduler.scala | 10 +- .../spark/scheduler/DAGSchedulerEvent.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 2 +- .../org/apache/spark/scheduler/TaskResult.scala | 8 +- .../apache/spark/scheduler/TaskScheduler.scala | 4 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../org/apache/spark/AccumulatorSuite.scala | 2 +- .../apache/spark/InternalAccumulatorSuite.scala | 2 +- .../spark/executor/TaskMetricsSuite.scala | 6 +- .../spark/scheduler/DAGSchedulerSuite.scala | 6 +- .../scheduler/ExternalClusterManagerSuite.scala | 4 +- .../spark/scheduler/TaskSetManagerSuite.scala | 6 +- .../spark/sql/execution/metric/SQLMetrics.scala | 6 +- 24 files changed, 444 insertions(+), 443 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/AccumulatorV2.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala new file mode 100644 index 0000000..c65108a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.{lang => jl} +import java.io.ObjectInputStream +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.scheduler.AccumulableInfo +import org.apache.spark.util.Utils + + +private[spark] case class AccumulatorMetadata( + id: Long, + name: Option[String], + countFailedValues: Boolean) extends Serializable + + +/** + * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of + * type `OUT`. + */ +abstract class AccumulatorV2[IN, OUT] extends Serializable { + private[spark] var metadata: AccumulatorMetadata = _ + private[this] var atDriverSide = true + + private[spark] def register( + sc: SparkContext, + name: Option[String] = None, + countFailedValues: Boolean = false): Unit = { + if (this.metadata != null) { + throw new IllegalStateException("Cannot register an Accumulator twice.") + } + this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues) + AccumulatorContext.register(this) + sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) + } + + /** + * Returns true if this accumulator has been registered. Note that all accumulators must be + * registered before ues, or it will throw exception. + */ + final def isRegistered: Boolean = + metadata != null && AccumulatorContext.get(metadata.id).isDefined + + private def assertMetadataNotNull(): Unit = { + if (metadata == null) { + throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.") + } + } + + /** + * Returns the id of this accumulator, can only be called after registration. + */ + final def id: Long = { + assertMetadataNotNull() + metadata.id + } + + /** + * Returns the name of this accumulator, can only be called after registration. + */ + final def name: Option[String] = { + assertMetadataNotNull() + metadata.name + } + + /** + * Whether to accumulate values from failed tasks. This is set to true for system and time + * metrics like serialization time or bytes spilled, and false for things with absolute values + * like number of input rows. This should be used for internal metrics only. + */ + private[spark] final def countFailedValues: Boolean = { + assertMetadataNotNull() + metadata.countFailedValues + } + + /** + * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided + * values. + */ + private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { + val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) + new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) + } + + final private[spark] def isAtDriverSide: Boolean = atDriverSide + + /** + * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero + * value; for a list accumulator, Nil is zero value. + */ + def isZero: Boolean + + /** + * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy + * must return true. 
+ */ + def copyAndReset(): AccumulatorV2[IN, OUT] + + /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + */ + def add(v: IN): Unit + + /** + * Merges another same-type accumulator into this one and update its state, i.e. this should be + * merge-in-place. + */ + def merge(other: AccumulatorV2[IN, OUT]): Unit + + /** + * Access this accumulator's current value; only allowed on driver. + */ + final def value: OUT = { + if (atDriverSide) { + localValue + } else { + throw new UnsupportedOperationException("Can't read accumulator value in task") + } + } + + /** + * Defines the current value of this accumulator. + * + * This is NOT the global value of the accumulator. To get the global value after a + * completed operation on the dataset, call `value`. + */ + def localValue: OUT + + // Called by Java when serializing an object + final protected def writeReplace(): Any = { + if (atDriverSide) { + if (!isRegistered) { + throw new UnsupportedOperationException( + "Accumulator must be registered before send to executor") + } + val copy = copyAndReset() + assert(copy.isZero, "copyAndReset must return a zero value copy") + copy.metadata = metadata + copy + } else { + this + } + } + + // Called by Java when deserializing an object + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + in.defaultReadObject() + if (atDriverSide) { + atDriverSide = false + + // Automatically register the accumulator when it is deserialized with the task closure. + // This is for external accumulators and internal ones that do not represent task level + // metrics, e.g. internal SQL metrics, which are per-operator. + val taskContext = TaskContext.get() + if (taskContext != null) { + taskContext.registerAccumulator(this) + } + } else { + atDriverSide = true + } + } + + override def toString: String = { + if (metadata == null) { + "Un-registered Accumulator: " + getClass.getSimpleName + } else { + getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)" + } + } +} + + +/** + * An internal class used to track accumulators by Spark itself. + */ +private[spark] object AccumulatorContext { + + /** + * This global map holds the original accumulator objects that are created on the driver. + * It keeps weak references to these objects so that accumulators can be garbage-collected + * once the RDDs and user-code that reference them are cleaned up. + * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). + */ + private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]] + + private[this] val nextId = new AtomicLong(0L) + + /** + * Returns a globally unique ID for a new [[Accumulator]]. + * Note: Once you copy the [[Accumulator]] the ID is no longer unique. + */ + def newId(): Long = nextId.getAndIncrement + + /** Returns the number of accumulators registered. Used in testing. */ + def numAccums: Int = originals.size + + /** + * Registers an [[Accumulator]] created on the driver such that it can be used on the executors. + * + * All accumulators registered here can later be used as a container for accumulating partial + * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. + * Note: if an accumulator is registered here, it should also be registered with the active + * context cleaner for cleanup so as to avoid memory leaks. + * + * If an [[Accumulator]] with the same ID was already registered, this does nothing instead + * of overwriting it. 
We will never register same accumulator twice, this is just a sanity check. + */ + def register(a: AccumulatorV2[_, _]): Unit = { + originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a)) + } + + /** + * Unregisters the [[Accumulator]] with the given ID, if any. + */ + def remove(id: Long): Unit = { + originals.remove(id) + } + + /** + * Returns the [[Accumulator]] registered with the given ID, if any. + */ + def get(id: Long): Option[AccumulatorV2[_, _]] = { + Option(originals.get(id)).map { ref => + // Since we are storing weak references, we must check whether the underlying data is valid. + val acc = ref.get + if (acc eq null) { + throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") + } + acc + } + } + + /** + * Clears all registered [[Accumulator]]s. For testing only. + */ + def clear(): Unit = { + originals.clear() + } +} + + +class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { + private[this] var _sum = 0L + + override def isZero: Boolean = _sum == 0 + + override def copyAndReset(): LongAccumulator = new LongAccumulator + + override def add(v: jl.Long): Unit = _sum += v + + def add(v: Long): Unit = _sum += v + + def sum: Long = _sum + + override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match { + case o: LongAccumulator => _sum += o.sum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + private[spark] def setValue(newValue: Long): Unit = _sum = newValue + + override def localValue: jl.Long = _sum +} + + +class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { + private[this] var _sum = 0.0 + + override def isZero: Boolean = _sum == 0.0 + + override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator + + override def add(v: jl.Double): Unit = _sum += v + + def add(v: Double): Unit = _sum += v + + def sum: Double = _sum + + override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match { + case o: DoubleAccumulator => _sum += o.sum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + private[spark] def setValue(newValue: Double): Unit = _sum = newValue + + override def localValue: jl.Double = _sum +} + + +class AverageAccumulator extends AccumulatorV2[jl.Double, jl.Double] { + private[this] var _sum = 0.0 + private[this] var _count = 0L + + override def isZero: Boolean = _sum == 0.0 && _count == 0 + + override def copyAndReset(): AverageAccumulator = new AverageAccumulator + + override def add(v: jl.Double): Unit = { + _sum += v + _count += 1 + } + + def add(d: Double): Unit = { + _sum += d + _count += 1 + } + + override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match { + case o: AverageAccumulator => + _sum += o.sum + _count += o.count + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: jl.Double = if (_count == 0) { + Double.NaN + } else { + _sum / _count + } + + def sum: Double = _sum + + def count: Long = _count +} + + +class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { + private[this] val _list: java.util.List[T] = new java.util.ArrayList[T] + + override def isZero: Boolean = _list.isEmpty + + override def copyAndReset(): ListAccumulator[T] = new ListAccumulator + + override def add(v: T): Unit = _list.add(v) + + override def 
merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { + case o: ListAccumulator[T] => _list.addAll(o.localValue) + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list) + + private[spark] def setValue(newValue: java.util.List[T]): Unit = { + _list.clear() + _list.addAll(newValue) + } +} + + +class LegacyAccumulatorWrapper[R, T]( + initialValue: R, + param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { + private[spark] var _value = initialValue // Current value on driver + + override def isZero: Boolean = _value == param.zero(initialValue) + + override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = { + val acc = new LegacyAccumulatorWrapper(initialValue, param) + acc._value = param.zero(initialValue) + acc + } + + override def add(v: T): Unit = _value = param.addAccumulator(_value, v) + + override def merge(other: AccumulatorV2[T, R]): Unit = other match { + case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue) + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: R = _value +} http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/ContextCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 63a00a8..a51338c 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { registerForCleanup(rdd, CleanRDD(rdd.id)) } - def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { + def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { registerForCleanup(a, CleanAccum(a.id)) } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 9eac05f..29018c7 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} */ private[spark] case class Heartbeat( executorId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> accumulator updates + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates blockManagerId: BlockManagerId) /** http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/NewAccumulator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala deleted file mode 100644 index 1571e15..0000000 --- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.{lang => jl} -import java.io.ObjectInputStream -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong -import javax.annotation.concurrent.GuardedBy - -import scala.collection.JavaConverters._ - -import org.apache.spark.scheduler.AccumulableInfo -import org.apache.spark.util.Utils - - -private[spark] case class AccumulatorMetadata( - id: Long, - name: Option[String], - countFailedValues: Boolean) extends Serializable - - -/** - * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of - * type `OUT`. - */ -abstract class NewAccumulator[IN, OUT] extends Serializable { - private[spark] var metadata: AccumulatorMetadata = _ - private[this] var atDriverSide = true - - private[spark] def register( - sc: SparkContext, - name: Option[String] = None, - countFailedValues: Boolean = false): Unit = { - if (this.metadata != null) { - throw new IllegalStateException("Cannot register an Accumulator twice.") - } - this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues) - AccumulatorContext.register(this) - sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) - } - - /** - * Returns true if this accumulator has been registered. Note that all accumulators must be - * registered before ues, or it will throw exception. - */ - final def isRegistered: Boolean = - metadata != null && AccumulatorContext.get(metadata.id).isDefined - - private def assertMetadataNotNull(): Unit = { - if (metadata == null) { - throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.") - } - } - - /** - * Returns the id of this accumulator, can only be called after registration. - */ - final def id: Long = { - assertMetadataNotNull() - metadata.id - } - - /** - * Returns the name of this accumulator, can only be called after registration. - */ - final def name: Option[String] = { - assertMetadataNotNull() - metadata.name - } - - /** - * Whether to accumulate values from failed tasks. This is set to true for system and time - * metrics like serialization time or bytes spilled, and false for things with absolute values - * like number of input rows. This should be used for internal metrics only. - */ - private[spark] final def countFailedValues: Boolean = { - assertMetadataNotNull() - metadata.countFailedValues - } - - /** - * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] with the provided - * values. 
- */ - private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) - new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) - } - - final private[spark] def isAtDriverSide: Boolean = atDriverSide - - /** - * Tells if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero - * value; for a list accumulator, Nil is zero value. - */ - def isZero(): Boolean - - /** - * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy - * must return true. - */ - def copyAndReset(): NewAccumulator[IN, OUT] - - /** - * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. - */ - def add(v: IN): Unit - - /** - * Merges another same-type accumulator into this one and update its state, i.e. this should be - * merge-in-place. - */ - def merge(other: NewAccumulator[IN, OUT]): Unit - - /** - * Access this accumulator's current value; only allowed on driver. - */ - final def value: OUT = { - if (atDriverSide) { - localValue - } else { - throw new UnsupportedOperationException("Can't read accumulator value in task") - } - } - - /** - * Defines the current value of this accumulator. - * - * This is NOT the global value of the accumulator. To get the global value after a - * completed operation on the dataset, call `value`. - */ - def localValue: OUT - - // Called by Java when serializing an object - final protected def writeReplace(): Any = { - if (atDriverSide) { - if (!isRegistered) { - throw new UnsupportedOperationException( - "Accumulator must be registered before send to executor") - } - val copy = copyAndReset() - assert(copy.isZero(), "copyAndReset must return a zero value copy") - copy.metadata = metadata - copy - } else { - this - } - } - - // Called by Java when deserializing an object - private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { - in.defaultReadObject() - if (atDriverSide) { - atDriverSide = false - - // Automatically register the accumulator when it is deserialized with the task closure. - // This is for external accumulators and internal ones that do not represent task level - // metrics, e.g. internal SQL metrics, which are per-operator. - val taskContext = TaskContext.get() - if (taskContext != null) { - taskContext.registerAccumulator(this) - } - } else { - atDriverSide = true - } - } - - override def toString: String = { - if (metadata == null) { - "Un-registered Accumulator: " + getClass.getSimpleName - } else { - getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)" - } - } -} - - -private[spark] object AccumulatorContext { - - /** - * This global map holds the original accumulator objects that are created on the driver. - * It keeps weak references to these objects so that accumulators can be garbage-collected - * once the RDDs and user-code that reference them are cleaned up. - * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). - */ - private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] - - private[this] val nextId = new AtomicLong(0L) - - /** - * Return a globally unique ID for a new [[Accumulator]]. - * Note: Once you copy the [[Accumulator]] the ID is no longer unique. 
- */ - def newId(): Long = nextId.getAndIncrement - - def numAccums: Int = originals.size - - /** - * Register an [[Accumulator]] created on the driver such that it can be used on the executors. - * - * All accumulators registered here can later be used as a container for accumulating partial - * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. - * Note: if an accumulator is registered here, it should also be registered with the active - * context cleaner for cleanup so as to avoid memory leaks. - * - * If an [[Accumulator]] with the same ID was already registered, this does nothing instead - * of overwriting it. We will never register same accumulator twice, this is just a sanity check. - */ - def register(a: NewAccumulator[_, _]): Unit = { - originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a)) - } - - /** - * Unregister the [[Accumulator]] with the given ID, if any. - */ - def remove(id: Long): Unit = { - originals.remove(id) - } - - /** - * Return the [[Accumulator]] registered with the given ID, if any. - */ - def get(id: Long): Option[NewAccumulator[_, _]] = { - Option(originals.get(id)).map { ref => - // Since we are storing weak references, we must check whether the underlying data is valid. - val acc = ref.get - if (acc eq null) { - throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") - } - acc - } - } - - /** - * Clear all registered [[Accumulator]]s. For testing only. - */ - def clear(): Unit = { - originals.clear() - } -} - - -class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] { - private[this] var _sum = 0L - - override def isZero(): Boolean = _sum == 0 - - override def copyAndReset(): LongAccumulator = new LongAccumulator - - override def add(v: jl.Long): Unit = _sum += v - - def add(v: Long): Unit = _sum += v - - def sum: Long = _sum - - override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other match { - case o: LongAccumulator => _sum += o.sum - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - private[spark] def setValue(newValue: Long): Unit = _sum = newValue - - override def localValue: jl.Long = _sum -} - - -class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] { - private[this] var _sum = 0.0 - - override def isZero(): Boolean = _sum == 0.0 - - override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator - - override def add(v: jl.Double): Unit = _sum += v - - def add(v: Double): Unit = _sum += v - - def sum: Double = _sum - - override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match { - case o: DoubleAccumulator => _sum += o.sum - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - private[spark] def setValue(newValue: Double): Unit = _sum = newValue - - override def localValue: jl.Double = _sum -} - - -class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] { - private[this] var _sum = 0.0 - private[this] var _count = 0L - - override def isZero(): Boolean = _sum == 0.0 && _count == 0 - - override def copyAndReset(): AverageAccumulator = new AverageAccumulator - - override def add(v: jl.Double): Unit = { - _sum += v - _count += 1 - } - - def add(d: Double): Unit = { - _sum += d - _count += 1 - } - - override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match { - case o: AverageAccumulator 
=> - _sum += o.sum - _count += o.count - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - override def localValue: jl.Double = if (_count == 0) { - Double.NaN - } else { - _sum / _count - } - - def sum: Double = _sum - - def count: Long = _count -} - - -class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] { - private[this] val _list: java.util.List[T] = new java.util.ArrayList[T] - - override def isZero(): Boolean = _list.isEmpty - - override def copyAndReset(): ListAccumulator[T] = new ListAccumulator - - override def add(v: T): Unit = _list.add(v) - - override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = other match { - case o: ListAccumulator[T] => _list.addAll(o.localValue) - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list) - - private[spark] def setValue(newValue: java.util.List[T]): Unit = { - _list.clear() - _list.addAll(newValue) - } -} - - -class LegacyAccumulatorWrapper[R, T]( - initialValue: R, - param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, R] { - private[spark] var _value = initialValue // Current value on driver - - override def isZero(): Boolean = _value == param.zero(initialValue) - - override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = { - val acc = new LegacyAccumulatorWrapper(initialValue, param) - acc._value = param.zero(initialValue) - acc - } - - override def add(v: T): Unit = _value = param.addAccumulator(_value, v) - - override def merge(other: NewAccumulator[T, R]): Unit = other match { - case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue) - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - override def localValue: R = _value -} http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2cb3ed0..d0f88d4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1282,7 +1282,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register the given accumulator. Note that accumulators must be registered before use, or it * will throw exception. */ - def register(acc: NewAccumulator[_, _]): Unit = { + def register(acc: AccumulatorV2[_, _]): Unit = { acc.register(this) } @@ -1290,7 +1290,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register the given accumulator with given name. Note that accumulators must be registered * before use, or it will throw exception. 
*/ - def register(acc: NewAccumulator[_, _], name: String): Unit = { + def register(acc: AccumulatorV2[_, _], name: String): Unit = { acc.register(this, name = Some(name)) } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/TaskContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 9e53257..1a8f8cf 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable { * Register an accumulator that belongs to this task. Accumulators must call this method when * deserializing in executors. */ - private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index bc3807f..c904e08 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -122,7 +122,7 @@ private[spark] class TaskContextImpl( override def getMetricsSources(sourceName: String): Seq[Source] = metricsSystem.getSourcesByName(sourceName) - private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): Unit = { + private[spark] override def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { taskMetrics.registerAccumulator(a) } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/TaskEndReason.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 82ba2d0..ef333e3 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -118,7 +118,7 @@ case class ExceptionFailure( fullStackTrace: String, private val exceptionWrapper: Option[ThrowableSerializationWrapper], accumUpdates: Seq[AccumulableInfo] = Seq.empty, - private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil) + private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil) extends TaskFailedReason { /** @@ -138,7 +138,7 @@ case class ExceptionFailure( this(e, accumUpdates, preserveCause = true) } - private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): ExceptionFailure = { + private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): ExceptionFailure = { this.accums = accums this } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4d61f7e..4f74dc9 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -353,7 +353,7 @@ private[spark] class Executor( logError(s"Exception in $taskName (TID $taskId)", t) // Collect 
latest accumulator values to report back to the driver - val accums: Seq[NewAccumulator[_, _]] = + val accums: Seq[AccumulatorV2[_, _]] = if (task != null) { task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) @@ -478,7 +478,7 @@ private[spark] class Executor( /** Reports heartbeat and metrics for active tasks to the driver. */ private def reportHeartBeat(): Unit = { // list of (task id, accumUpdates) to send back to the driver - val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]() + val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() for (taskRunner <- runningTasks.values().asScala) { http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 0b64917..56d034f 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -201,7 +201,7 @@ class TaskMetrics private[spark] () extends Serializable { output.RECORDS_WRITTEN -> outputMetrics._recordsWritten ) ++ testAccum.map(TEST_ACCUM -> _) - @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] = + @transient private[spark] lazy val internalAccums: Seq[AccumulatorV2[_, _]] = nameToAccums.values.toIndexedSeq /* ========================== * @@ -217,13 +217,13 @@ class TaskMetrics private[spark] () extends Serializable { /** * External accumulators registered with this task. */ - @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]] + @transient private lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]] - private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = { + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { externalAccums += a } - private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums + private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums } @@ -271,15 +271,15 @@ private[spark] object TaskMetrics extends Logging { /** * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only. 
*/ - def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = { + def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = { val tm = new TaskMetrics val (internalAccums, externalAccums) = accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get)) internalAccums.foreach { acc => - val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]] + val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[AccumulatorV2[Any, Any]] tmAcc.metadata = acc.metadata - tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]]) + tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]]) } tm.externalAccums ++= externalAccums @@ -289,7 +289,7 @@ private[spark] object TaskMetrics extends Logging { private[spark] class BlockStatusesAccumulator - extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { + extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] override def isZero(): Boolean = _seq.isEmpty @@ -298,7 +298,7 @@ private[spark] class BlockStatusesAccumulator override def add(v: (BlockId, BlockStatus)): Unit = _seq += v - override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) + override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) : Unit = other match { case o: BlockStatusesAccumulator => _seq ++= o.localValue case _ => throw new UnsupportedOperationException( http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a96d5f6..4dfd532 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -209,7 +209,7 @@ class DAGScheduler( task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[NewAccumulator[_, _]], + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = { eventProcessLoop.post( CompletionEvent(task, reason, result, accumUpdates, taskInfo)) @@ -1091,14 +1091,14 @@ class DAGScheduler( event.accumUpdates.foreach { updates => val id = updates.id // Find the corresponding accumulator on the driver and update it - val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match { - case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]] + val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match { + case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]] case None => throw new SparkException(s"attempted to access non-existent accumulator $id") } - acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]]) + acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]]) // To avoid UI cruft, ignore cases where value wasn't updated - if (acc.name.isDefined && !updates.isZero()) { + if (acc.name.isDefined && !updates.isZero) { stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value)) } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index e57a224..0a2c2dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent( task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[NewAccumulator[_, _]], + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) extends DAGSchedulerEvent http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/Task.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index e7ca6ef..362f8e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -153,7 +153,7 @@ private[spark] abstract class Task[T]( * Collect the latest values of accumulators used in this task. If the task failed, * filter out the accumulators whose values should not be included on failures. */ - def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[NewAccumulator[_, _]] = { + def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = { if (context != null) { context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues } } else { http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index b472c55..69ce00f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{NewAccumulator, SparkEnv} +import org.apache.spark.{AccumulatorV2, SparkEnv} import org.apache.spark.storage.BlockId import org.apache.spark.util.Utils @@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int) /** A TaskResult that contains the task's return value and accumulator updates. 
*/ private[spark] class DirectTaskResult[T]( var valueBytes: ByteBuffer, - var accumUpdates: Seq[NewAccumulator[_, _]]) + var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T] with Externalizable { private var valueObjectDeserialized = false @@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T]( if (numUpdates == 0) { accumUpdates = null } else { - val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]] + val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]] for (i <- 0 until numUpdates) { - _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]] + _accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]] } accumUpdates = _accumUpdates } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 75a0c56..9881a10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import org.apache.spark.NewAccumulator +import org.apache.spark.AccumulatorV2 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId @@ -67,7 +67,7 @@ private[spark] trait TaskScheduler { */ def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean /** http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8fa4aa1..666b636 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -384,7 +384,7 @@ private[spark] class TaskSchedulerImpl( */ override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b79f643..b724050 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -647,7 +647,7 @@ private[spark] class TaskSetManager( info.markFailed() val index = info.index copiesRunning(index) -= 1 - var accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty + var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + 
reason.asInstanceOf[TaskFailedReason].toErrorString val failureException: Option[Throwable] = reason match { http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 9c90049..09eb9c1 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -273,7 +273,7 @@ private[spark] object AccumulatorSuite { * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the * info as an accumulator update. */ - def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None) + def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None) /** * Run one or more Spark jobs and verify that in at least one job the peak execution memory http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 688eb6b..25977a4 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -213,7 +213,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) { private val accumsRegistered = new ArrayBuffer[Long] - override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { + override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { accumsRegistered += a.id super.registerAccumulatorForCleanup(a) } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index 94f6e1a..27a1e7b 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -203,7 +203,7 @@ class TaskMetricsSuite extends SparkFunSuite { acc1.add(1) acc2.add(2) val newUpdates = tm.accumulators() - .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap + .map(a => (a.id, a.asInstanceOf[AccumulatorV2[Any, Any]])).toMap assert(newUpdates.contains(acc1.id)) assert(newUpdates.contains(acc2.id)) assert(newUpdates.contains(acc3.id)) @@ -230,8 +230,8 @@ private[spark] object TaskMetricsSuite extends Assertions { * Note: this does NOT check accumulator ID equality. 
*/ def assertUpdatesEquals( - updates1: Seq[NewAccumulator[_, _]], - updates2: Seq[NewAccumulator[_, _]]): Unit = { + updates1: Seq[AccumulatorV2[_, _]], + updates2: Seq[AccumulatorV2[_, _]]): Unit = { assert(updates1.size === updates2.size) updates1.zip(updates2).foreach { case (acc1, acc2) => // do not assert ID equals here http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9912d1f..5a5c3a0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def stop() = {} override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager @@ -483,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -2012,7 +2012,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou task: Task[_], reason: TaskEndReason, result: Any, - extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty, + extraAccumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty, taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = { val accumUpdates = reason match { case Success => task.metrics.accumulators() http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 16027d9..72ac848 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{AccumulatorV2, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId @@ -67,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler { override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true } 
http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 339fc42..122a3ec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[NewAccumulator[_, _]], + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason } @@ -184,7 +184,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task => + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } @@ -791,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private def createTaskResult( id: Int, - accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = { + accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 7bf9225..40c00ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.metric import java.text.NumberFormat -import org.apache.spark.{NewAccumulator, SparkContext} +import org.apache.spark.{AccumulatorV2, SparkContext} import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.util.Utils -class SQLMetric(val metricType: String, initValue: Long = 0L) extends NewAccumulator[Long, Long] { +class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] { // This is a workaround for SPARK-11013. // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will // update it at the end of task and the value will be at least 0. 
Then we can filter out the -1 @@ -33,7 +33,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends NewAccumul override def copyAndReset(): SQLMetric = new SQLMetric(metricType, initValue) - override def merge(other: NewAccumulator[Long, Long]): Unit = other match { + override def merge(other: AccumulatorV2[Long, Long]): Unit = other match { case o: SQLMetric => _value += o.localValue case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
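For readers migrating custom accumulators to the renamed API, here is a minimal sketch (not part of this commit) of a user-defined accumulator written against the AccumulatorV2 contract added above — isZero, copyAndReset, add, merge and localValue — and registered through SparkContext.register. The MaxAccumulator and MaxAccumulatorExample names are illustrative only.

import org.apache.spark.{AccumulatorV2, SparkConf, SparkContext}

// Illustrative accumulator that tracks the maximum Long seen across tasks.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue

  // Zero value: nothing has been added yet.
  override def isZero: Boolean = _max == Long.MinValue

  // copyAndReset must return a copy for which isZero is true.
  override def copyAndReset(): MaxAccumulator = new MaxAccumulator

  override def add(v: Long): Unit = _max = math.max(_max, v)

  // Merge-in-place with another accumulator of the same type.
  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
    case o: MaxAccumulator => _max = math.max(_max, o.localValue)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def localValue: Long = _max
}

object MaxAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AccumulatorV2Example").setMaster("local[2]"))
    val maxSeen = new MaxAccumulator
    sc.register(maxSeen, "maxSeen")   // accumulators must be registered before use
    sc.parallelize(1L to 100L).foreach(n => maxSeen.add(n))
    println(maxSeen.value)            // value is only readable on the driver; prints 100
    sc.stop()
  }
}

When the closure is shipped to executors, writeReplace sends the zero-valued copy produced by copyAndReset; each task accumulates into its copy, and the driver folds the per-task updates back in via merge, as in the DAGScheduler change above.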