Repository: spark Updated Branches: refs/heads/branch-2.0 55a837246 -> 935b6e0e4
[SPARK-15866] Rename listAccumulator to collectionAccumulator ## What changes were proposed in this pull request? SparkContext.listAccumulator, by Spark's convention, makes it sound like "list" is a verb and the method should return a list of accumulators. This patch renames the method to collectionAccumulator and the class to CollectionAccumulator. ## How was this patch tested? Updated the test case to reflect the new names. Author: Reynold Xin <r...@databricks.com> Closes #13594 from rxin/SPARK-15866. (cherry picked from commit 254bc8c34e70241508bdfc8ff42a65491f5280cd) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/935b6e0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/935b6e0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/935b6e0e Branch: refs/heads/branch-2.0 Commit: 935b6e0e48e258f447622033b512f7ba5d83da69 Parents: 55a8372 Author: Reynold Xin <r...@databricks.com> Authored: Fri Jun 10 11:08:39 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Fri Jun 10 11:08:47 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkContext.scala | 16 ++++++++-------- .../scala/org/apache/spark/util/AccumulatorV2.scala | 15 ++++++++++----- .../org/apache/spark/util/AccumulatorV2Suite.scala | 2 +- .../execution/columnar/InMemoryTableScanExec.scala | 8 ++++---- 4 files changed, 23 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 33b11ed..230fabd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ 
b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1340,21 +1340,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * Create and register a list accumulator, which starts with empty list and accumulates inputs - * by adding them into the inner list. + * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates + * inputs by adding them into the list. */ - def listAccumulator[T]: ListAccumulator[T] = { - val acc = new ListAccumulator[T] + def collectionAccumulator[T]: CollectionAccumulator[T] = { + val acc = new CollectionAccumulator[T] register(acc) acc } /** - * Create and register a list accumulator, which starts with empty list and accumulates inputs - * by adding them into the inner list. + * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates + * inputs by adding them into the list. */ - def listAccumulator[T](name: String): ListAccumulator[T] = { - val acc = new ListAccumulator[T] + def collectionAccumulator[T](name: String): CollectionAccumulator[T] = { + val acc = new CollectionAccumulator[T] register(acc, name) acc } http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 0b9a47c..044dd69 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -415,15 +415,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { } -class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { +/** + * An [[AccumulatorV2 accumulator]] for collecting a list of elements. 
+ * + * @since 2.0.0 + */ +class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { private val _list: java.util.List[T] = new ArrayList[T] override def isZero: Boolean = _list.isEmpty - override def copyAndReset(): ListAccumulator[T] = new ListAccumulator + override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator - override def copy(): ListAccumulator[T] = { - val newAcc = new ListAccumulator[T] + override def copy(): CollectionAccumulator[T] = { + val newAcc = new CollectionAccumulator[T] newAcc._list.addAll(_list) newAcc } @@ -433,7 +438,7 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { override def add(v: T): Unit = _list.add(v) override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { - case o: ListAccumulator[T] => _list.addAll(o.value) + case o: CollectionAccumulator[T] => _list.addAll(o.value) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index 439da13..a04644d 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -88,7 +88,7 @@ class AccumulatorV2Suite extends SparkFunSuite { } test("ListAccumulator") { - val acc = new ListAccumulator[Double] + val acc = new CollectionAccumulator[Double] assert(acc.value.isEmpty) assert(acc.isZero) http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index a1c2f0a..ff07331 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.UserDefinedType import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{AccumulatorContext, ListAccumulator, LongAccumulator} +import org.apache.spark.util.{AccumulatorContext, CollectionAccumulator, LongAccumulator} private[sql] object InMemoryRelation { @@ -67,16 +67,16 @@ private[sql] case class InMemoryRelation( tableName: Option[String])( @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null, @transient private[sql] var _statistics: Statistics = null, - private[sql] var _batchStats: ListAccumulator[InternalRow] = null) + private[sql] var _batchStats: CollectionAccumulator[InternalRow] = null) extends logical.LeafNode with MultiInstanceRelation { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) override def producedAttributes: AttributeSet = outputSet - private[sql] val batchStats: ListAccumulator[InternalRow] = + private[sql] val batchStats: CollectionAccumulator[InternalRow] = if (_batchStats == null) { - child.sqlContext.sparkContext.listAccumulator[InternalRow] + child.sqlContext.sparkContext.collectionAccumulator[InternalRow] } else { _batchStats } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org