This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 41ca69a4d [KYUUBI #4995] Use hadoop conf and hive conf from catalog options
41ca69a4d is described below
commit 41ca69a4d2c2ca7a346e736f84c3bf5e28ec887d
Author: zhaomin <[email protected]>
AuthorDate: Mon Jun 26 15:04:39 2023 +0800
[KYUUBI #4995] Use hadoop conf and hive conf from catalog options
### _Why are the changes needed?_
There are hdfs-site.xml, hive-site.xml, etc. on the Spark job classpath, but we
should use the Hadoop conf and Hive conf from the catalog options.
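
For illustration only, a minimal sketch of the idea, assuming a hypothetical helper (`CatalogConfUtils.hadoopConfFromOptions` is not part of this patch) that folds the per-catalog options into a Hadoop `Configuration` instead of relying on whatever sits on the classpath:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper, not part of this patch: builds a Hadoop Configuration from
// the options passed to TableCatalog#initialize, so per-catalog settings such as
// "hive.metastore.uris" or "fs.defaultFS" take precedence over whatever
// hdfs-site.xml / hive-site.xml happens to be on the Spark job classpath.
object CatalogConfUtils {
  def hadoopConfFromOptions(options: CaseInsensitiveStringMap): Configuration = {
    val conf = new Configuration()
    options.asScala.foreach { case (k, v) => conf.set(k, v) }
    conf
  }
}
```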
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #4995 from zhaomin1423/fix_hive_connector.
Closes #4995
64429fdcb [Xiao Zhao] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
d921be750 [zhaomin] fix
375934d65 [zhaomin] Using hadoop conf and hive conf from catalog options
Lead-authored-by: zhaomin <[email protected]>
Co-authored-by: Xiao Zhao <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 80bc028e6dfa3bfbbaa78e1d3c8e9c698b235ba7)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/spark/connector/hive/HiveTableCatalog.scala | 14 +++++++++++++-
.../kyuubi/spark/connector/hive/read/HiveFileIndex.scala | 2 +-
.../kyuubi/spark/connector/hive/write/HiveWrite.scala | 2 +-
.../spark/connector/hive/write/HiveWriteHelper.scala | 5 ++---
4 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c4d71dbba..8ceb56037 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -104,7 +105,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
catalogOptions = options
catalog = new HiveSessionCatalog(
externalCatalogBuilder = () => externalCatalog,
- globalTempViewManagerBuilder = () => sparkSession.sharedState.globalTempViewManager,
+ globalTempViewManagerBuilder = () => globalTempViewManager,
metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
functionRegistry = sessionState.functionRegistry,
tableFunctionRegistry = sessionState.tableFunctionRegistry,
@@ -114,6 +115,17 @@ class HiveTableCatalog(sparkSession: SparkSession)
HiveUDFExpressionBuilder)
}
+ private lazy val globalTempViewManager: GlobalTempViewManager = {
+ val globalTempDB = conf.getConf(GLOBAL_TEMP_DATABASE)
+ if (externalCatalog.databaseExists(globalTempDB)) {
+ throw KyuubiHiveConnectorException(
+ s"$globalTempDB is a system preserved database, please rename your
existing database to " +
+ s"resolve the name conflict, or set a different value for
${GLOBAL_TEMP_DATABASE.key}, " +
+ "and launch your Spark application again.")
+ }
+ new GlobalTempViewManager(globalTempDB)
+ }
+
/**
* A catalog that interacts with external systems.
*/
diff --git
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 82199e6f2..937a9557d 100644
---
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -80,7 +80,7 @@ class HiveCatalogFileIndex(
val partitions = selectedPartitions.map {
case BindPartition(catalogTablePartition, hivePartition) =>
val path = new Path(catalogTablePartition.location)
- val fs = path.getFileSystem(hadoopConf)
+ val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
val partPath = PartitionPath(
catalogTablePartition.toRow(
partitionSchema,
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 62db1fa0a..2ee338673 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -76,7 +76,7 @@ case class HiveWrite(
override def description(): String = "Kyuubi-Hive-Connector"
override def toBatch: BatchWrite = {
- val tmpLocation = HiveWriteHelper.getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+ val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
handleCompression(fileSinkConf, hadoopConf)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index 68ba0bfb2..25bca911f 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
@@ -47,7 +47,7 @@ object HiveWriteHelper extends Logging {
private val hiveScratchDir = "hive.exec.scratchdir"
def getExternalTmpPath(
- sparkSession: SparkSession,
+ externalCatalog: ExternalCatalogWithListener,
hadoopConf: Configuration,
path: Path): Path = {
@@ -70,7 +70,6 @@ object HiveWriteHelper extends Logging {
assert(hiveVersionsUsingNewExternalTempPath ++
hiveVersionsUsingOldExternalTempPath ==
allSupportedHiveVersions)
- val externalCatalog = sparkSession.sharedState.externalCatalog
val hiveVersion =
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")