Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9c817d027 -> 554e0f30a


[SPARK-15322][SQL][FOLLOW-UP] Update deprecated accumulator usage into accumulatorV2

## What changes were proposed in this pull request?

This PR replaces another use of the deprecated `accumulableCollection` with `listAccumulator`, a case the previous PR missed.
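
For reference, a minimal sketch of the two APIs side by side, assuming an existing `SparkContext` `sc`; the `String` elements and the `collectStats` name are illustrative only (the actual change accumulates `InternalRow`s):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext

// Illustrative helper; in the patch the accumulator lives in InMemoryRelation.
def collectStats(sc: SparkContext): Unit = {
  // Before: the deprecated accumulator v1 API.
  val oldAcc = sc.accumulableCollection(ArrayBuffer.empty[String])
  sc.parallelize(Seq("a", "b")).foreach(s => oldAcc += s)
  val oldValue: ArrayBuffer[String] = oldAcc.value  // Scala collection on the driver

  // After: the accumulator v2 API used here (named listAccumulator on
  // branch-2.0 at the time of this commit).
  val newAcc = sc.listAccumulator[String]
  sc.parallelize(Seq("a", "b")).foreach(s => newAcc.add(s))
  val newValue: java.util.List[String] = newAcc.value  // Java list on the driver
  println(newValue.asScala.mkString(", "))
}
```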

Since `ArrayBuffer[InternalRow].asJava` is a `java.util.List[InternalRow]`, the replacement is safe.
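
The two value types carry the same elements; `asJava`/`asScala` are wrapper views rather than copies, so only the collection interface changes. A small sketch (REPL-style, with `String` as a stand-in for `InternalRow`):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

val rows = ArrayBuffer("r1", "r2")                 // stand-in for ArrayBuffer[InternalRow]
val asList: java.util.List[String] = rows.asJava   // wrapper view, no copy
assert(asList.asScala.toSeq == rows.toSeq)         // same elements round-trip
```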

## How was this patch tested?

Covered by the existing tests in `InMemoryColumnarQuerySuite` and `CachedTableSuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #13187 from HyukjinKwon/SPARK-15322.

(cherry picked from commit f5065abf49dea0eac04b0ec219f2d832a0f6730a)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/554e0f30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/554e0f30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/554e0f30

Branch: refs/heads/branch-2.0
Commit: 554e0f30a7fd9fae4282d93ec4c0f1c6dbdffabe
Parents: 9c817d0
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu May 19 11:54:50 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu May 19 11:55:00 2016 -0700

----------------------------------------------------------------------
 .../execution/columnar/InMemoryTableScanExec.scala  | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/554e0f30/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 009fbaa..ba61940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 import org.apache.commons.lang.StringUtils
 
-import org.apache.spark.{Accumulable, Accumulator}
+import org.apache.spark.Accumulator
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
 
 
 private[sql] object InMemoryRelation {
@@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation(
     tableName: Option[String])(
     @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
     @transient private[sql] var _statistics: Statistics = null,
-    private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+    private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override def producedAttributes: AttributeSet = outputSet
 
-  private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+  private[sql] val batchStats: ListAccumulator[InternalRow] =
     if (_batchStats == null) {
-      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+      child.sqlContext.sparkContext.listAccumulator[InternalRow]
     } else {
       _batchStats
     }
@@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation(
         output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
         partitionStatistics.schema)
 
-    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+    batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
   }
 
   // Statistics propagation contracts:
@@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation(
           val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
                         .flatMap(_.values))
 
-          batchStats += stats
+          batchStats.add(stats)
           CachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)

