Repository: spark
Updated Branches:
  refs/heads/master cf2e0ae72 -> 9a430a027


[SPARK-11068] [SQL] [FOLLOW-UP] move execution listener to util

Author: Wenchen Fan <[email protected]>

Closes #9119 from cloud-fan/callback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a430a02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a430a02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a430a02

Branch: refs/heads/master
Commit: 9a430a027faafb083ca569698effb697af26a1db
Parents: cf2e0ae
Author: Wenchen Fan <[email protected]>
Authored: Wed Oct 14 15:08:13 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Oct 14 15:08:13 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/QueryExecutionListener.scala      | 136 -------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |   1 +
 .../spark/sql/util/QueryExecutionListener.scala | 136 +++++++++++++++++++
 .../spark/sql/DataFrameCallbackSuite.scala      |  82 -----------
 .../spark/sql/util/DataFrameCallbackSuite.scala |  83 +++++++++++
 5 files changed, 220 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala
deleted file mode 100644
index 14fbebb..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/QueryExecutionListener.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.collection.mutable.ListBuffer
-
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.Logging
-import org.apache.spark.sql.execution.QueryExecution
-
-
-/**
- * The interface of query execution listener that can be used to analyze execution metrics.
- *
- * Note that implementations should guarantee thread-safety as they will be used in a non
- * thread-safe way.
- */
-@Experimental
-trait QueryExecutionListener {
-
-  /**
-   * A callback function that will be called when a query executed successfully.
-   * Implementations should guarantee thread-safe.
-   *
-   * @param funcName the name of the action that triggered this query.
-   * @param qe the QueryExecution object that carries detail information like logical plan,
-   *           physical plan, etc.
-   * @param duration the execution time for this query in nanoseconds.
-   */
-  @DeveloperApi
-  def onSuccess(funcName: String, qe: QueryExecution, duration: Long)
-
-  /**
-   * A callback function that will be called when a query execution failed.
-   * Implementations should guarantee thread-safe.
-   *
-   * @param funcName the name of the action that triggered this query.
-   * @param qe the QueryExecution object that carries detail information like logical plan,
-   *           physical plan, etc.
-   * @param exception the exception that failed this query.
-   */
-  @DeveloperApi
-  def onFailure(funcName: String, qe: QueryExecution, exception: Exception)
-}
-
-@Experimental
-class ExecutionListenerManager extends Logging {
-  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
-  private[this] val lock = new ReentrantReadWriteLock()
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val rl = lock.readLock()
-    rl.lock()
-    try f finally {
-      rl.unlock()
-    }
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val wl = lock.writeLock()
-    wl.lock()
-    try f finally {
-      wl.unlock()
-    }
-  }
-
-  /**
-   * Registers the specified QueryExecutionListener.
-   */
-  @DeveloperApi
-  def register(listener: QueryExecutionListener): Unit = writeLock {
-    listeners += listener
-  }
-
-  /**
-   * Unregisters the specified QueryExecutionListener.
-   */
-  @DeveloperApi
-  def unregister(listener: QueryExecutionListener): Unit = writeLock {
-    listeners -= listener
-  }
-
-  /**
-   * clears out all registered QueryExecutionListeners.
-   */
-  @DeveloperApi
-  def clear(): Unit = writeLock {
-    listeners.clear()
-  }
-
-  private[sql] def onSuccess(
-      funcName: String,
-      qe: QueryExecution,
-      duration: Long): Unit = readLock {
-    withErrorHandling { listener =>
-      listener.onSuccess(funcName, qe, duration)
-    }
-  }
-
-  private[sql] def onFailure(
-      funcName: String,
-      qe: QueryExecution,
-      exception: Exception): Unit = readLock {
-    withErrorHandling { listener =>
-      listener.onFailure(funcName, qe, exception)
-    }
-  }
-
-  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = {
-    for (listener <- listeners) {
-      try {
-        f(listener)
-      } catch {
-        case e: Exception => logWarning("error executing query execution listener", e)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a835408..3d5e35a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{execution => sparkexecution}
+import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.Utils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
new file mode 100644
index 0000000..909a8ab
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.Logging
+import org.apache.spark.sql.execution.QueryExecution
+
+
+/**
+ * The interface of query execution listener that can be used to analyze execution metrics.
+ *
+ * Note that implementations should guarantee thread-safety as they will be used in a non
+ * thread-safe way.
+ */
+@Experimental
+trait QueryExecutionListener {
+
+  /**
+   * A callback function that will be called when a query executed successfully.
+   * Implementations should guarantee thread-safe.
+   *
+   * @param funcName the name of the action that triggered this query.
+   * @param qe the QueryExecution object that carries detail information like logical plan,
+   *           physical plan, etc.
+   * @param duration the execution time for this query in nanoseconds.
+   */
+  @DeveloperApi
+  def onSuccess(funcName: String, qe: QueryExecution, duration: Long)
+
+  /**
+   * A callback function that will be called when a query execution failed.
+   * Implementations should guarantee thread-safe.
+   *
+   * @param funcName the name of the action that triggered this query.
+   * @param qe the QueryExecution object that carries detail information like logical plan,
+   *           physical plan, etc.
+   * @param exception the exception that failed this query.
+   */
+  @DeveloperApi
+  def onFailure(funcName: String, qe: QueryExecution, exception: Exception)
+}
+
+@Experimental
+class ExecutionListenerManager extends Logging {
+  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
+  private[this] val lock = new ReentrantReadWriteLock()
+
+  /** Acquires a read lock on the cache for the duration of `f`. */
+  private def readLock[A](f: => A): A = {
+    val rl = lock.readLock()
+    rl.lock()
+    try f finally {
+      rl.unlock()
+    }
+  }
+
+  /** Acquires a write lock on the cache for the duration of `f`. */
+  private def writeLock[A](f: => A): A = {
+    val wl = lock.writeLock()
+    wl.lock()
+    try f finally {
+      wl.unlock()
+    }
+  }
+
+  /**
+   * Registers the specified QueryExecutionListener.
+   */
+  @DeveloperApi
+  def register(listener: QueryExecutionListener): Unit = writeLock {
+    listeners += listener
+  }
+
+  /**
+   * Unregisters the specified QueryExecutionListener.
+   */
+  @DeveloperApi
+  def unregister(listener: QueryExecutionListener): Unit = writeLock {
+    listeners -= listener
+  }
+
+  /**
+   * clears out all registered QueryExecutionListeners.
+   */
+  @DeveloperApi
+  def clear(): Unit = writeLock {
+    listeners.clear()
+  }
+
+  private[sql] def onSuccess(
+      funcName: String,
+      qe: QueryExecution,
+      duration: Long): Unit = readLock {
+    withErrorHandling { listener =>
+      listener.onSuccess(funcName, qe, duration)
+    }
+  }
+
+  private[sql] def onFailure(
+      funcName: String,
+      qe: QueryExecution,
+      exception: Exception): Unit = readLock {
+    withErrorHandling { listener =>
+      listener.onFailure(funcName, qe, exception)
+    }
+  }
+
+  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = {
+    for (listener <- listeners) {
+      try {
+        f(listener)
+      } catch {
+        case e: Exception => logWarning("error executing query execution listener", e)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala
deleted file mode 100644
index 4e286a0..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCallbackSuite.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.test.SharedSQLContext
-
-import scala.collection.mutable.ArrayBuffer
-
-class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
-  import testImplicits._
-  import functions._
-
-  test("execute callback functions when a DataFrame action finished successfully") {
-    val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)]
-    val listener = new QueryExecutionListener {
-      // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
-
-      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
-        metrics += ((funcName, qe, duration))
-      }
-    }
-    sqlContext.listenerManager.register(listener)
-
-    val df = Seq(1 -> "a").toDF("i", "j")
-    df.select("i").collect()
-    df.filter($"i" > 0).count()
-
-    assert(metrics.length == 2)
-
-    assert(metrics(0)._1 == "collect")
-    assert(metrics(0)._2.analyzed.isInstanceOf[Project])
-    assert(metrics(0)._3 > 0)
-
-    assert(metrics(1)._1 == "count")
-    assert(metrics(1)._2.analyzed.isInstanceOf[Aggregate])
-    assert(metrics(1)._3 > 0)
-  }
-
-  test("execute callback functions when a DataFrame action failed") {
-    val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
-    val listener = new QueryExecutionListener {
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
-        metrics += ((funcName, qe, exception))
-      }
-
-      // Only test failed case here, so no need to implement `onSuccess`
-      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {}
-    }
-    sqlContext.listenerManager.register(listener)
-
-    val errorUdf = udf[Int, Int] { _ => throw new RuntimeException("udf error") }
-    val df = sparkContext.makeRDD(Seq(1 -> "a")).toDF("i", "j")
-
-    // Ignore the log when we are expecting an exception.
-    sparkContext.setLogLevel("FATAL")
-    val e = intercept[SparkException](df.select(errorUdf($"i")).collect())
-
-    assert(metrics.length == 1)
-    assert(metrics(0)._1 == "collect")
-    assert(metrics(0)._2.analyzed.isInstanceOf[Project])
-    assert(metrics(0)._3.getMessage == e.getMessage)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a430a02/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
new file mode 100644
index 0000000..eb056cd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{functions, QueryTest}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.test.SharedSQLContext
+
+import scala.collection.mutable.ArrayBuffer
+
+class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+  import functions._
+
+  test("execute callback functions when a DataFrame action finished successfully") {
+    val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)]
+    val listener = new QueryExecutionListener {
+      // Only test successful case here, so no need to implement `onFailure`
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+        metrics += ((funcName, qe, duration))
+      }
+    }
+    sqlContext.listenerManager.register(listener)
+
+    val df = Seq(1 -> "a").toDF("i", "j")
+    df.select("i").collect()
+    df.filter($"i" > 0).count()
+
+    assert(metrics.length == 2)
+
+    assert(metrics(0)._1 == "collect")
+    assert(metrics(0)._2.analyzed.isInstanceOf[Project])
+    assert(metrics(0)._3 > 0)
+
+    assert(metrics(1)._1 == "count")
+    assert(metrics(1)._2.analyzed.isInstanceOf[Aggregate])
+    assert(metrics(1)._3 > 0)
+  }
+
+  test("execute callback functions when a DataFrame action failed") {
+    val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
+    val listener = new QueryExecutionListener {
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+        metrics += ((funcName, qe, exception))
+      }
+
+      // Only test failed case here, so no need to implement `onSuccess`
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {}
+    }
+    sqlContext.listenerManager.register(listener)
+
+    val errorUdf = udf[Int, Int] { _ => throw new RuntimeException("udf error") }
+    val df = sparkContext.makeRDD(Seq(1 -> "a")).toDF("i", "j")
+
+    // Ignore the log when we are expecting an exception.
+    sparkContext.setLogLevel("FATAL")
+    val e = intercept[SparkException](df.select(errorUdf($"i")).collect())
+
+    assert(metrics.length == 1)
+    assert(metrics(0)._1 == "collect")
+    assert(metrics(0)._2.analyzed.isInstanceOf[Project])
+    assert(metrics(0)._3.getMessage == e.getMessage)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to