Repository: spark
Updated Branches:
  refs/heads/master b85d18f3b -> 252417fa2


[SPARK-15322][SQL][FOLLOWUP] Use the new long accumulator for old int 
accumulators.

## What changes were proposed in this pull request?

This PR corrects the remaining cases for using old accumulators.

This does not change some old accumulator usages below:

- `ImplicitSuite.scala` - Tests dedicated to old accumulator, for implicits 
with `AccumulatorParam`

- `AccumulatorSuite.scala` -  Tests dedicated to old accumulator

- `JavaSparkContext.scala` - For supporting old accumulators for Java API.

- `debug.package.scala` - Usage with `HashSet[String]`. Currently, it seems no 
implementation for this. I might be able to write an anonymous class for this 
but I didn't because I think it is not worth writing a lot of codes only for 
this.

- `SQLMetricsSuite.scala` - This uses the old accumulator for checking type 
boxing. It seems new accumulator does not require type boxing for this case 
whereas the old one requires (due to the use of generic).

## How was this patch tested?

Existing tests cover this.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #13434 from HyukjinKwon/accum.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/252417fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/252417fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/252417fa

Branch: refs/heads/master
Commit: 252417fa21eb47781addfd614ff00dac793b52a9
Parents: b85d18f
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu Jun 2 11:16:24 2016 -0500
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jun 2 11:16:24 2016 -0500

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/DistributedSuite.scala   |  5 ++---
 .../src/test/scala/org/apache/spark/repl/ReplSuite.scala |  6 +++---
 .../src/test/scala/org/apache/spark/repl/ReplSuite.scala |  6 +++---
 .../sql/execution/columnar/InMemoryTableScanExec.scala   | 11 +++++------
 .../org/apache/spark/sql/execution/debug/package.scala   |  7 ++++---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala  |  6 +++---
 .../apache/spark/sql/execution/ui/SQLListenerSuite.scala |  4 ++--
 7 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 0be25e9..6e69fc4 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -92,8 +92,8 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
 
   test("accumulators") {
     sc = new SparkContext(clusterUrl, "test")
-    val accum = sc.accumulator(0)
-    sc.parallelize(1 to 10, 10).foreach(x => accum += x)
+    val accum = sc.longAccumulator
+    sc.parallelize(1 to 10, 10).foreach(x => accum.add(x))
     assert(accum.value === 55)
   }
 
@@ -109,7 +109,6 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
 
   test("repeatedly failing task") {
     sc = new SparkContext(clusterUrl, "test")
-    val accum = sc.accumulator(0)
     val thrown = intercept[SparkException] {
       // scalastyle:off println
       sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))

http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git 
a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 547da8f..19f201f 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -107,13 +107,13 @@ class ReplSuite extends SparkFunSuite {
   test("simple foreach with accumulator") {
     val output = runInterpreter("local",
       """
-        |val accum = sc.accumulator(0)
-        |sc.parallelize(1 to 10).foreach(x => accum += x)
+        |val accum = sc.longAccumulator
+        |sc.parallelize(1 to 10).foreach(x => accum.add(x))
         |accum.value
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
-    assertContains("res1: Int = 55", output)
+    assertContains("res1: Long = 55", output)
   }
 
   test("external vars") {

http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 1256860..48582c1 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -150,13 +150,13 @@ class ReplSuite extends SparkFunSuite {
   test("simple foreach with accumulator") {
     val output = runInterpreter("local",
       """
-        |val accum = sc.accumulator(0)
-        |sc.parallelize(1 to 10).foreach(x => accum += x)
+        |val accum = sc.longAccumulator
+        |sc.parallelize(1 to 10).foreach(x => accum.add(x))
         |accum.value
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
-    assertContains("res1: Int = 55", output)
+    assertContains("res1: Long = 55", output)
   }
 
   test("external vars") {

http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 7ccc9de..bd55e1a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.lang.StringUtils
 
-import org.apache.spark.Accumulator
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -36,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, 
SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
+import org.apache.spark.util.{AccumulatorContext, ListAccumulator, 
LongAccumulator}
 
 
 private[sql] object InMemoryRelation {
@@ -294,8 +293,8 @@ private[sql] case class InMemoryTableScanExec(
     sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", 
"false").toBoolean
 
   // Accumulators used for testing purposes
-  lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
-  lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
+  lazy val readPartitions = sparkContext.longAccumulator
+  lazy val readBatches = sparkContext.longAccumulator
 
   private val inMemoryPartitionPruningEnabled = 
sqlContext.conf.inMemoryPartitionPruning
 
@@ -339,7 +338,7 @@ private[sql] case class InMemoryTableScanExec(
               false
             } else {
               if (enableAccumulators) {
-                readBatches += 1
+                readBatches.add(1)
               }
               true
             }
@@ -361,7 +360,7 @@ private[sql] case class InMemoryTableScanExec(
       val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
       columnarIterator.initialize(withMetrics, columnTypes, 
requestedColumnIndices.toArray)
       if (enableAccumulators && columnarIterator.hasNext) {
-        readPartitions += 1
+        readPartitions.add(1)
       }
       columnarIterator
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index c77c889..f2c558a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, 
CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.LongAccumulator
 
 /**
  * Contains methods for debugging query execution.
@@ -122,13 +123,13 @@ package object debug {
     /**
      * A collection of metrics for each column of output.
      *
-     * @param elementTypes the actual runtime types for the output.  Useful 
when there are bugs
+     * @param elementTypes the actual runtime types for the output. Useful 
when there are bugs
      *                     causing the wrong data to be projected.
      */
     case class ColumnMetrics(
       elementTypes: Accumulator[HashSet[String]] = 
sparkContext.accumulator(HashSet.empty))
 
-    val tupleCount: Accumulator[Int] = sparkContext.accumulator[Int](0)
+    val tupleCount: LongAccumulator = sparkContext.longAccumulator
 
     val numColumns: Int = child.output.size
     val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new 
ColumnMetrics())
@@ -149,7 +150,7 @@ package object debug {
 
           def next(): InternalRow = {
             val currentRow = iter.next()
-            tupleCount += 1
+            tupleCount.add(1)
             var i = 0
             while (i < numColumns) {
               val value = currentRow.get(i, output(i).dataType)

http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 91d9302..49a0ba1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2067,9 +2067,9 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
       checkAnswer(df.selectExpr("a + 1", "a + (a + 1)"), Row(2, 3))
 
       // Identity udf that tracks the number of times it is called.
-      val countAcc = sparkContext.accumulator(0, "CallCount")
+      val countAcc = sparkContext.longAccumulator("CallCount")
       spark.udf.register("testUdf", (x: Int) => {
-        countAcc.++=(1)
+        countAcc.add(1)
         x
       })
 
@@ -2092,7 +2092,7 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
         df.selectExpr("testUdf(a + 1) + testUdf(1 + b)", "testUdf(a + 1)"), 
Row(4, 2), 2)
 
       val testUdf = functions.udf((x: Int) => {
-        countAcc.++=(1)
+        countAcc.add(1)
         x
       })
       verifyCallCount(

http://git-wip-us.apache.org/repos/asf/spark/blob/252417fa/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index cf7e976..6788c9d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -365,9 +365,9 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
     // This task has both accumulators that are SQL metrics and accumulators 
that are not.
     // The listener should only track the ones that are actually SQL metrics.
     val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
-    val nonSqlMetric = sparkContext.accumulator[Int](0, "baseball")
+    val nonSqlMetric = sparkContext.longAccumulator("baseball")
     val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
-    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.localValue), 
None)
+    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
     val taskInfo = createTaskInfo(0, 0)
     taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
     val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, 
null)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to