This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 80bc028e6 [KYUUBI #4995] Use hadoop conf and hive conf from catalog 
options
80bc028e6 is described below

commit 80bc028e6dfa3bfbbaa78e1d3c8e9c698b235ba7
Author: zhaomin <[email protected]>
AuthorDate: Mon Jun 26 15:04:39 2023 +0800

    [KYUUBI #4995] Use hadoop conf and hive conf from catalog options
    
    ### _Why are the changes needed?_
    
    There are hdfs-site.xml, hive-site, etc in spark job classpath, but we 
should use hadoop conf and hive conf from catalog options.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4995 from zhaomin1423/fix_hive_connector.
    
    Closes #4995
    
    64429fdcb [Xiao Zhao] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
    d921be750 [zhaomin] fix
    375934d65 [zhaomin] Using hadoop conf and hive conf from catalog options
    
    Lead-authored-by: zhaomin <[email protected]>
    Co-authored-by: Xiao Zhao <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/spark/connector/hive/HiveTableCatalog.scala     | 14 +++++++++++++-
 .../kyuubi/spark/connector/hive/read/HiveFileIndex.scala   |  2 +-
 .../kyuubi/spark/connector/hive/write/HiveWrite.scala      |  2 +-
 .../spark/connector/hive/write/HiveWriteHelper.scala       |  5 ++---
 4 files changed, 17 insertions(+), 6 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 1199987b4..44057be0a 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -105,7 +106,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalogOptions = options
     catalog = new HiveSessionCatalog(
       externalCatalogBuilder = () => externalCatalog,
-      globalTempViewManagerBuilder = () => 
sparkSession.sharedState.globalTempViewManager,
+      globalTempViewManagerBuilder = () => globalTempViewManager,
       metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
       functionRegistry = sessionState.functionRegistry,
       tableFunctionRegistry = sessionState.tableFunctionRegistry,
@@ -115,6 +116,17 @@ class HiveTableCatalog(sparkSession: SparkSession)
       HiveUDFExpressionBuilder)
   }
 
+  private lazy val globalTempViewManager: GlobalTempViewManager = {
+    val globalTempDB = conf.getConf(GLOBAL_TEMP_DATABASE)
+    if (externalCatalog.databaseExists(globalTempDB)) {
+      throw KyuubiHiveConnectorException(
+        s"$globalTempDB is a system preserved database, please rename your 
existing database to " +
+          s"resolve the name conflict, or set a different value for 
${GLOBAL_TEMP_DATABASE.key}, " +
+          "and launch your Spark application again.")
+    }
+    new GlobalTempViewManager(globalTempDB)
+  }
+
   /**
    * A catalog that interacts with external systems.
    */
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 82199e6f2..937a9557d 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -80,7 +80,7 @@ class HiveCatalogFileIndex(
       val partitions = selectedPartitions.map {
         case BindPartition(catalogTablePartition, hivePartition) =>
           val path = new Path(catalogTablePartition.location)
-          val fs = path.getFileSystem(hadoopConf)
+          val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
           val partPath = PartitionPath(
             catalogTablePartition.toRow(
               partitionSchema,
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 62db1fa0a..2ee338673 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -76,7 +76,7 @@ case class HiveWrite(
   override def description(): String = "Kyuubi-Hive-Connector"
 
   override def toBatch: BatchWrite = {
-    val tmpLocation = HiveWriteHelper.getExternalTmpPath(sparkSession, 
hadoopConf, tableLocation)
+    val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, 
hadoopConf, tableLocation)
 
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     handleCompression(fileSinkConf, hadoopConf)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index 68ba0bfb2..25bca911f 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, 
HiveExternalCatalog, HiveVersion}
 
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
@@ -47,7 +47,7 @@ object HiveWriteHelper extends Logging {
   private val hiveScratchDir = "hive.exec.scratchdir"
 
   def getExternalTmpPath(
-      sparkSession: SparkSession,
+      externalCatalog: ExternalCatalogWithListener,
       hadoopConf: Configuration,
       path: Path): Path = {
 
@@ -70,7 +70,6 @@ object HiveWriteHelper extends Logging {
     assert(hiveVersionsUsingNewExternalTempPath ++ 
hiveVersionsUsingOldExternalTempPath ==
       allSupportedHiveVersions)
 
-    val externalCatalog = sparkSession.sharedState.externalCatalog
     val hiveVersion = 
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
     val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
     val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")

Reply via email to