Repository: spark
Updated Branches:
  refs/heads/master d6ddfdf60 -> 142f6d149
[SPARK-20048][SQL] Cloning SessionState does not clone query execution listeners ## What changes were proposed in this pull request? Bugfix from [SPARK-19540.](https://github.com/apache/spark/pull/16826) Cloning SessionState does not clone query execution listeners, so cloned session is unable to listen to events on queries. ## How was this patch tested? - Unit test Author: Kunal Khamar <[email protected]> Closes #17379 from kunalkhamar/clone-bugfix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/142f6d14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/142f6d14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/142f6d14 Branch: refs/heads/master Commit: 142f6d14928c780cc9e8d6d7749c5d7c08a30972 Parents: d6ddfdf Author: Kunal Khamar <[email protected]> Authored: Wed Mar 29 12:35:19 2017 -0700 Committer: Herman van Hovell <[email protected]> Committed: Wed Mar 29 12:35:19 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/SparkSession.scala | 22 +- .../sql/internal/BaseSessionStateBuilder.scala | 305 +++++++++++++++++++ .../spark/sql/internal/SessionState.scala | 38 +-- .../sql/internal/sessionStateBuilders.scala | 285 ----------------- .../spark/sql/util/QueryExecutionListener.scala | 10 + .../apache/spark/sql/SessionStateSuite.scala | 53 ++++ .../hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- .../spark/sql/hive/HiveSessionState.scala | 141 --------- .../sql/hive/HiveSessionStateBuilder.scala | 129 ++++++++ .../apache/spark/sql/hive/test/TestHive.scala | 2 +- .../spark/sql/hive/HiveSessionStateSuite.scala | 2 +- 11 files changed, 522 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 4956257..a972978 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} +import org.apache.spark.sql.internal.{BaseSessionStateBuilder, CatalogImpl, SessionState, SessionStateBuilder, SharedState} import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.streaming._ @@ -194,7 +194,7 @@ class SparkSession private( * * @since 2.0.0 */ - def udf: UDFRegistration = sessionState.udf + def udf: UDFRegistration = sessionState.udfRegistration /** * :: Experimental :: @@ -990,28 +990,28 @@ object SparkSession { /** Reference to the root SparkSession. 
*/ private val defaultSession = new AtomicReference[SparkSession] - private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState" + private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME = + "org.apache.spark.sql.hive.HiveSessionStateBuilder" private def sessionStateClassName(conf: SparkConf): String = { conf.get(CATALOG_IMPLEMENTATION) match { - case "hive" => HIVE_SESSION_STATE_CLASS_NAME - case "in-memory" => classOf[SessionState].getCanonicalName + case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME + case "in-memory" => classOf[SessionStateBuilder].getCanonicalName } } /** * Helper method to create an instance of `SessionState` based on `className` from conf. - * The result is either `SessionState` or `HiveSessionState`. + * The result is either `SessionState` or a Hive based `SessionState`. */ private def instantiateSessionState( className: String, sparkSession: SparkSession): SessionState = { - try { - // get `SessionState.apply(SparkSession)` + // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])` val clazz = Utils.classForName(className) - val method = clazz.getMethod("apply", sparkSession.getClass) - method.invoke(null, sparkSession).asInstanceOf[SessionState] + val ctor = clazz.getConstructors.head + ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build() } catch { case NonFatal(e) => throw new IllegalArgumentException(s"Error while instantiating '$className':", e) @@ -1023,7 +1023,7 @@ object SparkSession { */ private[spark] def hiveClassesArePresent: Boolean = { try { - Utils.classForName(HIVE_SESSION_STATE_CLASS_NAME) + Utils.classForName(HIVE_SESSION_STATE_BUILDER_CLASS_NAME) Utils.classForName("org.apache.hadoop.hive.conf.HiveConf") true } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala new file mode 100644 index 0000000..2b14eca --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.internal + +import org.apache.spark.SparkConf +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy, UDFRegistration} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.streaming.StreamingQueryManager +import org.apache.spark.sql.util.ExecutionListenerManager + +/** + * Builder class that coordinates construction of a new [[SessionState]]. + * + * The builder explicitly defines all components needed by the session state, and creates a session + * state when `build` is called. Components should only be initialized once. This is not a problem + * for most components as they are only used in the `build` function. However some components + * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`) are as dependencies + * for other components and are shared as a result. These components are defined as lazy vals to + * make sure the component is created only once. + * + * A developer can modify the builder by providing custom versions of components, or by using the + * hooks provided for the analyzer, optimizer & planner. There are some dependencies between the + * components (they are documented per dependency), a developer should respect these when making + * modifications in order to prevent initialization problems. + * + * A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session + * state will clone the parent sessions state's `conf`, `functionRegistry`, `experimentalMethods` + * and `catalog` fields. Note that the state is cloned when `build` is called, and not before. + */ +@Experimental [email protected] +abstract class BaseSessionStateBuilder( + val session: SparkSession, + val parentState: Option[SessionState] = None) { + type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder + + /** + * Function that produces a new instance of the SessionStateBuilder. This is used by the + * [[SessionState]]'s clone functionality. Make sure to override this when implementing your own + * [[SessionStateBuilder]]. + */ + protected def newBuilder: NewBuilder + + /** + * Extract entries from `SparkConf` and put them in the `SQLConf` + */ + protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = { + sparkConf.getAll.foreach { case (k, v) => + sqlConf.setConfString(k, v) + } + } + + /** + * SQL-specific key-value configurations. + * + * These either get cloned from a pre-existing instance or newly created. The conf is always + * merged with its [[SparkConf]]. + */ + protected lazy val conf: SQLConf = { + val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf) + mergeSparkConf(conf, session.sparkContext.conf) + conf + } + + /** + * Internal catalog managing functions registered by the user. + * + * This either gets cloned from a pre-existing version or cloned from the built-in registry. 
+ */ + protected lazy val functionRegistry: FunctionRegistry = { + parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone() + } + + /** + * Experimental methods that can be used to define custom optimization rules and custom planning + * strategies. + * + * This either gets cloned from a pre-existing version or newly created. + */ + protected lazy val experimentalMethods: ExperimentalMethods = { + parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods) + } + + /** + * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. + * + * Note: this depends on the `conf` field. + */ + protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf) + + /** + * ResourceLoader that is used to load function resources and jars. + */ + protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session) + + /** + * Catalog for managing table and database states. If there is a pre-existing catalog, the state + * of that catalog (temp tables & current database) will be copied into the new catalog. + * + * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields. + */ + protected lazy val catalog: SessionCatalog = { + val catalog = new SessionCatalog( + session.sharedState.externalCatalog, + session.sharedState.globalTempViewManager, + functionRegistry, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + resourceLoader) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + /** + * Interface exposed to the user for registering user-defined functions. + * + * Note 1: The user-defined functions must be deterministic. + * Note 2: This depends on the `functionRegistry` field. + */ + protected def udfRegistration: UDFRegistration = new UDFRegistration(functionRegistry) + + /** + * Logical query plan analyzer for resolving unresolved attributes and relations. + * + * Note: this depends on the `conf` and `catalog` fields. + */ + protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + customResolutionRules + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + customPostHocResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck +: + HiveOnlyCheck +: + customCheckRules + } + + /** + * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating + * your own Analyzer. + * + * Note that this may NOT depend on the `analyzer` function. + */ + protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of + * creating your own Analyzer. + * + * Note that this may NOT depend on the `analyzer` function. + */ + protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Custom check rules to add to the Analyzer. Prefer overriding this instead of creating + * your own Analyzer. + * + * Note that this may NOT depend on the `analyzer` function. + */ + protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil + + /** + * Logical query plan optimizer. + * + * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields. 
+ */ + protected def optimizer: Optimizer = { + new SparkOptimizer(catalog, conf, experimentalMethods) { + override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = + super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules + } + } + + /** + * Custom operator optimization rules to add to the Optimizer. Prefer overriding this instead + * of creating your own Optimizer. + * + * Note that this may NOT depend on the `optimizer` function. + */ + protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Planner that converts optimized logical plans to physical plans. + * + * Note: this depends on the `conf` and `experimentalMethods` fields. + */ + protected def planner: SparkPlanner = { + new SparkPlanner(session.sparkContext, conf, experimentalMethods) { + override def extraPlanningStrategies: Seq[Strategy] = + super.extraPlanningStrategies ++ customPlanningStrategies + } + } + + /** + * Custom strategies to add to the planner. Prefer overriding this instead of creating + * your own Planner. + * + * Note that this may NOT depend on the `planner` function. + */ + protected def customPlanningStrategies: Seq[Strategy] = Nil + + /** + * Create a query execution object. + */ + protected def createQueryExecution: LogicalPlan => QueryExecution = { plan => + new QueryExecution(session, plan) + } + + /** + * Interface to start and stop streaming queries. + */ + protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session) + + /** + * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s + * that listen for execution metrics. + * + * This gets cloned from parent if available, otherwise is a new instance is created. + */ + protected def listenerManager: ExecutionListenerManager = { + parentState.map(_.listenerManager.clone()).getOrElse(new ExecutionListenerManager) + } + + /** + * Function used to make clones of the session state. + */ + protected def createClone: (SparkSession, SessionState) => SessionState = { + val createBuilder = newBuilder + (session, state) => createBuilder(session, Option(state)).build() + } + + /** + * Build the [[SessionState]]. + */ + def build(): SessionState = { + new SessionState( + session.sharedState, + conf, + experimentalMethods, + functionRegistry, + udfRegistration, + catalog, + sqlParser, + analyzer, + optimizer, + planner, + streamingQueryManager, + listenerManager, + resourceLoader, + createQueryExecution, + createClone) + } +} + +/** + * Helper class for using SessionStateBuilders during tests. 
+ */ +private[sql] trait WithTestConf { self: BaseSessionStateBuilder => + def overrideConfs: Map[String, String] + + override protected lazy val conf: SQLConf = { + val conf = parentState.map(_.conf.clone()).getOrElse { + new SQLConf { + clear() + override def clear(): Unit = { + super.clear() + // Make sure we start with the default test configs even after clear + overrideConfs.foreach { case (key, value) => setConfString(key, value) } + } + } + } + mergeSparkConf(conf, session.sparkContext.conf) + conf + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index c6241d9..1b341a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -32,43 +32,46 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.streaming.StreamingQueryManager -import org.apache.spark.sql.util.ExecutionListenerManager +import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener} /** * A class that holds all session-specific state in a given [[SparkSession]]. * - * @param sparkContext The [[SparkContext]]. - * @param sharedState The shared state. + * @param sharedState The state shared across sessions, e.g. global view manager, external catalog. * @param conf SQL-specific key-value configurations. - * @param experimentalMethods The experimental methods. + * @param experimentalMethods Interface to add custom planning strategies and optimizers. * @param functionRegistry Internal catalog for managing functions registered by the user. + * @param udfRegistration Interface exposed to the user for registering user-defined functions. * @param catalog Internal catalog for managing table and database states. * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts. * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations. * @param optimizer Logical query plan optimizer. - * @param planner Planner that converts optimized logical plans to physical plans + * @param planner Planner that converts optimized logical plans to physical plans. * @param streamingQueryManager Interface to start and stop streaming queries. + * @param listenerManager Interface to register custom [[QueryExecutionListener]]s. + * @param resourceLoader Session shared resource loader to load JARs, files, etc. * @param createQueryExecution Function used to create QueryExecution objects. * @param createClone Function used to create clones of the session state. 
*/ private[sql] class SessionState( - sparkContext: SparkContext, sharedState: SharedState, val conf: SQLConf, val experimentalMethods: ExperimentalMethods, val functionRegistry: FunctionRegistry, + val udfRegistration: UDFRegistration, val catalog: SessionCatalog, val sqlParser: ParserInterface, val analyzer: Analyzer, val optimizer: Optimizer, val planner: SparkPlanner, val streamingQueryManager: StreamingQueryManager, + val listenerManager: ExecutionListenerManager, val resourceLoader: SessionResourceLoader, createQueryExecution: LogicalPlan => QueryExecution, createClone: (SparkSession, SessionState) => SessionState) { def newHadoopConf(): Configuration = SessionState.newHadoopConf( - sparkContext.hadoopConfiguration, + sharedState.sparkContext.hadoopConfiguration, conf) def newHadoopConfWithOptions(options: Map[String, String]): Configuration = { @@ -82,18 +85,6 @@ private[sql] class SessionState( } /** - * Interface exposed to the user for registering user-defined functions. - * Note that the user-defined functions must be deterministic. - */ - val udf: UDFRegistration = new UDFRegistration(functionRegistry) - - /** - * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s - * that listen for execution metrics. - */ - val listenerManager: ExecutionListenerManager = new ExecutionListenerManager - - /** * Get an identical copy of the `SessionState` and associate it with the given `SparkSession` */ def clone(newSparkSession: SparkSession): SessionState = createClone(newSparkSession, this) @@ -110,13 +101,6 @@ private[sql] class SessionState( } private[sql] object SessionState { - /** - * Create a new [[SessionState]] for the given session. - */ - def apply(session: SparkSession): SessionState = { - new SessionStateBuilder(session).build() - } - def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = { val newHadoopConf = new Configuration(hadoopConf) sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) } @@ -155,7 +139,7 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade /** * Add a jar path to [[SparkContext]] and the classloader. * - * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs + * Note: this method seems not access any session state, but a Hive based `SessionState` needs * to add the jar to its hive client for the current session. Hence, it still needs to be in * [[SessionState]]. */ http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala deleted file mode 100644 index b8f645f..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.internal - -import org.apache.spark.SparkConf -import org.apache.spark.annotation.{Experimental, InterfaceStability} -import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.streaming.StreamingQueryManager - -/** - * Builder class that coordinates construction of a new [[SessionState]]. - * - * The builder explicitly defines all components needed by the session state, and creates a session - * state when `build` is called. Components should only be initialized once. This is not a problem - * for most components as they are only used in the `build` function. However some components - * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`) are as dependencies - * for other components and are shared as a result. These components are defined as lazy vals to - * make sure the component is created only once. - * - * A developer can modify the builder by providing custom versions of components, or by using the - * hooks provided for the analyzer, optimizer & planner. There are some dependencies between the - * components (they are documented per dependency), a developer should respect these when making - * modifications in order to prevent initialization problems. - * - * A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session - * state will clone the parent sessions state's `conf`, `functionRegistry`, `experimentalMethods` - * and `catalog` fields. Note that the state is cloned when `build` is called, and not before. - */ -@Experimental [email protected] -abstract class BaseSessionStateBuilder( - val session: SparkSession, - val parentState: Option[SessionState] = None) { - type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder - - /** - * Function that produces a new instance of the SessionStateBuilder. This is used by the - * [[SessionState]]'s clone functionality. Make sure to override this when implementing your own - * [[SessionStateBuilder]]. - */ - protected def newBuilder: NewBuilder - - /** - * Extract entries from `SparkConf` and put them in the `SQLConf` - */ - protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = { - sparkConf.getAll.foreach { case (k, v) => - sqlConf.setConfString(k, v) - } - } - - /** - * SQL-specific key-value configurations. - * - * These either get cloned from a pre-existing instance or newly created. The conf is always - * merged with its [[SparkConf]]. 
- */ - protected lazy val conf: SQLConf = { - val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf) - mergeSparkConf(conf, session.sparkContext.conf) - conf - } - - /** - * Internal catalog managing functions registered by the user. - * - * This either gets cloned from a pre-existing version or cloned from the built-in registry. - */ - protected lazy val functionRegistry: FunctionRegistry = { - parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone() - } - - /** - * Experimental methods that can be used to define custom optimization rules and custom planning - * strategies. - * - * This either gets cloned from a pre-existing version or newly created. - */ - protected lazy val experimentalMethods: ExperimentalMethods = { - parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods) - } - - /** - * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. - * - * Note: this depends on the `conf` field. - */ - protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf) - - /** - * ResourceLoader that is used to load function resources and jars. - */ - protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session) - - /** - * Catalog for managing table and database states. If there is a pre-existing catalog, the state - * of that catalog (temp tables & current database) will be copied into the new catalog. - * - * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields. - */ - protected lazy val catalog: SessionCatalog = { - val catalog = new SessionCatalog( - session.sharedState.externalCatalog, - session.sharedState.globalTempViewManager, - functionRegistry, - conf, - SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), - sqlParser, - resourceLoader) - parentState.foreach(_.catalog.copyStateTo(catalog)) - catalog - } - - /** - * Logical query plan analyzer for resolving unresolved attributes and relations. - * - * Note: this depends on the `conf` and `catalog` fields. - */ - protected def analyzer: Analyzer = new Analyzer(catalog, conf) { - override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new FindDataSourceTable(session) +: - new ResolveSQLOnFile(session) +: - customResolutionRules - - override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = - PreprocessTableCreation(session) +: - PreprocessTableInsertion(conf) +: - DataSourceAnalysis(conf) +: - customPostHocResolutionRules - - override val extendedCheckRules: Seq[LogicalPlan => Unit] = - PreWriteCheck +: - HiveOnlyCheck +: - customCheckRules - } - - /** - * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating - * your own Analyzer. - * - * Note that this may NOT depend on the `analyzer` function. - */ - protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil - - /** - * Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of - * creating your own Analyzer. - * - * Note that this may NOT depend on the `analyzer` function. - */ - protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil - - /** - * Custom check rules to add to the Analyzer. Prefer overriding this instead of creating - * your own Analyzer. - * - * Note that this may NOT depend on the `analyzer` function. - */ - protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil - - /** - * Logical query plan optimizer. - * - * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields. 
- */ - protected def optimizer: Optimizer = { - new SparkOptimizer(catalog, conf, experimentalMethods) { - override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = - super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules - } - } - - /** - * Custom operator optimization rules to add to the Optimizer. Prefer overriding this instead - * of creating your own Optimizer. - * - * Note that this may NOT depend on the `optimizer` function. - */ - protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil - - /** - * Planner that converts optimized logical plans to physical plans. - * - * Note: this depends on the `conf` and `experimentalMethods` fields. - */ - protected def planner: SparkPlanner = { - new SparkPlanner(session.sparkContext, conf, experimentalMethods) { - override def extraPlanningStrategies: Seq[Strategy] = - super.extraPlanningStrategies ++ customPlanningStrategies - } - } - - /** - * Custom strategies to add to the planner. Prefer overriding this instead of creating - * your own Planner. - * - * Note that this may NOT depend on the `planner` function. - */ - protected def customPlanningStrategies: Seq[Strategy] = Nil - - /** - * Create a query execution object. - */ - protected def createQueryExecution: LogicalPlan => QueryExecution = { plan => - new QueryExecution(session, plan) - } - - /** - * Interface to start and stop streaming queries. - */ - protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session) - - /** - * Function used to make clones of the session state. - */ - protected def createClone: (SparkSession, SessionState) => SessionState = { - val createBuilder = newBuilder - (session, state) => createBuilder(session, Option(state)).build() - } - - /** - * Build the [[SessionState]]. - */ - def build(): SessionState = { - new SessionState( - session.sparkContext, - session.sharedState, - conf, - experimentalMethods, - functionRegistry, - catalog, - sqlParser, - analyzer, - optimizer, - planner, - streamingQueryManager, - resourceLoader, - createQueryExecution, - createClone) - } -} - -/** - * Helper class for using SessionStateBuilders during tests. - */ -private[sql] trait WithTestConf { self: BaseSessionStateBuilder => - def overrideConfs: Map[String, String] - - override protected lazy val conf: SQLConf = { - val conf = parentState.map(_.conf.clone()).getOrElse { - new SQLConf { - clear() - override def clear(): Unit = { - super.clear() - // Make sure we start with the default test configs even after clear - overrideConfs.foreach { case (key, value) => setConfString(key, value) } - } - } - } - mergeSparkConf(conf, session.sparkContext.conf) - conf - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 26ad0ea..f6240d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -98,6 +98,16 @@ class ExecutionListenerManager private[sql] () extends Logging { listeners.clear() } + /** + * Get an identical copy of this listener manager. 
+ */ + @DeveloperApi + override def clone(): ExecutionListenerManager = writeLock { + val newListenerManager = new ExecutionListenerManager + listeners.foreach(newListenerManager.register) + newListenerManager + } + private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { readLock { withErrorHandling { listener => http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala index 2d5e372..5638c8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterEach +import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { @@ -122,6 +125,56 @@ class SessionStateSuite extends SparkFunSuite } } + test("fork new session and inherit listener manager") { + class CommandCollector extends QueryExecutionListener { + val commands: ArrayBuffer[String] = ArrayBuffer.empty[String] + override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {} + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + commands += funcName + } + } + val collectorA = new CommandCollector + val collectorB = new CommandCollector + val collectorC = new CommandCollector + + try { + def runCollectQueryOn(sparkSession: SparkSession): Unit = { + val tupleEncoder = Encoders.tuple(Encoders.scalaInt, Encoders.STRING) + val df = sparkSession.createDataset(Seq(1 -> "a"))(tupleEncoder).toDF("i", "j") + df.select("i").collect() + } + + activeSession.listenerManager.register(collectorA) + val forkedSession = activeSession.cloneSession() + + // inheritance + assert(forkedSession ne activeSession) + assert(forkedSession.listenerManager ne activeSession.listenerManager) + runCollectQueryOn(forkedSession) + assert(collectorA.commands.length == 1) // forked should callback to A + assert(collectorA.commands(0) == "collect") + + // independence + // => changes to forked do not affect original + forkedSession.listenerManager.register(collectorB) + runCollectQueryOn(activeSession) + assert(collectorB.commands.isEmpty) // original should not callback to B + assert(collectorA.commands.length == 2) // original should still callback to A + assert(collectorA.commands(1) == "collect") + // <= changes to original do not affect forked + activeSession.listenerManager.register(collectorC) + runCollectQueryOn(forkedSession) + assert(collectorC.commands.isEmpty) // forked should not callback to C + assert(collectorA.commands.length == 3) // forked should still callback to A + assert(collectorB.commands.length == 1) // forked should still callback to B + assert(collectorA.commands(2) == "collect") + assert(collectorB.commands(0) == "collect") + } finally { + activeSession.listenerManager.unregister(collectorA) + 
activeSession.listenerManager.unregister(collectorC) + } + } + test("fork new sessions and run query on inherited table") { def checkTableExists(sparkSession: SparkSession): Unit = { QueryTest.checkAnswer(sparkSession.sql( http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 0c79b6f..390b9b6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -38,7 +38,7 @@ import org.apache.thrift.transport.TSocket import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.util.ShutdownHookManager /** http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala deleted file mode 100644 index f49e6bb..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.annotation.{Experimental, InterfaceStability} -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.Analyzer -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlanner -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} - -/** - * Entry object for creating a Hive aware [[SessionState]]. - */ -private[hive] object HiveSessionState { - /** - * Create a new Hive aware [[SessionState]]. for the given session. - */ - def apply(session: SparkSession): SessionState = { - new HiveSessionStateBuilder(session).build() - } -} - -/** - * Builder that produces a [[HiveSessionState]]. 
- */ -@Experimental [email protected] -class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None) - extends BaseSessionStateBuilder(session, parentState) { - - private def externalCatalog: HiveExternalCatalog = - session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] - - /** - * Create a Hive aware resource loader. - */ - override protected lazy val resourceLoader: HiveSessionResourceLoader = { - val client: HiveClient = externalCatalog.client.newSession() - new HiveSessionResourceLoader(session, client) - } - - /** - * Create a [[HiveSessionCatalog]]. - */ - override protected lazy val catalog: HiveSessionCatalog = { - val catalog = new HiveSessionCatalog( - externalCatalog, - session.sharedState.globalTempViewManager, - new HiveMetastoreCatalog(session), - functionRegistry, - conf, - SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), - sqlParser, - resourceLoader) - parentState.foreach(_.catalog.copyStateTo(catalog)) - catalog - } - - /** - * A logical query plan `Analyzer` with rules specific to Hive. - */ - override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { - override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new ResolveHiveSerdeTable(session) +: - new FindDataSourceTable(session) +: - new ResolveSQLOnFile(session) +: - customResolutionRules - - override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = - new DetermineTableStats(session) +: - catalog.ParquetConversions +: - catalog.OrcConversions +: - PreprocessTableCreation(session) +: - PreprocessTableInsertion(conf) +: - DataSourceAnalysis(conf) +: - HiveAnalysis +: - customPostHocResolutionRules - - override val extendedCheckRules: Seq[LogicalPlan => Unit] = - PreWriteCheck +: - customCheckRules - } - - /** - * Planner that takes into account Hive-specific strategies. - */ - override protected def planner: SparkPlanner = { - new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies { - override val sparkSession: SparkSession = session - - override def extraPlanningStrategies: Seq[Strategy] = - super.extraPlanningStrategies ++ customPlanningStrategies - - override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies ++ - extraPlanningStrategies ++ Seq( - FileSourceStrategy, - DataSourceStrategy, - SpecialLimits, - InMemoryScans, - HiveTableScans, - Scripts, - Aggregation, - JoinSelection, - BasicOperators - ) - } - } - } - - override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _) -} - -class HiveSessionResourceLoader( - session: SparkSession, - client: HiveClient) - extends SessionResourceLoader(session) { - override def addJar(path: String): Unit = { - client.addJar(path) - super.addJar(path) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala new file mode 100644 index 0000000..8048c2b --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlanner +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} + +/** + * Builder that produces a Hive aware [[SessionState]]. + */ +@Experimental [email protected] +class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None) + extends BaseSessionStateBuilder(session, parentState) { + + private def externalCatalog: HiveExternalCatalog = + session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + + /** + * Create a Hive aware resource loader. + */ + override protected lazy val resourceLoader: HiveSessionResourceLoader = { + val client: HiveClient = externalCatalog.client.newSession() + new HiveSessionResourceLoader(session, client) + } + + /** + * Create a [[HiveSessionCatalog]]. + */ + override protected lazy val catalog: HiveSessionCatalog = { + val catalog = new HiveSessionCatalog( + externalCatalog, + session.sharedState.globalTempViewManager, + new HiveMetastoreCatalog(session), + functionRegistry, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + resourceLoader) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + /** + * A logical query plan `Analyzer` with rules specific to Hive. + */ + override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveHiveSerdeTable(session) +: + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + customResolutionRules + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + new DetermineTableStats(session) +: + catalog.ParquetConversions +: + catalog.OrcConversions +: + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + HiveAnalysis +: + customPostHocResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck +: + customCheckRules + } + + /** + * Planner that takes into account Hive-specific strategies. 
+ */ + override protected def planner: SparkPlanner = { + new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies { + override val sparkSession: SparkSession = session + + override def extraPlanningStrategies: Seq[Strategy] = + super.extraPlanningStrategies ++ customPlanningStrategies + + override def strategies: Seq[Strategy] = { + experimentalMethods.extraStrategies ++ + extraPlanningStrategies ++ Seq( + FileSourceStrategy, + DataSourceStrategy, + SpecialLimits, + InMemoryScans, + HiveTableScans, + Scripts, + Aggregation, + JoinSelection, + BasicOperators + ) + } + } + } + + override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _) +} + +class HiveSessionResourceLoader( + session: SparkSession, + client: HiveClient) + extends SessionResourceLoader(session) { + override def addJar(path: String): Unit = { + client.addJar(path) + super.addJar(path) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 0bcf219..d9bb1f8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal._ +import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf} import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.util.{ShutdownHookManager, Utils} http://git-wip-us.apache.org/repos/asf/spark/blob/142f6d14/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala index 67c77fb..958ad3e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.hive.test.TestHiveSingleton /** - * Run all tests from `SessionStateSuite` with a `HiveSessionState`. + * Run all tests from `SessionStateSuite` with a Hive based `SessionState`. */ class HiveSessionStateSuite extends SessionStateSuite with TestHiveSingleton with BeforeAndAfterEach { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
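
A minimal sketch of the behavior this patch enables, mirroring the new "fork new session and inherit listener manager" test. It is not part of the commit: the object name is made up, it assumes Spark 2.2-era APIs, and it sits in package org.apache.spark.sql only because cloneSession() is private[sql] (the new suite lives in that package as well). With the fix, a forked session starts with copies of the parent's registered QueryExecutionListeners, while listeners registered afterwards on either session stay independent.

package org.apache.spark.sql  // cloneSession() is private[sql]; the new test lives in this package too

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical driver, not part of the commit.
object ForkedListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("listener-clone").getOrCreate()

    val listener = new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit =
        println(s"onSuccess: $funcName")
      override def onFailure(funcName: String, qe: QueryExecution, ex: Exception): Unit =
        println(s"onFailure: $funcName")
    }
    spark.listenerManager.register(listener)

    // The fork copies the parent's listeners via the new ExecutionListenerManager.clone();
    // before this patch the cloned session's listener manager started out empty.
    val forked = spark.cloneSession()
    forked.range(5).collect()  // now reaches `listener` with funcName "collect"

    spark.stop()
  }
}

The copy itself is made by the new ExecutionListenerManager.clone() added in QueryExecutionListener.scala, which BaseSessionStateBuilder threads into SessionState through the new listenerManager constructor field.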

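For completeness, a hypothetical subclass sketching how BaseSessionStateBuilder is meant to be extended. The class name and the no-op rule below are illustrative only and not part of the commit; in this commit just SessionStateBuilder and HiveSessionStateBuilder are actually selected, via spark.sql.catalogImplementation. The documented pattern is to override the custom* hooks rather than replace analyzer/optimizer/planner, and to override newBuilder so SessionState.clone() keeps producing the same builder type.

package org.apache.spark.sql.internal  // SessionState is private[sql], so the sketch lives here

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical builder, shown only to illustrate the documented hooks.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // A do-nothing analysis rule, standing in for real custom resolution logic.
  object NoopRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // Prefer extending the custom* hooks over replacing analyzer/optimizer/planner.
  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Seq(NoopRule)

  // Required: lets SessionState.clone() rebuild this same builder type, carrying over
  // the cloned conf, functionRegistry, catalog and (after this patch) listenerManager.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}

Because the parent state is only copied when build() runs, cloning stays uniform across in-memory and Hive sessions, and the listener-manager copy from this fix flows through the same path.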