Repository: spark Updated Branches: refs/heads/branch-2.0 9c817d027 -> 554e0f30a
[SPARK-15322][SQL][FOLLOW-UP] Update deprecated accumulator usage into accumulatorV2 ## What changes were proposed in this pull request? This PR corrects another case that uses deprecated `accumulableCollection` to use `listAccumulator`, which seems the previous PR missed. Since `ArrayBuffer[InternalRow].asJava` is `java.util.List[InternalRow]`, it seems ok to replace the usage. ## How was this patch tested? Related existing tests `InMemoryColumnarQuerySuite` and `CachedTableSuite`. Author: hyukjinkwon <gurwls...@gmail.com> Closes #13187 from HyukjinKwon/SPARK-15322. (cherry picked from commit f5065abf49dea0eac04b0ec219f2d832a0f6730a) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/554e0f30 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/554e0f30 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/554e0f30 Branch: refs/heads/branch-2.0 Commit: 554e0f30a7fd9fae4282d93ec4c0f1c6dbdffabe Parents: 9c817d0 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Thu May 19 11:54:50 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Thu May 19 11:55:00 2016 -0700 ---------------------------------------------------------------------- .../execution/columnar/InMemoryTableScanExec.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/554e0f30/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 009fbaa..ba61940 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.execution.columnar -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import org.apache.commons.lang.StringUtils -import org.apache.spark.{Accumulable, Accumulator} +import org.apache.spark.Accumulator import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.UserDefinedType import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.AccumulatorContext +import org.apache.spark.util.{AccumulatorContext, ListAccumulator} private[sql] object InMemoryRelation { @@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation( tableName: Option[String])( @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null, @transient private[sql] var _statistics: Statistics = null, - private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null) + private[sql] var _batchStats: ListAccumulator[InternalRow] = null) extends logical.LeafNode with MultiInstanceRelation { override def producedAttributes: AttributeSet = outputSet - private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = + private[sql] val batchStats: ListAccumulator[InternalRow] = if (_batchStats == null) { - child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow]) + child.sqlContext.sparkContext.listAccumulator[InternalRow] } else { _batchStats } @@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation( output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add), partitionStatistics.schema) - batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum + batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum } // Statistics propagation contracts: @@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation( val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) .flatMap(_.values)) - batchStats += stats + batchStats.add(stats) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org