Repository: spark
Updated Branches:
  refs/heads/master d6ddfdf60 -> 142f6d149


[SPARK-20048][SQL] Cloning SessionState does not clone query execution listeners

## What changes were proposed in this pull request?

Fixes a bug from [SPARK-19540](https://github.com/apache/spark/pull/16826).
Cloning `SessionState` does not clone query execution listeners, so a cloned
session is unable to listen to events on its queries.
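
For illustration, a minimal sketch (not part of this patch) of the behavior the
fix enables. `cloneSession()` is an internal API, so this mirrors what the new
`SessionStateSuite` test exercises rather than typical application code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder().master("local").appName("listener-clone").getOrCreate()

// Listener that records the name of each successfully executed action.
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit =
    println(s"completed: $funcName")
  override def onFailure(funcName: String, qe: QueryExecution, ex: Exception): Unit = ()
}
spark.listenerManager.register(listener)

// Before this fix a cloned session started with an empty listener manager; after it,
// the clone inherits a copy of the parent's listeners and stays independent afterwards.
val forked = spark.cloneSession()  // internal API, exercised via SessionStateSuite
forked.range(10).collect()         // the listener registered on the parent fires here too
```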

## How was this patch tested?

- Unit test in `SessionStateSuite` ("fork new session and inherit listener manager")

Author: Kunal Khamar <[email protected]>

Closes #17379 from kunalkhamar/clone-bugfix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/142f6d14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/142f6d14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/142f6d14

Branch: refs/heads/master
Commit: 142f6d14928c780cc9e8d6d7749c5d7c08a30972
Parents: d6ddfdf
Author: Kunal Khamar <[email protected]>
Authored: Wed Mar 29 12:35:19 2017 -0700
Committer: Herman van Hovell <[email protected]>
Committed: Wed Mar 29 12:35:19 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/SparkSession.scala     |  22 +-
 .../sql/internal/BaseSessionStateBuilder.scala  | 305 +++++++++++++++++++
 .../spark/sql/internal/SessionState.scala       |  38 +--
 .../sql/internal/sessionStateBuilders.scala     | 285 -----------------
 .../spark/sql/util/QueryExecutionListener.scala |  10 +
 .../apache/spark/sql/SessionStateSuite.scala    |  53 ++++
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |   2 +-
 .../spark/sql/hive/HiveSessionState.scala       | 141 ---------
 .../sql/hive/HiveSessionStateBuilder.scala      | 129 ++++++++
 .../apache/spark/sql/hive/test/TestHive.scala   |   2 +-
 .../spark/sql/hive/HiveSessionStateSuite.scala  |   2 +-
 11 files changed, 522 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 4956257..a972978 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -38,7 +38,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, CatalogImpl, 
SessionState, SessionStateBuilder, SharedState}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._
@@ -194,7 +194,7 @@ class SparkSession private(
    *
    * @since 2.0.0
    */
-  def udf: UDFRegistration = sessionState.udf
+  def udf: UDFRegistration = sessionState.udfRegistration
 
   /**
    * :: Experimental ::
@@ -990,28 +990,28 @@ object SparkSession {
   /** Reference to the root SparkSession. */
   private val defaultSession = new AtomicReference[SparkSession]
 
-  private val HIVE_SESSION_STATE_CLASS_NAME = 
"org.apache.spark.sql.hive.HiveSessionState"
+  private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
+    "org.apache.spark.sql.hive.HiveSessionStateBuilder"
 
   private def sessionStateClassName(conf: SparkConf): String = {
     conf.get(CATALOG_IMPLEMENTATION) match {
-      case "hive" => HIVE_SESSION_STATE_CLASS_NAME
-      case "in-memory" => classOf[SessionState].getCanonicalName
+      case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
+      case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
     }
   }
 
   /**
    * Helper method to create an instance of `SessionState` based on 
`className` from conf.
-   * The result is either `SessionState` or `HiveSessionState`.
+   * The result is either `SessionState` or a Hive based `SessionState`.
    */
   private def instantiateSessionState(
       className: String,
       sparkSession: SparkSession): SessionState = {
-
     try {
-      // get `SessionState.apply(SparkSession)`
+      // invoke `new [Hive]SessionStateBuilder(SparkSession, 
Option[SessionState])`
       val clazz = Utils.classForName(className)
-      val method = clazz.getMethod("apply", sparkSession.getClass)
-      method.invoke(null, sparkSession).asInstanceOf[SessionState]
+      val ctor = clazz.getConstructors.head
+      ctor.newInstance(sparkSession, 
None).asInstanceOf[BaseSessionStateBuilder].build()
     } catch {
       case NonFatal(e) =>
         throw new IllegalArgumentException(s"Error while instantiating 
'$className':", e)
@@ -1023,7 +1023,7 @@ object SparkSession {
    */
   private[spark] def hiveClassesArePresent: Boolean = {
     try {
-      Utils.classForName(HIVE_SESSION_STATE_CLASS_NAME)
+      Utils.classForName(HIVE_SESSION_STATE_BUILDER_CLASS_NAME)
       Utils.classForName("org.apache.hadoop.hive.conf.HiveConf")
       true
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
new file mode 100644
index 0000000..2b14eca
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy, 
UDFRegistration}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, 
SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.streaming.StreamingQueryManager
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session state, 
and creates a session
+ * state when `build` is called. Components should only be initialized once. 
This is not a problem
+ * for most components as they are only used in the `build` function. However, 
some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & 
`sqlParser`) are used as dependencies
+ * for other components and are shared as a result. These components are 
defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of 
components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some 
dependencies between the
+ * components (they are documented per dependency), a developer should respect 
these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new 
[[SessionState]]. The new session
+ * state will clone the parent session state's `conf`, `functionRegistry`, 
`experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is called, 
and not before.
+ */
+@Experimental
[email protected]
+abstract class BaseSessionStateBuilder(
+    val session: SparkSession,
+    val parentState: Option[SessionState] = None) {
+  type NewBuilder = (SparkSession, Option[SessionState]) => 
BaseSessionStateBuilder
+
+  /**
+   * Function that produces a new instance of the SessionStateBuilder. This is 
used by the
+   * [[SessionState]]'s clone functionality. Make sure to override this when 
implementing your own
+   * [[SessionStateBuilder]].
+   */
+  protected def newBuilder: NewBuilder
+
+  /**
+   * Extract entries from `SparkConf` and put them in the `SQLConf`
+   */
+  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = 
{
+    sparkConf.getAll.foreach { case (k, v) =>
+      sqlConf.setConfString(k, v)
+    }
+  }
+
+  /**
+   * SQL-specific key-value configurations.
+   *
+   * These either get cloned from a pre-existing instance or newly created. 
The conf is always
+   * merged with its [[SparkConf]].
+   */
+  protected lazy val conf: SQLConf = {
+    val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
+    mergeSparkConf(conf, session.sparkContext.conf)
+    conf
+  }
+
+  /**
+   * Internal catalog managing functions registered by the user.
+   *
+   * This either gets cloned from a pre-existing version or cloned from the 
built-in registry.
+   */
+  protected lazy val functionRegistry: FunctionRegistry = {
+    
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+  }
+
+  /**
+   * Experimental methods that can be used to define custom optimization rules 
and custom planning
+   * strategies.
+   *
+   * This either gets cloned from a pre-existing version or newly created.
+   */
+  protected lazy val experimentalMethods: ExperimentalMethods = {
+    parentState.map(_.experimentalMethods.clone()).getOrElse(new 
ExperimentalMethods)
+  }
+
+  /**
+   * Parser that extracts expressions, plans, table identifiers etc. from SQL 
texts.
+   *
+   * Note: this depends on the `conf` field.
+   */
+  protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
+
+  /**
+   * ResourceLoader that is used to load function resources and jars.
+   */
+  protected lazy val resourceLoader: SessionResourceLoader = new 
SessionResourceLoader(session)
+
+  /**
+   * Catalog for managing table and database states. If there is a 
pre-existing catalog, the state
+   * of that catalog (temp tables & current database) will be copied into the 
new catalog.
+   *
+   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` 
fields.
+   */
+  protected lazy val catalog: SessionCatalog = {
+    val catalog = new SessionCatalog(
+      session.sharedState.externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  /**
+   * Interface exposed to the user for registering user-defined functions.
+   *
+   * Note 1: The user-defined functions must be deterministic.
+   * Note 2: This depends on the `functionRegistry` field.
+   */
+  protected def udfRegistration: UDFRegistration = new 
UDFRegistration(functionRegistry)
+
+  /**
+   * Logical query plan analyzer for resolving unresolved attributes and 
relations.
+   *
+   * Note: this depends on the `conf` and `catalog` fields.
+   */
+  protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+      new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        customResolutionRules
+
+    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+      PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+
+    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck +:
+        HiveOnlyCheck +:
+        customCheckRules
+  }
+
+  /**
+   * Custom resolution rules to add to the Analyzer. Prefer overriding this 
instead of creating
+   * your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Custom post resolution rules to add to the Analyzer. Prefer overriding 
this instead of
+   * creating your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Custom check rules to add to the Analyzer. Prefer overriding this instead 
of creating
+   * your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil
+
+  /**
+   * Logical query plan optimizer.
+   *
+   * Note: this depends on the `conf`, `catalog` and `experimentalMethods` 
fields.
+   */
+  protected def optimizer: Optimizer = {
+    new SparkOptimizer(catalog, conf, experimentalMethods) {
+      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
+        super.extendedOperatorOptimizationRules ++ 
customOperatorOptimizationRules
+    }
+  }
+
+  /**
+   * Custom operator optimization rules to add to the Optimizer. Prefer 
overriding this instead
+   * of creating your own Optimizer.
+   *
+   * Note that this may NOT depend on the `optimizer` function.
+   */
+  protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Planner that converts optimized logical plans to physical plans.
+   *
+   * Note: this depends on the `conf` and `experimentalMethods` fields.
+   */
+  protected def planner: SparkPlanner = {
+    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
+      override def extraPlanningStrategies: Seq[Strategy] =
+        super.extraPlanningStrategies ++ customPlanningStrategies
+    }
+  }
+
+  /**
+   * Custom strategies to add to the planner. Prefer overriding this instead 
of creating
+   * your own Planner.
+   *
+   * Note that this may NOT depend on the `planner` function.
+   */
+  protected def customPlanningStrategies: Seq[Strategy] = Nil
+
+  /**
+   * Create a query execution object.
+   */
+  protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
+    new QueryExecution(session, plan)
+  }
+
+  /**
+   * Interface to start and stop streaming queries.
+   */
+  protected def streamingQueryManager: StreamingQueryManager = new 
StreamingQueryManager(session)
+
+  /**
+   * An interface to register custom 
[[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   *
+   * This gets cloned from the parent if available, otherwise a new instance is 
created.
+   */
+  protected def listenerManager: ExecutionListenerManager = {
+    parentState.map(_.listenerManager.clone()).getOrElse(new 
ExecutionListenerManager)
+  }
+
+  /**
+   * Function used to make clones of the session state.
+   */
+  protected def createClone: (SparkSession, SessionState) => SessionState = {
+    val createBuilder = newBuilder
+    (session, state) => createBuilder(session, Option(state)).build()
+  }
+
+  /**
+   * Build the [[SessionState]].
+   */
+  def build(): SessionState = {
+    new SessionState(
+      session.sharedState,
+      conf,
+      experimentalMethods,
+      functionRegistry,
+      udfRegistration,
+      catalog,
+      sqlParser,
+      analyzer,
+      optimizer,
+      planner,
+      streamingQueryManager,
+      listenerManager,
+      resourceLoader,
+      createQueryExecution,
+      createClone)
+  }
+}
+
+/**
+ * Helper class for using SessionStateBuilders during tests.
+ */
+private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
+  def overrideConfs: Map[String, String]
+
+  override protected lazy val conf: SQLConf = {
+    val conf = parentState.map(_.conf.clone()).getOrElse {
+      new SQLConf {
+        clear()
+        override def clear(): Unit = {
+          super.clear()
+          // Make sure we start with the default test configs even after clear
+          overrideConfs.foreach { case (key, value) => setConfString(key, 
value) }
+        }
+      }
+    }
+    mergeSparkConf(conf, session.sparkContext.conf)
+    conf
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c6241d9..1b341a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -32,43 +32,46 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.streaming.StreamingQueryManager
-import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.sql.util.{ExecutionListenerManager, 
QueryExecutionListener}
 
 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
  *
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
+ * @param sharedState The state shared across sessions, e.g. global view 
manager, external catalog.
  * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
+ * @param experimentalMethods Interface to add custom planning strategies and 
optimizers.
  * @param functionRegistry Internal catalog for managing functions registered 
by the user.
+ * @param udfRegistration Interface exposed to the user for registering 
user-defined functions.
  * @param catalog Internal catalog for managing table and database states.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers 
etc. from SQL texts.
  * @param analyzer Logical query plan analyzer for resolving unresolved 
attributes and relations.
  * @param optimizer Logical query plan optimizer.
- * @param planner Planner that converts optimized logical plans to physical 
plans
+ * @param planner Planner that converts optimized logical plans to physical 
plans.
  * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param listenerManager Interface to register custom 
[[QueryExecutionListener]]s.
+ * @param resourceLoader Session shared resource loader to load JARs, files, 
etc.
  * @param createQueryExecution Function used to create QueryExecution objects.
  * @param createClone Function used to create clones of the session state.
  */
 private[sql] class SessionState(
-    sparkContext: SparkContext,
     sharedState: SharedState,
     val conf: SQLConf,
     val experimentalMethods: ExperimentalMethods,
     val functionRegistry: FunctionRegistry,
+    val udfRegistration: UDFRegistration,
     val catalog: SessionCatalog,
     val sqlParser: ParserInterface,
     val analyzer: Analyzer,
     val optimizer: Optimizer,
     val planner: SparkPlanner,
     val streamingQueryManager: StreamingQueryManager,
+    val listenerManager: ExecutionListenerManager,
     val resourceLoader: SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState) {
 
   def newHadoopConf(): Configuration = SessionState.newHadoopConf(
-    sparkContext.hadoopConfiguration,
+    sharedState.sparkContext.hadoopConfiguration,
     conf)
 
   def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
@@ -82,18 +85,6 @@ private[sql] class SessionState(
   }
 
   /**
-   * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions must be deterministic.
-   */
-  val udf: UDFRegistration = new UDFRegistration(functionRegistry)
-
-  /**
-   * An interface to register custom 
[[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
-   */
-  val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
-  /**
    * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
    */
   def clone(newSparkSession: SparkSession): SessionState = 
createClone(newSparkSession, this)
@@ -110,13 +101,6 @@ private[sql] class SessionState(
 }
 
 private[sql] object SessionState {
-  /**
-   * Create a new [[SessionState]] for the given session.
-   */
-  def apply(session: SparkSession): SessionState = {
-    new SessionStateBuilder(session).build()
-  }
-
   def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
     val newHadoopConf = new Configuration(hadoopConf)
     sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
newHadoopConf.set(k, v) }
@@ -155,7 +139,7 @@ class SessionResourceLoader(session: SparkSession) extends 
FunctionResourceLoade
   /**
    * Add a jar path to [[SparkContext]] and the classloader.
    *
-   * Note: this method seems not access any session state, but the subclass 
`HiveSessionState` needs
+   * Note: this method seems not to access any session state, but a Hive based 
`SessionState` needs
    * to add the jar to its hive client for the current session. Hence, it 
still needs to be in
    * [[SessionState]].
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
deleted file mode 100644
index b8f645f..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.internal
-
-import org.apache.spark.SparkConf
-import org.apache.spark.annotation.{Experimental, InterfaceStability}
-import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, 
SparkPlanner, SparkSqlParser}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.streaming.StreamingQueryManager
-
-/**
- * Builder class that coordinates construction of a new [[SessionState]].
- *
- * The builder explicitly defines all components needed by the session state, 
and creates a session
- * state when `build` is called. Components should only be initialized once. 
This is not a problem
- * for most components as they are only used in the `build` function. However 
some components
- * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & 
`sqlParser`) are as dependencies
- * for other components and are shared as a result. These components are 
defined as lazy vals to
- * make sure the component is created only once.
- *
- * A developer can modify the builder by providing custom versions of 
components, or by using the
- * hooks provided for the analyzer, optimizer & planner. There are some 
dependencies between the
- * components (they are documented per dependency), a developer should respect 
these when making
- * modifications in order to prevent initialization problems.
- *
- * A parent [[SessionState]] can be used to initialize the new 
[[SessionState]]. The new session
- * state will clone the parent sessions state's `conf`, `functionRegistry`, 
`experimentalMethods`
- * and `catalog` fields. Note that the state is cloned when `build` is called, 
and not before.
- */
-@Experimental
[email protected]
-abstract class BaseSessionStateBuilder(
-    val session: SparkSession,
-    val parentState: Option[SessionState] = None) {
-  type NewBuilder = (SparkSession, Option[SessionState]) => 
BaseSessionStateBuilder
-
-  /**
-   * Function that produces a new instance of the SessionStateBuilder. This is 
used by the
-   * [[SessionState]]'s clone functionality. Make sure to override this when 
implementing your own
-   * [[SessionStateBuilder]].
-   */
-  protected def newBuilder: NewBuilder
-
-  /**
-   * Extract entries from `SparkConf` and put them in the `SQLConf`
-   */
-  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = 
{
-    sparkConf.getAll.foreach { case (k, v) =>
-      sqlConf.setConfString(k, v)
-    }
-  }
-
-  /**
-   * SQL-specific key-value configurations.
-   *
-   * These either get cloned from a pre-existing instance or newly created. 
The conf is always
-   * merged with its [[SparkConf]].
-   */
-  protected lazy val conf: SQLConf = {
-    val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
-    mergeSparkConf(conf, session.sparkContext.conf)
-    conf
-  }
-
-  /**
-   * Internal catalog managing functions registered by the user.
-   *
-   * This either gets cloned from a pre-existing version or cloned from the 
built-in registry.
-   */
-  protected lazy val functionRegistry: FunctionRegistry = {
-    
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
-  }
-
-  /**
-   * Experimental methods that can be used to define custom optimization rules 
and custom planning
-   * strategies.
-   *
-   * This either gets cloned from a pre-existing version or newly created.
-   */
-  protected lazy val experimentalMethods: ExperimentalMethods = {
-    parentState.map(_.experimentalMethods.clone()).getOrElse(new 
ExperimentalMethods)
-  }
-
-  /**
-   * Parser that extracts expressions, plans, table identifiers etc. from SQL 
texts.
-   *
-   * Note: this depends on the `conf` field.
-   */
-  protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
-
-  /**
-   * ResourceLoader that is used to load function resources and jars.
-   */
-  protected lazy val resourceLoader: SessionResourceLoader = new 
SessionResourceLoader(session)
-
-  /**
-   * Catalog for managing table and database states. If there is a 
pre-existing catalog, the state
-   * of that catalog (temp tables & current database) will be copied into the 
new catalog.
-   *
-   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` 
fields.
-   */
-  protected lazy val catalog: SessionCatalog = {
-    val catalog = new SessionCatalog(
-      session.sharedState.externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  /**
-   * Logical query plan analyzer for resolving unresolved attributes and 
relations.
-   *
-   * Note: this depends on the `conf` and `catalog` fields.
-   */
-  protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
-    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-      new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        customResolutionRules
-
-    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-      PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        customPostHocResolutionRules
-
-    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
-      PreWriteCheck +:
-        HiveOnlyCheck +:
-        customCheckRules
-  }
-
-  /**
-   * Custom resolution rules to add to the Analyzer. Prefer overriding this 
instead of creating
-   * your own Analyzer.
-   *
-   * Note that this may NOT depend on the `analyzer` function.
-   */
-  protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil
-
-  /**
-   * Custom post resolution rules to add to the Analyzer. Prefer overriding 
this instead of
-   * creating your own Analyzer.
-   *
-   * Note that this may NOT depend on the `analyzer` function.
-   */
-  protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
-
-  /**
-   * Custom check rules to add to the Analyzer. Prefer overriding this instead 
of creating
-   * your own Analyzer.
-   *
-   * Note that this may NOT depend on the `analyzer` function.
-   */
-  protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil
-
-  /**
-   * Logical query plan optimizer.
-   *
-   * Note: this depends on the `conf`, `catalog` and `experimentalMethods` 
fields.
-   */
-  protected def optimizer: Optimizer = {
-    new SparkOptimizer(catalog, conf, experimentalMethods) {
-      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
-        super.extendedOperatorOptimizationRules ++ 
customOperatorOptimizationRules
-    }
-  }
-
-  /**
-   * Custom operator optimization rules to add to the Optimizer. Prefer 
overriding this instead
-   * of creating your own Optimizer.
-   *
-   * Note that this may NOT depend on the `optimizer` function.
-   */
-  protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
-
-  /**
-   * Planner that converts optimized logical plans to physical plans.
-   *
-   * Note: this depends on the `conf` and `experimentalMethods` fields.
-   */
-  protected def planner: SparkPlanner = {
-    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
-      override def extraPlanningStrategies: Seq[Strategy] =
-        super.extraPlanningStrategies ++ customPlanningStrategies
-    }
-  }
-
-  /**
-   * Custom strategies to add to the planner. Prefer overriding this instead 
of creating
-   * your own Planner.
-   *
-   * Note that this may NOT depend on the `planner` function.
-   */
-  protected def customPlanningStrategies: Seq[Strategy] = Nil
-
-  /**
-   * Create a query execution object.
-   */
-  protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
-    new QueryExecution(session, plan)
-  }
-
-  /**
-   * Interface to start and stop streaming queries.
-   */
-  protected def streamingQueryManager: StreamingQueryManager = new 
StreamingQueryManager(session)
-
-  /**
-   * Function used to make clones of the session state.
-   */
-  protected def createClone: (SparkSession, SessionState) => SessionState = {
-    val createBuilder = newBuilder
-    (session, state) => createBuilder(session, Option(state)).build()
-  }
-
-  /**
-   * Build the [[SessionState]].
-   */
-  def build(): SessionState = {
-    new SessionState(
-      session.sparkContext,
-      session.sharedState,
-      conf,
-      experimentalMethods,
-      functionRegistry,
-      catalog,
-      sqlParser,
-      analyzer,
-      optimizer,
-      planner,
-      streamingQueryManager,
-      resourceLoader,
-      createQueryExecution,
-      createClone)
-  }
-}
-
-/**
- * Helper class for using SessionStateBuilders during tests.
- */
-private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
-  def overrideConfs: Map[String, String]
-
-  override protected lazy val conf: SQLConf = {
-    val conf = parentState.map(_.conf.clone()).getOrElse {
-      new SQLConf {
-        clear()
-        override def clear(): Unit = {
-          super.clear()
-          // Make sure we start with the default test configs even after clear
-          overrideConfs.foreach { case (key, value) => setConfString(key, 
value) }
-        }
-      }
-    }
-    mergeSparkConf(conf, session.sparkContext.conf)
-    conf
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 26ad0ea..f6240d8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -98,6 +98,16 @@ class ExecutionListenerManager private[sql] () extends 
Logging {
     listeners.clear()
   }
 
+  /**
+   * Get an identical copy of this listener manager.
+   */
+  @DeveloperApi
+  override def clone(): ExecutionListenerManager = writeLock {
+    val newListenerManager = new ExecutionListenerManager
+    listeners.foreach(newListenerManager.register)
+    newListenerManager
+  }
+
   private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
     readLock {
       withErrorHandling { listener =>

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
index 2d5e372..5638c8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
@@ -19,10 +19,13 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.BeforeAndAfterEach
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
 
 class SessionStateSuite extends SparkFunSuite
     with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -122,6 +125,56 @@ class SessionStateSuite extends SparkFunSuite
     }
   }
 
+  test("fork new session and inherit listener manager") {
+    class CommandCollector extends QueryExecutionListener {
+      val commands: ArrayBuffer[String] = ArrayBuffer.empty[String]
+      override def onFailure(funcName: String, qe: QueryExecution, ex: 
Exception) : Unit = {}
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
+        commands += funcName
+      }
+    }
+    val collectorA = new CommandCollector
+    val collectorB = new CommandCollector
+    val collectorC = new CommandCollector
+
+    try {
+      def runCollectQueryOn(sparkSession: SparkSession): Unit = {
+        val tupleEncoder = Encoders.tuple(Encoders.scalaInt, Encoders.STRING)
+        val df = sparkSession.createDataset(Seq(1 -> 
"a"))(tupleEncoder).toDF("i", "j")
+        df.select("i").collect()
+      }
+
+      activeSession.listenerManager.register(collectorA)
+      val forkedSession = activeSession.cloneSession()
+
+      // inheritance
+      assert(forkedSession ne activeSession)
+      assert(forkedSession.listenerManager ne activeSession.listenerManager)
+      runCollectQueryOn(forkedSession)
+      assert(collectorA.commands.length == 1) // forked should callback to A
+      assert(collectorA.commands(0) == "collect")
+
+      // independence
+      // => changes to forked do not affect original
+      forkedSession.listenerManager.register(collectorB)
+      runCollectQueryOn(activeSession)
+      assert(collectorB.commands.isEmpty) // original should not callback to B
+      assert(collectorA.commands.length == 2) // original should still 
callback to A
+      assert(collectorA.commands(1) == "collect")
+      // <= changes to original do not affect forked
+      activeSession.listenerManager.register(collectorC)
+      runCollectQueryOn(forkedSession)
+      assert(collectorC.commands.isEmpty) // forked should not callback to C
+      assert(collectorA.commands.length == 3) // forked should still callback 
to A
+      assert(collectorB.commands.length == 1) // forked should still callback 
to B
+      assert(collectorA.commands(2) == "collect")
+      assert(collectorB.commands(0) == "collect")
+    } finally {
+      activeSession.listenerManager.unregister(collectorA)
+      activeSession.listenerManager.unregister(collectorC)
+    }
+  }
+
   test("fork new sessions and run query on inherited table") {
     def checkTableExists(sparkSession: SparkSession): Unit = {
       QueryTest.checkAnswer(sparkSession.sql(

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 0c79b6f..390b9b6 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -38,7 +38,7 @@ import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.util.ShutdownHookManager
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
deleted file mode 100644
index f49e6bb..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.annotation.{Experimental, InterfaceStability}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlanner
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{BaseSessionStateBuilder, 
SessionResourceLoader, SessionState}
-
-/**
- * Entry object for creating a Hive aware [[SessionState]].
- */
-private[hive] object HiveSessionState {
-  /**
-   * Create a new Hive aware [[SessionState]]. for the given session.
-   */
-  def apply(session: SparkSession): SessionState = {
-    new HiveSessionStateBuilder(session).build()
-  }
-}
-
-/**
- * Builder that produces a [[HiveSessionState]].
- */
-@Experimental
[email protected]
-class HiveSessionStateBuilder(session: SparkSession, parentState: 
Option[SessionState] = None)
-  extends BaseSessionStateBuilder(session, parentState) {
-
-  private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
-  /**
-   * Create a Hive aware resource loader.
-   */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  /**
-   * Create a [[HiveSessionCatalog]].
-   */
-  override protected lazy val catalog: HiveSessionCatalog = {
-    val catalog = new HiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      new HiveMetastoreCatalog(session),
-      functionRegistry,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  /**
-   * A logical query plan `Analyzer` with rules specific to Hive.
-   */
-  override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
-    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-      new ResolveHiveSerdeTable(session) +:
-      new FindDataSourceTable(session) +:
-      new ResolveSQLOnFile(session) +:
-      customResolutionRules
-
-    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-      new DetermineTableStats(session) +:
-      catalog.ParquetConversions +:
-      catalog.OrcConversions +:
-      PreprocessTableCreation(session) +:
-      PreprocessTableInsertion(conf) +:
-      DataSourceAnalysis(conf) +:
-      HiveAnalysis +:
-      customPostHocResolutionRules
-
-    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
-      PreWriteCheck +:
-      customCheckRules
-  }
-
-  /**
-   * Planner that takes into account Hive-specific strategies.
-   */
-  override protected def planner: SparkPlanner = {
-    new SparkPlanner(session.sparkContext, conf, experimentalMethods) with 
HiveStrategies {
-      override val sparkSession: SparkSession = session
-
-      override def extraPlanningStrategies: Seq[Strategy] =
-        super.extraPlanningStrategies ++ customPlanningStrategies
-
-      override def strategies: Seq[Strategy] = {
-        experimentalMethods.extraStrategies ++
-          extraPlanningStrategies ++ Seq(
-          FileSourceStrategy,
-          DataSourceStrategy,
-          SpecialLimits,
-          InMemoryScans,
-          HiveTableScans,
-          Scripts,
-          Aggregation,
-          JoinSelection,
-          BasicOperators
-        )
-      }
-    }
-  }
-
-  override protected def newBuilder: NewBuilder = new 
HiveSessionStateBuilder(_, _)
-}
-
-class HiveSessionResourceLoader(
-    session: SparkSession,
-    client: HiveClient)
-  extends SessionResourceLoader(session) {
-  override def addJar(path: String): Unit = {
-    client.addJar(path)
-    super.addJar(path)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
new file mode 100644
index 0000000..8048c2b
--- /dev/null
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, 
SessionResourceLoader, SessionState}
+
+/**
+ * Builder that produces a Hive aware [[SessionState]].
+ */
+@Experimental
[email protected]
+class HiveSessionStateBuilder(session: SparkSession, parentState: 
Option[SessionState] = None)
+  extends BaseSessionStateBuilder(session, parentState) {
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  /**
+   * Create a [[HiveSessionCatalog]].
+   */
+  override protected lazy val catalog: HiveSessionCatalog = {
+    val catalog = new HiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      new HiveMetastoreCatalog(session),
+      functionRegistry,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  /**
+   * A logical query plan `Analyzer` with rules specific to Hive.
+   */
+  override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+      new ResolveHiveSerdeTable(session) +:
+      new FindDataSourceTable(session) +:
+      new ResolveSQLOnFile(session) +:
+      customResolutionRules
+
+    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+      new DetermineTableStats(session) +:
+      catalog.ParquetConversions +:
+      catalog.OrcConversions +:
+      PreprocessTableCreation(session) +:
+      PreprocessTableInsertion(conf) +:
+      DataSourceAnalysis(conf) +:
+      HiveAnalysis +:
+      customPostHocResolutionRules
+
+    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck +:
+      customCheckRules
+  }
+
+  /**
+   * Planner that takes into account Hive-specific strategies.
+   */
+  override protected def planner: SparkPlanner = {
+    new SparkPlanner(session.sparkContext, conf, experimentalMethods) with 
HiveStrategies {
+      override val sparkSession: SparkSession = session
+
+      override def extraPlanningStrategies: Seq[Strategy] =
+        super.extraPlanningStrategies ++ customPlanningStrategies
+
+      override def strategies: Seq[Strategy] = {
+        experimentalMethods.extraStrategies ++
+          extraPlanningStrategies ++ Seq(
+          FileSourceStrategy,
+          DataSourceStrategy,
+          SpecialLimits,
+          InMemoryScans,
+          HiveTableScans,
+          Scripts,
+          Aggregation,
+          JoinSelection,
+          BasicOperators
+        )
+      }
+    }
+  }
+
+  override protected def newBuilder: NewBuilder = new 
HiveSessionStateBuilder(_, _)
+}
+
+class HiveSessionResourceLoader(
+    session: SparkSession,
+    client: HiveClient)
+  extends SessionResourceLoader(session) {
+  override def addJar(path: String): Unit = {
+    client.addJar(path)
+    super.addJar(path)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 0bcf219..d9bb1f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal._
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, 
WithTestConf}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala
index 67c77fb..958ad3e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 /**
- * Run all tests from `SessionStateSuite` with a `HiveSessionState`.
+ * Run all tests from `SessionStateSuite` with a Hive based `SessionState`.
  */
 class HiveSessionStateSuite extends SessionStateSuite
   with TestHiveSingleton with BeforeAndAfterEach {

