This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 576c4046742 [SPARK-35242][SQL] Support changing session catalog's default database
576c4046742 is described below
commit 576c4046742e1fd5a0e93b87e9b29bac0df83788
Author: Gabor Roczei <[email protected]>
AuthorDate: Tue Sep 27 08:28:26 2022 +0800
[SPARK-35242][SQL] Support changing session catalog's default database
### What changes were proposed in this pull request?
This PR is a follow-up to #32364, which was closed by github-actions because it had not been updated in a while. The previous PR added a new custom parameter (spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase) to configure the session catalog's default database, which is required by some use cases where the user does not have access to the default database.
Therefore I have created a new PR based on it and added these changes on top:
- Rebased / updated the previous PR onto the latest master branch
- Deleted the DEFAULT_DATABASE static member from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala and refactored the code accordingly
### Why are the changes needed?
If a user does not have any permissions on the Hive default database in Ranger, Spark fails with the following error:
```
22/08/26 18:36:21 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10@ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
```
The idea is to introduce a new configuration parameter that lets us set a different database as the default database, one for which the user has sufficient permissions in Ranger.
For example:
```
spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
```
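Because this is a static SQL configuration, it can also be set when the SparkSession is built. A minimal sketch (the database name other_db is just a placeholder):
```
import org.apache.spark.sql.SparkSession

// Static confs must be set before the session is created;
// calling spark.conf.set(...) on this key at runtime fails.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.spark_catalog.defaultDatabase", "other_db")
  .enableHiveSupport()
  .getOrCreate()
```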
### Does this PR introduce _any_ user-facing change?
Yes, there is a new configuration parameter as mentioned above, but its default value is "default", so the previous behavior is preserved.
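The effective value can be read back from a running session, for example (a small sketch; static confs are read-only once the session exists):
```
// Returns "default" unless overridden at session startup.
spark.conf.get("spark.sql.catalog.spark_catalog.defaultDatabase")
```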
### How was this patch tested?
1) With GitHub Actions (all tests passed): https://github.com/roczei/spark/actions/runs/2934863118
2) Manually tested with Ranger + Hive
Scenario a) hrt_10 does not have access to the default database in Hive:
```
[hrt_10@quasar-thbnqr-2 ~]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 18:14:18 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:14:30 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-17]: Attempted to request executors before the AM has registered!
...

scala> spark.sql("use other")
22/08/26 18:18:47 INFO conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
22/08/26 18:18:48 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:18:48 WARN client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
22/08/26 18:18:48 INFO SessionState: [main]: Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Connected to metastore.
22/08/26 18:18:50 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10@ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
  at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:179)
```
This is the expected behavior, because without the new parameter Spark still uses the "default" database name.
Scenario b) Use the "other" database, on which the hrt_10 user has proper permissions:
```
[hrt_10@quasar-thbnqr-2 ~]$ spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 18:27:03 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:27:14 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-15]: Attempted to request executors before the AM has registered!
...

scala> spark.sql("use other")
22/08/26 18:29:22 INFO conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
22/08/26 18:29:22 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:29:22 WARN client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
22/08/26 18:29:22 INFO SessionState: [main]: Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: Connected to metastore.
22/08/26 18:29:24 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10@ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select * from employee").show()
+---+----+------+-----------+
|eid|name|salary|destination|
+---+----+------+-----------+
| 12| Ram|    10|     Szeged|
| 13| Joe|    20|   Debrecen|
+---+----+------+-----------+

scala>
```
Closes #37679 from roczei/SPARK-35242.
Lead-authored-by: Gabor Roczei <[email protected]>
Co-authored-by: hongdongdong <[email protected]>
Co-authored-by: hongdd <[email protected]>
Co-authored-by: Gabor Roczei <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
core/src/main/resources/error/error-classes.json | 5 +++++
.../spark/sql/catalyst/catalog/SessionCatalog.scala | 8 +++++---
.../spark/sql/connector/catalog/CatalogManager.scala | 4 ++--
.../apache/spark/sql/errors/QueryExecutionErrors.scala | 8 ++++++++
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 2 ++
.../org/apache/spark/sql/internal/StaticSQLConf.scala | 8 ++++++++
.../execution/datasources/v2/V2SessionCatalog.scala | 3 ++-
.../org/apache/spark/sql/internal/SharedState.scala | 18 ++++++++++++------
.../apache/spark/sql/hive/thriftserver/CliSuite.scala | 15 +++++++++++++++
9 files changed, 59 insertions(+), 12 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a71b977fc51..9e909bcb6c9 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -206,6 +206,11 @@
     ],
     "sqlState" : "22008"
   },
+  "DEFAULT_DATABASE_NOT_EXISTS" : {
+    "message" : [
+      "Default database <defaultDatabase> does not exist, please create it first or change default database to 'default'."
+    ]
+  },
   "DIVIDE_BY_ZERO" : {
     "message" : [
       "Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set <config> to \"false\" to bypass this error."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 1ada2ffa4fc..cc2ba8ac7e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -68,7 +68,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
 
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
@@ -88,7 +89,8 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       DummyFunctionExpressionBuilder,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }
 
   // For testing only.
@@ -129,7 +131,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = format(DEFAULT_DATABASE)
+  protected var currentDb: String = format(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 62c0772f865..cf9dd7fdf47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -130,7 +130,7 @@ class CatalogManager(
       _currentNamespace = None
       // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
       // when we switch back to session catalog, the current namespace definitely is ["default"].
-      v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+      v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
     }
   }
@@ -144,7 +144,7 @@ class CatalogManager(
     catalogs.clear()
     _currentNamespace = None
     _currentCatalogName = None
-    v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+    v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1a93014ecff..7c7561b3a71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1932,6 +1932,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
   }
 
+  def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
+    new SparkException(
+      errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
+      messageParameters = Map("defaultDatabase" -> defaultDatabase),
+      cause = null
+    )
+  }
+
   def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = {
     new SparkException(
       s"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bc6700a3b56..626c30cb9c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4705,6 +4705,8 @@ class SQLConf extends Serializable with Logging {
   def errorMessageFormat: ErrorMessageFormat.Value =
     ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT))
 
+  def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 3be02f69f23..aaeac8ce6fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.util.Utils
@@ -37,6 +38,13 @@ object StaticSQLConf {
       .stringConf
       .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_DEFAULT_DATABASE =
+    buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
+      .doc("The default database for session catalog.")
+      .version("3.4.0")
+      .stringConf
+      .createWithDefault("default")
+
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .version("2.0.0")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index efbc9dd7558..b9afe71d243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
   import V2SessionCatalog._
 
-  override val defaultNamespace: Array[String] = Array("default")
+  override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
 
   override def name: String = CatalogManager.SESSION_CATALOG_NAME
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index f6b748d2424..92c3ec888d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -148,13 +148,19 @@ private[sql] class SharedState(
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
-    val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE,
-      "default database",
-      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
-      Map())
     // Create default database if it doesn't exist
-    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
+    // If database name not equals 'default', throw exception
+    if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
+      if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
+        throw QueryExecutionErrors.defaultDatabaseNotExistsError(
+          SQLConf.get.defaultDatabase
+        )
+      }
+      val defaultDbDefinition = CatalogDatabase(
+        SQLConf.get.defaultDatabase,
+        "default database",
+        CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+        Map())
       // There may be another Spark application creating default database at the same time, here we
       // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
       externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index aa7b8876486..fb5beb60c5c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -772,4 +772,19 @@ class CliSuite extends SparkFunSuite {
     }
   }
   // scalastyle:on line.size.limit
+
+  test("SPARK-35242: Support change catalog default database for spark") {
+    // Create db and table first
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
+      "create database spark_35242;" -> "",
+      "use spark_35242;" -> "",
+      "CREATE TABLE spark_test(key INT, val STRING);" -> "")
+
+    // Set default db
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
+        "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
+      "show tables;" -> "spark_test")
+  }
 }