Repository: spark
Updated Branches:
  refs/heads/branch-2.0 796dd1514 -> ffbc6b796


[SPARK-15860] Metrics for codegen size and perf

## What changes were proposed in this pull request?

Adds Codahale metrics for the size of the codegen source text and for the time 
it takes to compile. The size is particularly interesting because the JVM 
imposes hard limits on how large a method can be.

To keep this simple, I added the metrics under a statically initialized source 
that is always registered with SparkEnv.

## How was this patch tested?

Unit tests (MetricsSystemSuite and CodeGenerationSuite).

Author: Eric Liang <[email protected]>

Closes #13586 from ericl/spark-15860.

(cherry picked from commit e1f986c7a3fcc3864d53ef99ef7f14fa4d262ac3)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffbc6b79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffbc6b79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffbc6b79

Branch: refs/heads/branch-2.0
Commit: ffbc6b796591d3e1f3dcb950335871b7826e6b3b
Parents: 796dd15
Author: Eric Liang <[email protected]>
Authored: Sat Jun 11 23:16:21 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Jun 11 23:16:28 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/metrics/MetricsSystem.scala    |  3 +-
 .../spark/metrics/source/StaticSources.scala    | 50 ++++++++++++++++++++
 .../spark/metrics/MetricsSystemSuite.scala      |  8 ++--
 .../expressions/codegen/CodeGenerator.scala     |  3 ++
 .../expressions/CodeGenerationSuite.scala       |  9 ++++
 5 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffbc6b79/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 0fed991..9b16c11 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -28,7 +28,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
-import org.apache.spark.metrics.source.Source
+import org.apache.spark.metrics.source.{Source, StaticSources}
 import org.apache.spark.util.Utils
 
 /**
@@ -96,6 +96,7 @@ private[spark] class MetricsSystem private (
   def start() {
     require(!running, "Attempting to start a MetricsSystem that is already 
running")
     running = true
+    StaticSources.allSources.foreach(registerSource)
     registerSources()
     registerSinks()
     sinks.foreach(_.start)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbc6b79/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala 
b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
new file mode 100644
index 0000000..6819222
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.annotation.Experimental
+
+private[spark] object StaticSources {
+  /**
+   * The set of all static sources. These sources may be reported to from any 
class, including
+   * static classes, without requiring reference to a SparkEnv.
+   */
+  val allSources = Seq(CodegenMetrics)
+}
+
+/**
+ * :: Experimental ::
+ * Metrics for code generation.
+ */
+@Experimental
+object CodegenMetrics extends Source {
+  override val sourceName: String = "CodeGenerator"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Histogram of the length of source code text compiled by CodeGenerator (in 
characters).
+   */
+  val METRIC_SOURCE_CODE_SIZE = 
metricRegistry.histogram(MetricRegistry.name("sourceCodeSize"))
+
+  /**
+   * Histogram of the time it took to compile source code text (in 
milliseconds).
+   */
+  val METRIC_COMPILATION_TIME = 
metricRegistry.histogram(MetricRegistry.name("compilationTime"))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbc6b79/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 5d85542..2400832 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.master.MasterSource
-import org.apache.spark.metrics.source.Source
+import org.apache.spark.metrics.source.{Source, StaticSources}
 
 class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with 
PrivateMethodTester{
   var filePath: String = _
@@ -43,7 +43,7 @@ class MetricsSystemSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateM
     val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
     val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
-    assert(metricsSystem.invokePrivate(sources()).length === 0)
+    assert(metricsSystem.invokePrivate(sources()).length === 
StaticSources.allSources.length)
     assert(metricsSystem.invokePrivate(sinks()).length === 0)
     assert(metricsSystem.getServletHandlers.nonEmpty)
   }
@@ -54,13 +54,13 @@ class MetricsSystemSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateM
     val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
     val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
-    assert(metricsSystem.invokePrivate(sources()).length === 0)
+    assert(metricsSystem.invokePrivate(sources()).length === 
StaticSources.allSources.length)
     assert(metricsSystem.invokePrivate(sinks()).length === 1)
     assert(metricsSystem.getServletHandlers.nonEmpty)
 
     val source = new MasterSource(null)
     metricsSystem.registerSource(source)
-    assert(metricsSystem.invokePrivate(sources()).length === 1)
+    assert(metricsSystem.invokePrivate(sources()).length === 
StaticSources.allSources.length + 1)
   }
 
   test("MetricsSystem with Driver instance") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbc6b79/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ca20292..ff97cd3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -26,6 +26,7 @@ import scala.language.existentials
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
@@ -886,6 +887,8 @@ object CodeGenerator extends Logging {
           val result = doCompile(code)
           val endTime = System.nanoTime()
           def timeMs: Double = (endTime - startTime).toDouble / 1000000
+          CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
+          CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
           logInfo(s"Code generated in $timeMs ms")
           result
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbc6b79/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 8ffe390..62429a2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -49,6 +50,14 @@ class CodeGenerationSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     futures.foreach(ThreadUtils.awaitResult(_, 10.seconds))
   }
 
+  test("metrics are recorded on compile") {
+    val startCount1 = CodegenMetrics.METRIC_COMPILATION_TIME.getCount()
+    val startCount2 = CodegenMetrics.METRIC_SOURCE_CODE_SIZE.getCount()
+    GenerateOrdering.generate(Add(Literal(123), Literal(1)).asc :: Nil)
+    assert(CodegenMetrics.METRIC_COMPILATION_TIME.getCount() == startCount1 + 
1)
+    assert(CodegenMetrics.METRIC_SOURCE_CODE_SIZE.getCount() == startCount2 + 
1)
+  }
+
   test("SPARK-8443: split wide projections into blocks due to JVM code size 
limit") {
     val length = 5000
     val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to