Repository: spark Updated Branches: refs/heads/branch-1.4 5aedfa2ce -> 73cf5def0
[SPARK-8306] [SQL] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state. https://issues.apache.org/jira/browse/SPARK-8306 I will try to add a test later. marmbrus aarondav Author: Yin Huai <[email protected]> Closes #6758 from yhuai/SPARK-8306 and squashes the following commits: 1292346 [Yin Huai] [SPARK-8306] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state. (cherry picked from commit 302556ff999ba9a1960281de6932e0d904197204) Signed-off-by: Michael Armbrust <[email protected]> Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73cf5def Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73cf5def Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73cf5def Branch: refs/heads/branch-1.4 Commit: 73cf5def0687bbe556542646e2b1bd569c59cd59 Parents: 5aedfa2 Author: Yin Huai <[email protected]> Authored: Wed Jun 17 14:52:43 2015 -0700 Committer: Michael Armbrust <[email protected]> Committed: Wed Jun 17 15:14:42 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/hive/client/ClientWrapper.scala | 9 ++++++ .../spark/sql/hive/execution/commands.scala | 12 ++++++-- .../src/test/resources/hive-contrib-0.13.1.jar | Bin 0 -> 114878 bytes .../sql/hive/execution/SQLQuerySuite.scala | 28 +++++++++++++++++++ 4 files changed, 46 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/73cf5def/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 99aa0f1..88a8b38 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -90,6 +90,7 @@ private[hive] class ClientWrapper( } } + // Create an internal session state for this ClientWrapper. val state = { val original = Thread.currentThread().getContextClassLoader Thread.currentThread().setContextClassLoader(getClass.getClassLoader) @@ -126,8 +127,16 @@ private[hive] class ClientWrapper( */ private def withHiveState[A](f: => A): A = synchronized { val original = Thread.currentThread().getContextClassLoader + // This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not + // internally override the context class loader of the current thread with the class loader + // associated with the HiveConf in `state`. Thread.currentThread().setContextClassLoader(getClass.getClassLoader) + // Set the thread local metastore client to the client associated with this ClientWrapper. Hive.set(client) + + // Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated + // with the HiveConf in `state` to override the context class loader of the current + // thread. version match { case hive.v12 => classOf[SessionState] http://git-wip-us.apache.org/repos/asf/spark/blob/73cf5def/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 0ba94d7..9ff1526 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -91,9 +91,15 @@ case class AddJar(path: String) extends RunnableCommand { val jarURL = new java.io.File(path).toURL val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader) Thread.currentThread.setContextClassLoader(newClassLoader) - org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader) - - // Add jar to isolated hive classloader + // We need to explicitly set the class loader associated with the conf in executionHive's + // state because this class loader will be used as the context class loader of the current + // thread to execute any Hive command. + // We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get() + // returns the value of a thread local variable and its HiveConf may not be the HiveConf + // associated with `executionHive.state` (for example, HiveContext is created in one thread + // and then add jar is called from another thread). + hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader) + // Add jar to isolated hive (metadataHive) class loader. hiveContext.runSqlHive(s"ADD JAR $path") // Add jar to executors http://git-wip-us.apache.org/repos/asf/spark/blob/73cf5def/sql/hive/src/test/resources/hive-contrib-0.13.1.jar ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/resources/hive-contrib-0.13.1.jar b/sql/hive/src/test/resources/hive-contrib-0.13.1.jar new file mode 100644 index 0000000..ce0740d Binary files /dev/null and b/sql/hive/src/test/resources/hive-contrib-0.13.1.jar differ http://git-wip-us.apache.org/repos/asf/spark/blob/73cf5def/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index aba3bec..3fcb39d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -906,4 +906,32 @@ class SQLQuerySuite extends QueryTest { sql("set hive.exec.dynamic.partition.mode=strict") } } + + test("Call add jar in a different thread (SPARK-8306)") { + @volatile var error: Option[Throwable] = None + val thread = new Thread { + override def run() { + // To make sure this test works, this jar should not be loaded in another place. + TestHive.sql( + s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}") + try { + TestHive.sql( + """ + |CREATE TEMPORARY FUNCTION example_max + |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax' + """.stripMargin) + } catch { + case throwable: Throwable => + error = Some(throwable) + } + } + } + thread.start() + thread.join() + error match { + case Some(throwable) => + fail("CREATE TEMPORARY FUNCTION should not fail.", throwable) + case None => // OK + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
