This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5f5ee4d [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
5f5ee4d is described below
commit 5f5ee4d84acc933112c52b1818a865139c2af05a
Author: Kent Yao <[email protected]>
AuthorDate: Fri Mar 27 12:05:45 2020 +0800
[SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
### What changes were proposed in this pull request?
In the Spark CLI, we create a Hive `CliSessionState` that does not load `hive-site.xml`, so the configurations in `hive-site.xml` do not take effect as they do in other Spark-Hive integration apps.
Also, the warehouse directory is not picked up correctly. If the `default` database does not exist, the `CliSessionState` creates one the first time it talks to the metastore. The `Location` of the default DB will be neither the value of `spark.sql.warehouse.dir` nor the user-specified value of `hive.metastore.warehouse.dir`, but the default value of `hive.metastore.warehouse.dir`, which is always `/user/hive/warehouse`.
This PR also fixes the CliSuite failure with the hive-1.2 profile in https://github.com/apache/spark/pull/27933.
In https://github.com/apache/spark/pull/27933, we fixed the issue in the JIRA ticket by deciding the warehouse dir from all properties in the Spark conf and the Hadoop conf, but properties from `--hiveconf` were not included; they are applied to the `CliSessionState` instance only after it is initialized. When such a command-line option key is `hive.metastore.warehouse.dir`, the actual warehouse dir is overridden. Because the logic in Hive for creating the non-existent default database changed, that test p [...]
This PR applies the configuration precedence `spark.hive.xxx > spark.hadoop.xxx > --hiveconf xxx > hive-site.xml` through `SharedState.loadHiveConfFile` before the session state starts.
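As a rough sketch of the merge step, for illustration only: entries from `hive-site.xml` are applied with `setIfUnset`, so keys already present from a higher-precedence source keep their values. The helper name `mergeHiveSiteIfUnset` is hypothetical, not part of the patch.
```scala
import java.net.URL
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

// Hypothetical helper mirroring the setIfUnset merge in
// SharedState.loadHiveConfFile: keys already set in hadoopConf
// (e.g. via --hiveconf, spark.hadoop.*, or spark.hive.*) win
// over the values read from hive-site.xml.
def mergeHiveSiteIfUnset(hadoopConf: Configuration, hiveSite: URL): Unit = {
  val fromFile = new Configuration(false) // skip Hadoop's default resources
  fromFile.addResource(hiveSite)
  fromFile.asScala.foreach { entry =>
    hadoopConf.setIfUnset(entry.getKey, entry.getValue)
  }
}
```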
### Why are the changes needed?
Bugfix for the Spark SQL CLI so it picks up the right configurations.
### Does this PR introduce any user-facing change?
Yes:
1. The non-existent default database will be created in the location specified by the user via `spark.sql.warehouse.dir` or `hive.metastore.warehouse.dir`, or in the default value of `spark.sql.warehouse.dir` if neither is specified (see the sketch below).
2. Configurations from `hive-site.xml` will not override command-line options or the properties defined with the `spark.hadoop.` (or `spark.hive.`) prefix in the Spark conf.
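A minimal sketch of the resolution rules above; the standalone `resolveWarehouseDir` method is illustrative, and the real logic lives in `SharedState.loadHiveConfFile`.
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Illustrative restatement: spark.sql.warehouse.dir wins whenever it is set;
// otherwise a user-set hive.metastore.warehouse.dir is respected; either way
// both confs are updated so they agree on the final warehouse path.
def resolveWarehouseDir(sparkConf: SparkConf, hadoopConf: Configuration): String = {
  val hiveKey = "hive.metastore.warehouse.dir"
  val sparkKey = "spark.sql.warehouse.dir"
  val hiveDir = hadoopConf.get(hiveKey)
  if (hiveDir != null && !sparkConf.contains(sparkKey)) {
    sparkConf.set(sparkKey, hiveDir) // only the Hive key is set: respect it
    hiveDir
  } else {
    // spark.sql.warehouse.dir is set (or neither is): it overrides the Hive key.
    // "spark-warehouse" stands in for the real default value here.
    val sparkDir = sparkConf.get(sparkKey, "spark-warehouse")
    hadoopConf.set(hiveKey, sparkDir)
    sparkDir
  }
}
```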
### How was this patch tested?
Added CLI unit tests.
Closes #27969 from yaooqinn/SPARK-31170-2.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8be16907c261657f83f5d5934bcd978d8dacf7ff)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/internal/SharedState.scala | 87 +++++++++++---------
.../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 9 +-
.../src/test/noclasspath/hive-site.xml | 30 +++++++
.../spark/sql/hive/thriftserver/CliSuite.scala | 96 +++++++++++++++++-----
.../spark/sql/hive/HiveSharedStateSuite.scala | 1 -
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
6 files changed, 159 insertions(+), 66 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5347264..14b8ea6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,6 +22,7 @@ import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -41,7 +42,6 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils
-
/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*
@@ -55,45 +55,10 @@ private[sql] class SharedState(
SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
- // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
- // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
- val warehousePath: String = {
- val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
- if (configFile != null) {
- logInfo(s"loading hive config file: $configFile")
- sparkContext.hadoopConfiguration.addResource(configFile)
- }
-
- // hive.metastore.warehouse.dir only stay in hadoopConf
- sparkContext.conf.remove("hive.metastore.warehouse.dir")
- // Set the Hive metastore warehouse path to the one we use
- val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
- if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
- // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
- // we will respect the value of hive.metastore.warehouse.dir.
- sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
- logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
- s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
- s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
- hiveWarehouseDir
- } else {
- // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
- // the value of spark.sql.warehouse.dir.
- // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
- // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
- val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
- logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
- s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
- sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
- sparkWarehouseDir
- }
- }
- logInfo(s"Warehouse path is '$warehousePath'.")
-
- // These 2 variables should be initiated after `warehousePath`, because in the first place we need
- // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
- // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
private val (conf, hadoopConf) = {
+ // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
+ // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
+ SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
// If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -166,7 +131,7 @@ private[sql] class SharedState(
val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
"default database",
- CatalogUtils.stringToURI(warehousePath),
+ CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
Map())
// Create default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
@@ -258,4 +223,46 @@ object SharedState extends Logging {
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
+
+ /**
+ * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+ * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+ */
+ def loadHiveConfFile(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): Unit = {
+ val hiveWarehouseKey = "hive.metastore.warehouse.dir"
+ val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+ if (configFile != null) {
+ logInfo(s"loading hive config file: $configFile")
+ val hadoopConfTemp = new Configuration()
+ hadoopConfTemp.addResource(configFile)
+ hadoopConfTemp.asScala.foreach { entry =>
+ hadoopConf.setIfUnset(entry.getKey, entry.getValue)
+ }
+ }
+ // hive.metastore.warehouse.dir only stay in hadoopConf
+ sparkConf.remove(hiveWarehouseKey)
+ // Set the Hive metastore warehouse path to the one we use
+ val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
+ val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+ // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+ // we will respect the value of hive.metastore.warehouse.dir.
+ sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+ logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
+ s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+ hiveWarehouseDir
+ } else {
+ // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+ // the value of spark.sql.warehouse.dir.
+ // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
+ // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+ val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+ logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
+ s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+ hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+ sparkWarehouseDir
+ }
+ logInfo(s"Warehouse path is '$warehousePath'.")
+ }
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6b76927..5ed0cb0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
+import org.apache.spark.sql.internal.SharedState
import org.apache.spark.util.ShutdownHookManager
/**
@@ -130,6 +131,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
UserGroupInformation.getCurrentUser.addCredentials(credentials)
}
+ SharedState.loadHiveConfFile(sparkConf, conf)
SessionState.start(sessionState)
// Clean up after we exit
@@ -188,8 +190,11 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Execute -i init files (always in silent mode)
cli.processInitFiles(sessionState)
- newHiveConf.foreach { kv =>
- SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
+ // We don't propagate hive.metastore.warehouse.dir, because it might has been adjusted in
+ // [[SharedState.loadHiveConfFile]] based on the user specified or default values of
+ // spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
+ for ((k, v) <- newHiveConf if k != "hive.metastore.warehouse.dir") {
+ SparkSQLEnv.sqlContext.setConf(k, v)
}
if (sessionState.execString != null) {
diff --git a/sql/hive-thriftserver/src/test/noclasspath/hive-site.xml b/sql/hive-thriftserver/src/test/noclasspath/hive-site.xml
new file mode 100644
index 0000000..d0bf04d
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/noclasspath/hive-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>hive.in.test</name>
+ <value>true</value>
+ <description>Internal marker for test.</description>
+ </property>
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>/tmp/hive_one</value>
+ </property>
+</configuration>
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 43aafc3..c393054 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -27,22 +27,23 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.test.HiveTestJars
+import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
/**
- * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
- * Hive metastore and warehouse.
+ * A test suite for the `spark-sql` CLI tool.
*/
-class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+class CliSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach with Logging {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
val scratchDirPath = Utils.createTempDir()
+ val sparkWareHouseDir = Utils.createTempDir()
override def beforeAll(): Unit = {
super.beforeAll()
@@ -53,14 +54,20 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
override def afterAll(): Unit = {
try {
- warehousePath.delete()
- metastorePath.delete()
- scratchDirPath.delete()
+ Utils.deleteRecursively(warehousePath)
+ Utils.deleteRecursively(metastorePath)
+ Utils.deleteRecursively(scratchDirPath)
} finally {
super.afterAll()
}
}
+ override def afterEach(): Unit = {
+ // Only running `runCliWithin` in a single test case will share the same temporary
+ // Hive metastore
+ Utils.deleteRecursively(metastorePath)
+ }
+
/**
* Run a CLI operation and expect all the queries and expected answers to be returned.
*
@@ -75,25 +82,35 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
def runCliWithin(
timeout: FiniteDuration,
extraArgs: Seq[String] = Seq.empty,
- errorResponses: Seq[String] = Seq("Error:"))(
+ errorResponses: Seq[String] = Seq("Error:"),
+ maybeWarehouse: Option[File] = Some(warehousePath),
+ useExternalHiveFile: Boolean = false)(
queriesAndExpectedAnswers: (String, String)*): Unit = {
val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
// Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
val queriesString = queries.map(_ + "\n").mkString
+ val extraHive = if (useExternalHiveFile) {
+ s"--driver-class-path
${System.getProperty("user.dir")}/src/test/noclasspath"
+ } else {
+ ""
+ }
+ val warehouseConf =
+ maybeWarehouse.map(dir => s"--hiveconf ${ConfVars.METASTOREWAREHOUSE}=$dir").getOrElse("")
val command = {
val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"
s"""$cliScript
| --master local
| --driver-java-options -Dderby.system.durability=test
+ | $extraHive
| --conf spark.ui.enabled=false
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
- | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
| --hiveconf conf1=conftest
| --hiveconf conf2=1
+ | $warehouseConf
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}
@@ -159,6 +176,54 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
}
}
+ test("load warehouse dir from hive-site.xml") {
+ runCliWithin(1.minute, maybeWarehouse = None, useExternalHiveFile = true)(
+ "desc database default;" -> "hive_one",
+ "set spark.sql.warehouse.dir;" -> "hive_one")
+ }
+
+ test("load warehouse dir from --hiveconf") {
+ // --hiveconf will overrides hive-site.xml
+ runCliWithin(2.minute, useExternalHiveFile = true)(
+ "desc database default;" -> warehousePath.getAbsolutePath,
+ "create database cliTestDb;" -> "",
+ "desc database cliTestDb;" -> warehousePath.getAbsolutePath,
+ "set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
+ }
+
+ test("load warehouse dir from --conf spark(.hadoop).hive.*") {
+ // override conf from hive-site.xml
+ runCliWithin(
+ 2.minute,
+ extraArgs = Seq("--conf",
s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$sparkWareHouseDir"),
+ maybeWarehouse = None,
+ useExternalHiveFile = true)(
+ "desc database default;" -> sparkWareHouseDir.getAbsolutePath,
+ "create database cliTestDb;" -> "",
+ "desc database cliTestDb;" -> sparkWareHouseDir.getAbsolutePath,
+ "set spark.sql.warehouse.dir;" -> sparkWareHouseDir.getAbsolutePath)
+
+ // override conf from --hiveconf too
+ runCliWithin(
+ 2.minute,
+ extraArgs = Seq("--conf",
s"spark.${ConfVars.METASTOREWAREHOUSE}=$sparkWareHouseDir"))(
+ "desc database default;" -> sparkWareHouseDir.getAbsolutePath,
+ "create database cliTestDb;" -> "",
+ "desc database cliTestDb;" -> sparkWareHouseDir.getAbsolutePath,
+ "set spark.sql.warehouse.dir;" -> sparkWareHouseDir.getAbsolutePath)
+ }
+
+ test("load warehouse dir from spark.sql.warehouse.dir") {
+ // spark.sql.warehouse.dir overrides all hive ones
+ runCliWithin(
+ 2.minute,
+ extraArgs =
+ Seq("--conf",
+ s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}1",
+ "--conf",
s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=${sparkWareHouseDir}2"))(
+ "desc database default;" ->
sparkWareHouseDir.getAbsolutePath.concat("1"))
+ }
+
test("Simple commands") {
val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
@@ -308,19 +373,6 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
)
}
- test("SPARK-21451: spark.sql.warehouse.dir should respect options in
--hiveconf") {
- runCliWithin(1.minute)("set spark.sql.warehouse.dir;" ->
warehousePath.getAbsolutePath)
- }
-
- test("SPARK-21451: Apply spark.hadoop.* configurations") {
- val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
- runCliWithin(
- 1.minute,
- Seq("--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))(
- "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath)
- tmpDir.delete()
- }
-
test("Support hive.aux.jars.path") {
val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath
runCliWithin(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 6e2dcfc..78535b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -45,7 +45,6 @@ class HiveSharedStateSuite extends SparkFunSuite {
GLOBAL_TEMP_DATABASE.key -> tmpDb)
val state = new SharedState(sc, initialConfigs)
- assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options")
assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
"warehouse conf in session options can't affect application wide spark
conf")
assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 31ff62e..8b97489 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -787,7 +787,7 @@ object SPARK_18360 {
.enableHiveSupport().getOrCreate()
val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
- assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
+ assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
val hiveClient = spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]