Repository: spark Updated Branches: refs/heads/master e38b0baa3 -> 2cbc41282
[SPARK-13076][SQL] Rename ClientInterface -> HiveClient and ClientWrapper -> HiveClientImpl. I have some follow-up pull requests to introduce a new internal catalog, and I think this new naming better reflects the functionality of the two classes. Author: Reynold Xin <[email protected]> Closes #10981 from rxin/SPARK-13076. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cbc4128 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cbc4128 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cbc4128 Branch: refs/heads/master Commit: 2cbc412821641cf9446c0621ffa1976bd7fc4fa1 Parents: e38b0ba Author: Reynold Xin <[email protected]> Authored: Fri Jan 29 16:57:34 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Fri Jan 29 16:57:34 2016 -0800 ---------------------------------------------------------------------- .../apache/spark/sql/execution/commands.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../org/apache/spark/sql/hive/HiveContext.scala | 10 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/client/ClientInterface.scala | 195 ------- .../spark/sql/hive/client/ClientWrapper.scala | 544 ------------------- .../spark/sql/hive/client/HiveClient.scala | 195 +++++++ .../spark/sql/hive/client/HiveClientImpl.scala | 544 +++++++++++++++++++ .../apache/spark/sql/hive/client/HiveShim.scala | 5 +- .../sql/hive/client/IsolatedClientLoader.scala | 18 +- .../org/apache/spark/sql/hive/hiveUDFs.scala | 4 +- .../apache/spark/sql/hive/test/TestHive.scala | 4 +- .../spark/sql/hive/client/VersionsSuite.scala | 4 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 14 files changed, 765 insertions(+), 766 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 703e464..c6adb58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -404,7 +404,7 @@ case class DescribeFunction( result } - case None => Seq(Row(s"Function: $functionName is not found.")) + case None => Seq(Row(s"Function: $functionName not found.")) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 51a50c1..2b821c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -84,7 +84,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { "Extended Usage") checkExistence(sql("describe functioN abcadf"), true, - "Function: abcadf is not found.") + "Function: abcadf not found.") } test("SPARK-6743: no columns from cache") { http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 1797ea5..05863ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -79,8 +79,8 @@ class HiveContext private[hive]( sc: SparkContext, cacheManager: CacheManager, listener: SQLListener, - @transient private val execHive: ClientWrapper, - @transient private val metaHive: ClientInterface, + @transient private val execHive: HiveClientImpl, + @transient private val metaHive: HiveClient, isRootContext: Boolean) extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging { self => @@ -193,7 +193,7 @@ class HiveContext private[hive]( * for storing persistent metadata, and only point to a dummy metastore in a temporary directory. */ @transient - protected[hive] lazy val executionHive: ClientWrapper = if (execHive != null) { + protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) { execHive } else { logInfo(s"Initializing execution hive, version $hiveExecutionVersion") @@ -203,7 +203,7 @@ class HiveContext private[hive]( config = newTemporaryConfiguration(useInMemoryDerby = true), isolationOn = false, baseClassLoader = Utils.getContextOrSparkClassLoader) - loader.createClient().asInstanceOf[ClientWrapper] + loader.createClient().asInstanceOf[HiveClientImpl] } /** @@ -222,7 +222,7 @@ class HiveContext private[hive]( * in the hive-site.xml file. */ @transient - protected[hive] lazy val metadataHive: ClientInterface = if (metaHive != null) { + protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) { metaHive } else { val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 848aa4e..61d0d67 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -96,7 +96,7 @@ private[hive] object HiveSerDe { } } -private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext) +private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext) extends Catalog with Logging { val conf = hive.conf http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala deleted file mode 100644 index 4eec3fe..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.client - -import java.io.PrintStream -import java.util.{Map => JMap} -import javax.annotation.Nullable - -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} -import org.apache.spark.sql.catalyst.expressions.Expression - -private[hive] case class HiveDatabase(name: String, location: String) - -private[hive] abstract class TableType { val name: String } -private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" } -private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" } -private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" } -private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" } - -// TODO: Use this for Tables and Partitions -private[hive] case class HiveStorageDescriptor( - location: String, - inputFormat: String, - outputFormat: String, - serde: String, - serdeProperties: Map[String, String]) - -private[hive] case class HivePartition( - values: Seq[String], - storage: HiveStorageDescriptor) - -private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String) -private[hive] case class HiveTable( - specifiedDatabase: Option[String], - name: String, - schema: Seq[HiveColumn], - partitionColumns: Seq[HiveColumn], - properties: Map[String, String], - serdeProperties: Map[String, String], - tableType: TableType, - location: Option[String] = None, - inputFormat: Option[String] = None, - outputFormat: Option[String] = None, - serde: Option[String] = None, - viewText: Option[String] = None) { - - @transient - private[client] var client: ClientInterface = _ - - private[client] def withClient(ci: ClientInterface): this.type = { - client = ci - this - } - - def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved")) - - def isPartitioned: Boolean = partitionColumns.nonEmpty - - def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this) - - def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = - client.getPartitionsByFilter(this, predicates) - - // Hive does not support backticks when passing names to the client. - def qualifiedName: String = s"$database.$name" -} - -/** - * An externally visible interface to the Hive client. This interface is shared across both the - * internal and external classloaders for a given version of Hive and thus must expose only - * shared classes. - */ -private[hive] trait ClientInterface { - - /** Returns the Hive Version of this client. */ - def version: HiveVersion - - /** Returns the configuration for the given key in the current session. */ - def getConf(key: String, defaultValue: String): String - - /** - * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will - * result in one string. 
- */ - def runSqlHive(sql: String): Seq[String] - - def setOut(stream: PrintStream): Unit - def setInfo(stream: PrintStream): Unit - def setError(stream: PrintStream): Unit - - /** Returns the names of all tables in the given database. */ - def listTables(dbName: String): Seq[String] - - /** Returns the name of the active database. */ - def currentDatabase: String - - /** Sets the name of the current database. */ - def setCurrentDatabase(databaseName: String): Unit - - /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */ - def getDatabase(name: String): HiveDatabase = { - getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException) - } - - /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[HiveDatabase] - - /** Returns the specified table, or throws [[NoSuchTableException]]. */ - def getTable(dbName: String, tableName: String): HiveTable = { - getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException) - } - - /** Returns the metadata for the specified table or None if it doesn't exist. */ - def getTableOption(dbName: String, tableName: String): Option[HiveTable] - - /** Creates a view with the given metadata. */ - def createView(view: HiveTable): Unit - - /** Updates the given view with new metadata. */ - def alertView(view: HiveTable): Unit - - /** Creates a table with the given metadata. */ - def createTable(table: HiveTable): Unit - - /** Updates the given table with new metadata. */ - def alterTable(table: HiveTable): Unit - - /** Creates a new database with the given name. */ - def createDatabase(database: HiveDatabase): Unit - - /** Returns the specified partition or None if it does not exist. */ - def getPartitionOption( - hTable: HiveTable, - partitionSpec: JMap[String, String]): Option[HivePartition] - - /** Returns all partitions for the given table. */ - def getAllPartitions(hTable: HiveTable): Seq[HivePartition] - - /** Returns partitions filtered by predicates for the given table. */ - def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition] - - /** Loads a static partition into an existing table. */ - def loadPartition( - loadPath: String, - tableName: String, - partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering - replace: Boolean, - holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit - - /** Loads data into an existing table. */ - def loadTable( - loadPath: String, // TODO URI - tableName: String, - replace: Boolean, - holdDDLTime: Boolean): Unit - - /** Loads new dynamic partitions into an existing table. */ - def loadDynamicPartitions( - loadPath: String, - tableName: String, - partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering - replace: Boolean, - numDP: Int, - holdDDLTime: Boolean, - listBucketingEnabled: Boolean): Unit - - /** Adds a jar into the class loader. */ - def addJar(path: String): Unit - - /** Returns a ClientInterface as a new session that will share the class loader and Hive client. */ - def newSession(): ClientInterface - - /** Runs a function within Hive state (SessionState, HiveConf, Hive client and class loader). */ - def withHiveState[A](f: => A): A - - /** Used for testing only. Removes all metadata from this instance of Hive.
*/ - def reset(): Unit -} http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala deleted file mode 100644 index 5307e92..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.client - -import java.io.{File, PrintStream} -import java.util.{Map => JMap} - -import scala.collection.JavaConverters._ -import scala.language.reflectiveCalls - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.{TableType => HTableType} -import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema} -import org.apache.hadoop.hive.ql.{metadata, Driver} -import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.processors._ -import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader} -import org.apache.hadoop.security.UserGroupInformation - -import org.apache.spark.{Logging, SparkConf, SparkException} -import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.util.{CircularBuffer, Utils} - -/** - * A class that wraps the HiveClient and converts its responses to externally visible classes. - * Note that this class is typically loaded with an internal classloader for each instantiation, - * allowing it to interact directly with a specific isolated version of Hive. Loading this class - * with the isolated classloader, however, will result in it only being visible as a ClientInterface, - * not a ClientWrapper. - * - * This class needs to interact with multiple versions of Hive, but will always be compiled with - * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility - * must use reflection after matching on `version`. - * - * @param version the version of hive, used when picking function calls that are not compatible. - * @param config a collection of configuration options that will be added to the hive conf before - * opening the hive client. - * @param initClassLoader the classloader used when creating the `state` field of - * this ClientWrapper.
- */ -private[hive] class ClientWrapper( - override val version: HiveVersion, - config: Map[String, String], - initClassLoader: ClassLoader, - val clientLoader: IsolatedClientLoader) - extends ClientInterface - with Logging { - - // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. - private val outputBuffer = new CircularBuffer() - - private val shim = version match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } - - // Create an internal session state for this ClientWrapper. - val state = { - val original = Thread.currentThread().getContextClassLoader - // Switch to the initClassLoader. - Thread.currentThread().setContextClassLoader(initClassLoader) - - // Set up kerberos credentials for UserGroupInformation.loginUser within - // the current class loader. - // Instead of using the spark conf of the current spark context, a new - // instance of SparkConf is needed for the original value of spark.yarn.keytab - // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the - // keytab configuration for the link name in the distributed cache. - val sparkConf = new SparkConf - if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) { - val principalName = sparkConf.get("spark.yarn.principal") - val keytabFileName = sparkConf.get("spark.yarn.keytab") - if (!new File(keytabFileName).exists()) { - throw new SparkException(s"Keytab file: ${keytabFileName}" + - " specified in spark.yarn.keytab does not exist") - } else { - logInfo("Attempting to login to Kerberos" + - s" using principal: ${principalName} and keytab: ${keytabFileName}") - UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName) - } - } - - val ret = try { - val initialConf = new HiveConf(classOf[SessionState]) - // HiveConf is a Hadoop Configuration, which has a field of classLoader and - // the initial value will be the current thread's context class loader - // (i.e. initClassLoader here). - // We call initialConf.setClassLoader(initClassLoader) here to make - // this action explicit. - initialConf.setClassLoader(initClassLoader) - config.foreach { case (k, v) => - if (k.toLowerCase.contains("password")) { - logDebug(s"Hive Config: $k=xxx") - } else { - logDebug(s"Hive Config: $k=$v") - } - initialConf.set(k, v) - } - val state = new SessionState(initialConf) - if (clientLoader.cachedHive != null) { - Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) - } - SessionState.start(state) - state.out = new PrintStream(outputBuffer, true, "UTF-8") - state.err = new PrintStream(outputBuffer, true, "UTF-8") - state - } finally { - Thread.currentThread().setContextClassLoader(original) - } - ret - } - - /** Returns the configuration for the current session. */ - def conf: HiveConf = SessionState.get().getConf - - override def getConf(key: String, defaultValue: String): String = { - conf.get(key, defaultValue) - } - - // We use hive's conf for compatibility. - private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES) - private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf) - - /** - * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
- */ - private def retryLocked[A](f: => A): A = clientLoader.synchronized { - // Hive sometimes retries internally, so set a deadline to avoid compounding delays. - val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong - var numTries = 0 - var caughtException: Exception = null - do { - numTries += 1 - try { - return f - } catch { - case e: Exception if causedByThrift(e) => - caughtException = e - logWarning( - "HiveClientWrapper got thrift exception, destroying client and retrying " + - s"(${retryLimit - numTries} tries remaining)", e) - clientLoader.cachedHive = null - Thread.sleep(retryDelayMillis) - } - } while (numTries <= retryLimit && System.nanoTime < deadline) - if (System.nanoTime > deadline) { - logWarning("Deadline exceeded") - } - throw caughtException - } - - private def causedByThrift(e: Throwable): Boolean = { - var target = e - while (target != null) { - val msg = target.getMessage() - if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) { - return true - } - target = target.getCause() - } - false - } - - def client: Hive = { - if (clientLoader.cachedHive != null) { - clientLoader.cachedHive.asInstanceOf[Hive] - } else { - val c = Hive.get(conf) - clientLoader.cachedHive = c - c - } - } - - /** - * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive. - */ - def withHiveState[A](f: => A): A = retryLocked { - val original = Thread.currentThread().getContextClassLoader - // Set the thread local metastore client to the client associated with this ClientWrapper. - Hive.set(client) - // The classloader in clientLoader could be changed after addJar, always use the latest - // classloader - state.getConf.setClassLoader(clientLoader.classLoader) - // setCurrentSessionState will use the classLoader associated - // with the HiveConf in `state` to override the context class loader of the current - // thread. 
- shim.setCurrentSessionState(state) - val ret = try f finally { - Thread.currentThread().setContextClassLoader(original) - } - ret - } - - def setOut(stream: PrintStream): Unit = withHiveState { - state.out = stream - } - - def setInfo(stream: PrintStream): Unit = withHiveState { - state.info = stream - } - - def setError(stream: PrintStream): Unit = withHiveState { - state.err = stream - } - - override def currentDatabase: String = withHiveState { - state.getCurrentDatabase - } - - override def setCurrentDatabase(databaseName: String): Unit = withHiveState { - if (getDatabaseOption(databaseName).isDefined) { - state.setCurrentDatabase(databaseName) - } else { - throw new NoSuchDatabaseException - } - } - - override def createDatabase(database: HiveDatabase): Unit = withHiveState { - client.createDatabase( - new Database( - database.name, - "", - new File(database.location).toURI.toString, - new java.util.HashMap), - true) - } - - override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState { - Option(client.getDatabase(name)).map { d => - HiveDatabase( - name = d.getName, - location = d.getLocationUri) - } - } - - override def getTableOption( - dbName: String, - tableName: String): Option[HiveTable] = withHiveState { - - logDebug(s"Looking up $dbName.$tableName") - - val hiveTable = Option(client.getTable(dbName, tableName, false)) - val converted = hiveTable.map { h => - - HiveTable( - name = h.getTableName, - specifiedDatabase = Option(h.getDbName), - schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)), - partitionColumns = h.getPartCols.asScala.map(f => - HiveColumn(f.getName, f.getType, f.getComment)), - properties = h.getParameters.asScala.toMap, - serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap, - tableType = h.getTableType match { - case HTableType.MANAGED_TABLE => ManagedTable - case HTableType.EXTERNAL_TABLE => ExternalTable - case HTableType.VIRTUAL_VIEW => VirtualView - case HTableType.INDEX_TABLE => IndexTable - }, - location = shim.getDataLocation(h), - inputFormat = Option(h.getInputFormatClass).map(_.getName), - outputFormat = Option(h.getOutputFormatClass).map(_.getName), - serde = Option(h.getSerializationLib), - viewText = Option(h.getViewExpandedText)).withClient(this) - } - converted - } - - private def toInputFormat(name: String) = - Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] - - private def toOutputFormat(name: String) = - Utils.classForName(name) - .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - - private def toQlTable(table: HiveTable): metadata.Table = { - val qlTable = new metadata.Table(table.database, table.name) - - qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - qlTable.setPartCols( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) } - table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) } - - // set owner - qlTable.setOwner(conf.getUser) - // set create time - qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - - table.location.foreach { loc => shim.setDataLocation(qlTable, loc) } - table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass) - table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass) - table.serde.foreach(qlTable.setSerializationLib) - - 
qlTable - } - - private def toViewTable(view: HiveTable): metadata.Table = { - // TODO: this is duplicated with `toQlTable` except the table type stuff. - val tbl = new metadata.Table(view.database, view.name) - tbl.setTableType(HTableType.VIRTUAL_VIEW) - tbl.setSerializationLib(null) - tbl.clearSerDeInfo() - - // TODO: we will save the same SQL string to original and expanded text, which is different - // from Hive. - tbl.setViewOriginalText(view.viewText.get) - tbl.setViewExpandedText(view.viewText.get) - - tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - view.properties.foreach { case (k, v) => tbl.setProperty(k, v) } - - // set owner - tbl.setOwner(conf.getUser) - // set create time - tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - - tbl - } - - override def createView(view: HiveTable): Unit = withHiveState { - client.createTable(toViewTable(view)) - } - - override def alertView(view: HiveTable): Unit = withHiveState { - client.alterTable(view.qualifiedName, toViewTable(view)) - } - - override def createTable(table: HiveTable): Unit = withHiveState { - val qlTable = toQlTable(table) - client.createTable(qlTable) - } - - override def alterTable(table: HiveTable): Unit = withHiveState { - val qlTable = toQlTable(table) - client.alterTable(table.qualifiedName, qlTable) - } - - private def toHivePartition(partition: metadata.Partition): HivePartition = { - val apiPartition = partition.getTPartition - HivePartition( - values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty), - storage = HiveStorageDescriptor( - location = apiPartition.getSd.getLocation, - inputFormat = apiPartition.getSd.getInputFormat, - outputFormat = apiPartition.getSd.getOutputFormat, - serde = apiPartition.getSd.getSerdeInfo.getSerializationLib, - serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) - } - - override def getPartitionOption( - table: HiveTable, - partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState { - - val qlTable = toQlTable(table) - val qlPartition = client.getPartition(qlTable, partitionSpec, false) - Option(qlPartition).map(toHivePartition) - } - - override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState { - val qlTable = toQlTable(hTable) - shim.getAllPartitions(client, qlTable).map(toHivePartition) - } - - override def getPartitionsByFilter( - hTable: HiveTable, - predicates: Seq[Expression]): Seq[HivePartition] = withHiveState { - val qlTable = toQlTable(hTable) - shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition) - } - - override def listTables(dbName: String): Seq[String] = withHiveState { - client.getAllTables(dbName).asScala - } - - /** - * Runs the specified SQL query using Hive. - */ - override def runSqlHive(sql: String): Seq[String] = { - val maxResults = 100000 - val results = runHive(sql, maxResults) - // It is very confusing when you only get back some of the results... - if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED") - results - } - - /** - * Execute the command using Hive and return the results as a sequence. Each element - * in the sequence is one row. 
- */ - protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { - logDebug(s"Running hiveql '$cmd'") - if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") } - try { - val cmd_trimmed: String = cmd.trim() - val tokens: Array[String] = cmd_trimmed.split("\\s+") - // The remainder of the command. - val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - val proc = shim.getCommandProcessor(tokens(0), conf) - proc match { - case driver: Driver => - val response: CommandProcessorResponse = driver.run(cmd) - // Throw an exception if there is an error in query processing. - if (response.getResponseCode != 0) { - driver.close() - throw new QueryExecutionException(response.getErrorMessage) - } - driver.setMaxRows(maxRows) - - val results = shim.getDriverResults(driver) - driver.close() - results - - case _ => - if (state.out != null) { - // scalastyle:off println - state.out.println(tokens(0) + " " + cmd_1) - // scalastyle:on println - } - Seq(proc.run(cmd_1).getResponseCode.toString) - } - } catch { - case e: Exception => - logError( - s""" - |====================== - |HIVE FAILURE OUTPUT - |====================== - |${outputBuffer.toString} - |====================== - |END HIVE FAILURE OUTPUT - |====================== - """.stripMargin) - throw e - } - } - - def loadPartition( - loadPath: String, - tableName: String, - partSpec: java.util.LinkedHashMap[String, String], - replace: Boolean, - holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = withHiveState { - shim.loadPartition( - client, - new Path(loadPath), // TODO: Use URI - tableName, - partSpec, - replace, - holdDDLTime, - inheritTableSpecs, - isSkewedStoreAsSubdir) - } - - def loadTable( - loadPath: String, // TODO URI - tableName: String, - replace: Boolean, - holdDDLTime: Boolean): Unit = withHiveState { - shim.loadTable( - client, - new Path(loadPath), - tableName, - replace, - holdDDLTime) - } - - def loadDynamicPartitions( - loadPath: String, - tableName: String, - partSpec: java.util.LinkedHashMap[String, String], - replace: Boolean, - numDP: Int, - holdDDLTime: Boolean, - listBucketingEnabled: Boolean): Unit = withHiveState { - shim.loadDynamicPartitions( - client, - new Path(loadPath), - tableName, - partSpec, - replace, - numDP, - holdDDLTime, - listBucketingEnabled) - } - - def addJar(path: String): Unit = { - val uri = new Path(path).toUri - val jarURL = if (uri.getScheme == null) { - // `path` is a local file path without a URL scheme - new File(path).toURI.toURL - } else { - // `path` is a URL with a scheme - uri.toURL - } - clientLoader.addJar(jarURL) - runSqlHive(s"ADD JAR $path") - } - - def newSession(): ClientWrapper = { - clientLoader.createClient().asInstanceOf[ClientWrapper] - } - - def reset(): Unit = withHiveState { - client.getAllTables("default").asScala.foreach { t => - logDebug(s"Deleting table $t") - val table = client.getTable("default", t) - client.getIndexes("default", t, 255).asScala.foreach { index => - shim.dropIndex(client, "default", t, index.getIndexName) - } - if (!table.isIndexTable) { - client.dropTable("default", t) - } - } - client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db => - logDebug(s"Dropping Database: $db") - client.dropDatabase(db, true, false, true) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala ---------------------------------------------------------------------- 
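For orientation before the re-added files: the rename is purely mechanical, so code written against the trait changes only in its type names. A minimal sketch of a caller, assuming the sql/hive module is on the classpath — the object and method below are hypothetical, not part of this commit, and would have to live under org.apache.spark.sql.hive because the trait is private[hive]:

    package org.apache.spark.sql.hive

    import org.apache.spark.sql.hive.client.HiveClient // was: ClientInterface

    object HiveClientUsageSketch {
      // Summarizes a database using only trait methods that the rename leaves untouched.
      def summarize(client: HiveClient, db: String): String = {
        val tables = client.listTables(db) // same signature before and after the rename
        s"Database '$db' (current: ${client.currentDatabase}) has ${tables.size} table(s)"
      }
    }

The two new files below mirror the deleted ones essentially line for line; only the class, trait, and doc-link names and a few log/doc strings change.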
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala new file mode 100644 index 0000000..f681cc6 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import java.io.PrintStream +import java.util.{Map => JMap} +import javax.annotation.Nullable + +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} +import org.apache.spark.sql.catalyst.expressions.Expression + +private[hive] case class HiveDatabase(name: String, location: String) + +private[hive] abstract class TableType { val name: String } +private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" } +private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" } +private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" } +private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" } + +// TODO: Use this for Tables and Partitions +private[hive] case class HiveStorageDescriptor( + location: String, + inputFormat: String, + outputFormat: String, + serde: String, + serdeProperties: Map[String, String]) + +private[hive] case class HivePartition( + values: Seq[String], + storage: HiveStorageDescriptor) + +private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String) +private[hive] case class HiveTable( + specifiedDatabase: Option[String], + name: String, + schema: Seq[HiveColumn], + partitionColumns: Seq[HiveColumn], + properties: Map[String, String], + serdeProperties: Map[String, String], + tableType: TableType, + location: Option[String] = None, + inputFormat: Option[String] = None, + outputFormat: Option[String] = None, + serde: Option[String] = None, + viewText: Option[String] = None) { + + @transient + private[client] var client: HiveClient = _ + + private[client] def withClient(ci: HiveClient): this.type = { + client = ci + this + } + + def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved")) + + def isPartitioned: Boolean = partitionColumns.nonEmpty + + def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this) + + def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = + client.getPartitionsByFilter(this, predicates) + + // Hive does not support backticks when passing names to the client. + def qualifiedName: String = s"$database.$name" +} + +/** + * An externally visible interface to the Hive client. 
This interface is shared across both the + internal and external classloaders for a given version of Hive and thus must expose only + shared classes. + */ +private[hive] trait HiveClient { + + /** Returns the Hive Version of this client. */ + def version: HiveVersion + + /** Returns the configuration for the given key in the current session. */ + def getConf(key: String, defaultValue: String): String + + /** + * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will + * result in one string. + */ + def runSqlHive(sql: String): Seq[String] + + def setOut(stream: PrintStream): Unit + def setInfo(stream: PrintStream): Unit + def setError(stream: PrintStream): Unit + + /** Returns the names of all tables in the given database. */ + def listTables(dbName: String): Seq[String] + + /** Returns the name of the active database. */ + def currentDatabase: String + + /** Sets the name of the current database. */ + def setCurrentDatabase(databaseName: String): Unit + + /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */ + def getDatabase(name: String): HiveDatabase = { + getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException) + } + + /** Returns the metadata for a given database, or None if it doesn't exist. */ + def getDatabaseOption(name: String): Option[HiveDatabase] + + /** Returns the specified table, or throws [[NoSuchTableException]]. */ + def getTable(dbName: String, tableName: String): HiveTable = { + getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException) + } + + /** Returns the metadata for the specified table or None if it doesn't exist. */ + def getTableOption(dbName: String, tableName: String): Option[HiveTable] + + /** Creates a view with the given metadata. */ + def createView(view: HiveTable): Unit + + /** Updates the given view with new metadata. */ + def alertView(view: HiveTable): Unit + + /** Creates a table with the given metadata. */ + def createTable(table: HiveTable): Unit + + /** Updates the given table with new metadata. */ + def alterTable(table: HiveTable): Unit + + /** Creates a new database with the given name. */ + def createDatabase(database: HiveDatabase): Unit + + /** Returns the specified partition or None if it does not exist. */ + def getPartitionOption( + hTable: HiveTable, + partitionSpec: JMap[String, String]): Option[HivePartition] + + /** Returns all partitions for the given table. */ + def getAllPartitions(hTable: HiveTable): Seq[HivePartition] + + /** Returns partitions filtered by predicates for the given table. */ + def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition] + + /** Loads a static partition into an existing table. */ + def loadPartition( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit + + /** Loads data into an existing table. */ + def loadTable( + loadPath: String, // TODO URI + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit + + /** Loads new dynamic partitions into an existing table.
*/ + def loadDynamicPartitions( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit + + /** Adds a jar into the class loader. */ + def addJar(path: String): Unit + + /** Returns a [[HiveClient]] as a new session that will share the class loader and Hive client. */ + def newSession(): HiveClient + + /** Runs a function within Hive state (SessionState, HiveConf, Hive client and class loader). */ + def withHiveState[A](f: => A): A + + /** Used for testing only. Removes all metadata from this instance of Hive. */ + def reset(): Unit +} http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala new file mode 100644 index 0000000..cf1ff55 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import java.io.{File, PrintStream} +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.reflectiveCalls + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.metastore.{TableType => HTableType} +import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema} +import org.apache.hadoop.hive.ql.{metadata, Driver} +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.processors._ +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader} +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.util.{CircularBuffer, Utils} + +/** + * A class that wraps the HiveClient and converts its responses to externally visible classes. + * Note that this class is typically loaded with an internal classloader for each instantiation, + * allowing it to interact directly with a specific isolated version of Hive. Loading this class + * with the isolated classloader, however, will result in it only being visible as a [[HiveClient]], + * not a [[HiveClientImpl]].
+ * + * This class needs to interact with multiple versions of Hive, but will always be compiled with + * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility + * must use reflection after matching on `version`. + * + * @param version the version of hive, used when picking function calls that are not compatible. + * @param config a collection of configuration options that will be added to the hive conf before + * opening the hive client. + * @param initClassLoader the classloader used when creating the `state` field of + * this [[HiveClientImpl]]. + */ +private[hive] class HiveClientImpl( + override val version: HiveVersion, + config: Map[String, String], + initClassLoader: ClassLoader, + val clientLoader: IsolatedClientLoader) + extends HiveClient + with Logging { + + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. + private val outputBuffer = new CircularBuffer() + + private val shim = version match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } + + // Create an internal session state for this HiveClientImpl. + val state = { + val original = Thread.currentThread().getContextClassLoader + // Switch to the initClassLoader. + Thread.currentThread().setContextClassLoader(initClassLoader) + + // Set up kerberos credentials for UserGroupInformation.loginUser within + // the current class loader. + // Instead of using the spark conf of the current spark context, a new + // instance of SparkConf is needed for the original value of spark.yarn.keytab + // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the + // keytab configuration for the link name in the distributed cache. + val sparkConf = new SparkConf + if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) { + val principalName = sparkConf.get("spark.yarn.principal") + val keytabFileName = sparkConf.get("spark.yarn.keytab") + if (!new File(keytabFileName).exists()) { + throw new SparkException(s"Keytab file: ${keytabFileName}" + + " specified in spark.yarn.keytab does not exist") + } else { + logInfo("Attempting to login to Kerberos" + + s" using principal: ${principalName} and keytab: ${keytabFileName}") + UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName) + } + } + + val ret = try { + val initialConf = new HiveConf(classOf[SessionState]) + // HiveConf is a Hadoop Configuration, which has a field of classLoader and + // the initial value will be the current thread's context class loader + // (i.e. initClassLoader here). + // We call initialConf.setClassLoader(initClassLoader) here to make + // this action explicit. + initialConf.setClassLoader(initClassLoader) + config.foreach { case (k, v) => + if (k.toLowerCase.contains("password")) { + logDebug(s"Hive Config: $k=xxx") + } else { + logDebug(s"Hive Config: $k=$v") + } + initialConf.set(k, v) + } + val state = new SessionState(initialConf) + if (clientLoader.cachedHive != null) { + Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) + } + SessionState.start(state) + state.out = new PrintStream(outputBuffer, true, "UTF-8") + state.err = new PrintStream(outputBuffer, true, "UTF-8") + state + } finally { + Thread.currentThread().setContextClassLoader(original) + } + ret + } + + /** Returns the configuration for the current session.
*/ + def conf: HiveConf = SessionState.get().getConf + + override def getConf(key: String, defaultValue: String): String = { + conf.get(key, defaultValue) + } + + // We use hive's conf for compatibility. + private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES) + private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf) + + /** + * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable. + */ + private def retryLocked[A](f: => A): A = clientLoader.synchronized { + // Hive sometimes retries internally, so set a deadline to avoid compounding delays. + val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong + var numTries = 0 + var caughtException: Exception = null + do { + numTries += 1 + try { + return f + } catch { + case e: Exception if causedByThrift(e) => + caughtException = e + logWarning( + "HiveClient got thrift exception, destroying client and retrying " + + s"(${retryLimit - numTries} tries remaining)", e) + clientLoader.cachedHive = null + Thread.sleep(retryDelayMillis) + } + } while (numTries <= retryLimit && System.nanoTime < deadline) + if (System.nanoTime > deadline) { + logWarning("Deadline exceeded") + } + throw caughtException + } + + private def causedByThrift(e: Throwable): Boolean = { + var target = e + while (target != null) { + val msg = target.getMessage() + if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) { + return true + } + target = target.getCause() + } + false + } + + def client: Hive = { + if (clientLoader.cachedHive != null) { + clientLoader.cachedHive.asInstanceOf[Hive] + } else { + val c = Hive.get(conf) + clientLoader.cachedHive = c + c + } + } + + /** + * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive. + */ + def withHiveState[A](f: => A): A = retryLocked { + val original = Thread.currentThread().getContextClassLoader + // Set the thread local metastore client to the client associated with this HiveClientImpl. + Hive.set(client) + // The classloader in clientLoader could be changed after addJar, always use the latest + // classloader + state.getConf.setClassLoader(clientLoader.classLoader) + // setCurrentSessionState will use the classLoader associated + // with the HiveConf in `state` to override the context class loader of the current + // thread. 
+ shim.setCurrentSessionState(state) + val ret = try f finally { + Thread.currentThread().setContextClassLoader(original) + } + ret + } + + def setOut(stream: PrintStream): Unit = withHiveState { + state.out = stream + } + + def setInfo(stream: PrintStream): Unit = withHiveState { + state.info = stream + } + + def setError(stream: PrintStream): Unit = withHiveState { + state.err = stream + } + + override def currentDatabase: String = withHiveState { + state.getCurrentDatabase + } + + override def setCurrentDatabase(databaseName: String): Unit = withHiveState { + if (getDatabaseOption(databaseName).isDefined) { + state.setCurrentDatabase(databaseName) + } else { + throw new NoSuchDatabaseException + } + } + + override def createDatabase(database: HiveDatabase): Unit = withHiveState { + client.createDatabase( + new Database( + database.name, + "", + new File(database.location).toURI.toString, + new java.util.HashMap), + true) + } + + override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState { + Option(client.getDatabase(name)).map { d => + HiveDatabase( + name = d.getName, + location = d.getLocationUri) + } + } + + override def getTableOption( + dbName: String, + tableName: String): Option[HiveTable] = withHiveState { + + logDebug(s"Looking up $dbName.$tableName") + + val hiveTable = Option(client.getTable(dbName, tableName, false)) + val converted = hiveTable.map { h => + + HiveTable( + name = h.getTableName, + specifiedDatabase = Option(h.getDbName), + schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)), + partitionColumns = h.getPartCols.asScala.map(f => + HiveColumn(f.getName, f.getType, f.getComment)), + properties = h.getParameters.asScala.toMap, + serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap, + tableType = h.getTableType match { + case HTableType.MANAGED_TABLE => ManagedTable + case HTableType.EXTERNAL_TABLE => ExternalTable + case HTableType.VIRTUAL_VIEW => VirtualView + case HTableType.INDEX_TABLE => IndexTable + }, + location = shim.getDataLocation(h), + inputFormat = Option(h.getInputFormatClass).map(_.getName), + outputFormat = Option(h.getOutputFormatClass).map(_.getName), + serde = Option(h.getSerializationLib), + viewText = Option(h.getViewExpandedText)).withClient(this) + } + converted + } + + private def toInputFormat(name: String) = + Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] + + private def toOutputFormat(name: String) = + Utils.classForName(name) + .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] + + private def toQlTable(table: HiveTable): metadata.Table = { + val qlTable = new metadata.Table(table.database, table.name) + + qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) + qlTable.setPartCols( + table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) + table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) } + table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) } + + // set owner + qlTable.setOwner(conf.getUser) + // set create time + qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) + + table.location.foreach { loc => shim.setDataLocation(qlTable, loc) } + table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass) + table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass) + table.serde.foreach(qlTable.setSerializationLib) + + 
qlTable + } + + private def toViewTable(view: HiveTable): metadata.Table = { + // TODO: this is duplicated with `toQlTable` except the table type stuff. + val tbl = new metadata.Table(view.database, view.name) + tbl.setTableType(HTableType.VIRTUAL_VIEW) + tbl.setSerializationLib(null) + tbl.clearSerDeInfo() + + // TODO: we will save the same SQL string to original and expanded text, which is different + // from Hive. + tbl.setViewOriginalText(view.viewText.get) + tbl.setViewExpandedText(view.viewText.get) + + tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) + view.properties.foreach { case (k, v) => tbl.setProperty(k, v) } + + // set owner + tbl.setOwner(conf.getUser) + // set create time + tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) + + tbl + } + + override def createView(view: HiveTable): Unit = withHiveState { + client.createTable(toViewTable(view)) + } + + override def alertView(view: HiveTable): Unit = withHiveState { + client.alterTable(view.qualifiedName, toViewTable(view)) + } + + override def createTable(table: HiveTable): Unit = withHiveState { + val qlTable = toQlTable(table) + client.createTable(qlTable) + } + + override def alterTable(table: HiveTable): Unit = withHiveState { + val qlTable = toQlTable(table) + client.alterTable(table.qualifiedName, qlTable) + } + + private def toHivePartition(partition: metadata.Partition): HivePartition = { + val apiPartition = partition.getTPartition + HivePartition( + values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty), + storage = HiveStorageDescriptor( + location = apiPartition.getSd.getLocation, + inputFormat = apiPartition.getSd.getInputFormat, + outputFormat = apiPartition.getSd.getOutputFormat, + serde = apiPartition.getSd.getSerdeInfo.getSerializationLib, + serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + } + + override def getPartitionOption( + table: HiveTable, + partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState { + + val qlTable = toQlTable(table) + val qlPartition = client.getPartition(qlTable, partitionSpec, false) + Option(qlPartition).map(toHivePartition) + } + + override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState { + val qlTable = toQlTable(hTable) + shim.getAllPartitions(client, qlTable).map(toHivePartition) + } + + override def getPartitionsByFilter( + hTable: HiveTable, + predicates: Seq[Expression]): Seq[HivePartition] = withHiveState { + val qlTable = toQlTable(hTable) + shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition) + } + + override def listTables(dbName: String): Seq[String] = withHiveState { + client.getAllTables(dbName).asScala + } + + /** + * Runs the specified SQL query using Hive. + */ + override def runSqlHive(sql: String): Seq[String] = { + val maxResults = 100000 + val results = runHive(sql, maxResults) + // It is very confusing when you only get back some of the results... + if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED") + results + } + + /** + * Execute the command using Hive and return the results as a sequence. Each element + * in the sequence is one row. 
+ */ + protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { + logDebug(s"Running hiveql '$cmd'") + if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") } + try { + val cmd_trimmed: String = cmd.trim() + val tokens: Array[String] = cmd_trimmed.split("\\s+") + // The remainder of the command. + val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() + val proc = shim.getCommandProcessor(tokens(0), conf) + proc match { + case driver: Driver => + val response: CommandProcessorResponse = driver.run(cmd) + // Throw an exception if there is an error in query processing. + if (response.getResponseCode != 0) { + driver.close() + throw new QueryExecutionException(response.getErrorMessage) + } + driver.setMaxRows(maxRows) + + val results = shim.getDriverResults(driver) + driver.close() + results + + case _ => + if (state.out != null) { + // scalastyle:off println + state.out.println(tokens(0) + " " + cmd_1) + // scalastyle:on println + } + Seq(proc.run(cmd_1).getResponseCode.toString) + } + } catch { + case e: Exception => + logError( + s""" + |====================== + |HIVE FAILURE OUTPUT + |====================== + |${outputBuffer.toString} + |====================== + |END HIVE FAILURE OUTPUT + |====================== + """.stripMargin) + throw e + } + } + + def loadPartition( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit = withHiveState { + shim.loadPartition( + client, + new Path(loadPath), // TODO: Use URI + tableName, + partSpec, + replace, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) + } + + def loadTable( + loadPath: String, // TODO URI + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit = withHiveState { + shim.loadTable( + client, + new Path(loadPath), + tableName, + replace, + holdDDLTime) + } + + def loadDynamicPartitions( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit = withHiveState { + shim.loadDynamicPartitions( + client, + new Path(loadPath), + tableName, + partSpec, + replace, + numDP, + holdDDLTime, + listBucketingEnabled) + } + + def addJar(path: String): Unit = { + val uri = new Path(path).toUri + val jarURL = if (uri.getScheme == null) { + // `path` is a local file path without a URL scheme + new File(path).toURI.toURL + } else { + // `path` is a URL with a scheme + uri.toURL + } + clientLoader.addJar(jarURL) + runSqlHive(s"ADD JAR $path") + } + + def newSession(): HiveClientImpl = { + clientLoader.createClient().asInstanceOf[HiveClientImpl] + } + + def reset(): Unit = withHiveState { + client.getAllTables("default").asScala.foreach { t => + logDebug(s"Deleting table $t") + val table = client.getTable("default", t) + client.getIndexes("default", t, 255).asScala.foreach { index => + shim.dropIndex(client, "default", t, index.getIndexName) + } + if (!table.isIndexTable) { + client.dropTable("default", t) + } + } + client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db => + logDebug(s"Dropping Database: $db") + client.dropDatabase(db, true, false, true) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---------------------------------------------------------------------- 
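One implementation detail worth noting before the shim diff: retryLocked above bounds each metastore call both by attempt count and by a wall-clock deadline, since Hive can retry internally and delays would otherwise compound (the real method also synchronizes on the client loader and only retries Thrift-level failures). A standalone sketch of that pattern — names are illustrative only, not Spark API:

    object RetryWithDeadline {
      // Retry `f` up to `limit` times, sleeping `delayMillis` between attempts,
      // but stop once an overall deadline passes: the callee may retry
      // internally, so the attempt count alone does not bound total time.
      def retry[A](limit: Int, delayMillis: Long)(f: => A): A = {
        val deadline = System.nanoTime + limit * delayMillis * 1000L * 1000L
        var tries = 0
        var last: Exception = null
        do {
          tries += 1
          try {
            return f
          } catch {
            case e: Exception =>
              last = e
              Thread.sleep(delayMillis)
          }
        } while (tries <= limit && System.nanoTime < deadline)
        throw last
      }
    }

For example, RetryWithDeadline.retry(3, 1000L) { flakyCall() } gives up after at most four attempts or roughly three seconds of delay, whichever comes first.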
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index ca636b0..70c10be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{IntegralType, StringType} /** - * A shim that defines the interface between ClientWrapper and the underlying Hive library used to - * talk to the metastore. Each Hive version has its own implementation of this class, defining + * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used + * to talk to the metastore. Each Hive version has its own implementation of this class, defining * version-specific versions of the needed functions. * * The guideline for writing shims is: @@ -52,7 +52,6 @@ private[client] sealed abstract class Shim { /** * Set the current SessionState to the given SessionState. Also, set the context classloader of * the current thread to the one set in the HiveConf of this given `state`. - * @param state */ def setCurrentSessionState(state: SessionState): Unit http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 010051d..dca7396 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -124,15 +124,15 @@ private[hive] object IsolatedClientLoader extends Logging { } /** - * Creates a Hive `ClientInterface` using a classloader that works according to the following rules: + * Creates a [[HiveClient]] using a classloader that works according to the following rules: * - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader` - * allowing the results of calls to the `ClientInterface` to be visible externally. + * allowing the results of calls to the [[HiveClient]] to be visible externally. * - Hive classes: new instances are loaded from `execJars`. These classes are not * accessible externally due to their custom loading. - * - ClientWrapper: a new copy is created for each instance of `IsolatedClassLoader`. + * - [[HiveClientImpl]]: a new copy is created for each instance of `IsolatedClassLoader`. * This new instance is able to see a specific version of hive without using reflection. Since * this is a unique instance, it is not visible externally other than as a generic - * `ClientInterface`, unless `isolationOn` is set to `false`. + * [[HiveClient]], unless `isolationOn` is set to `false`. * * @param version The version of hive on the classpath, used to pick specific function signatures * that are not compatible across versions. @@ -179,7 +179,7 @@ private[hive] class IsolatedClientLoader( /** True if `name` refers to a spark class that must see a specific version of Hive.
*/ protected def isBarrierClass(name: String): Boolean = - name.startsWith(classOf[ClientWrapper].getName) || + name.startsWith(classOf[HiveClientImpl].getName) || name.startsWith(classOf[Shim].getName) || barrierPrefixes.exists(name.startsWith) @@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader( } /** The isolated client interface to Hive. */ - private[hive] def createClient(): ClientInterface = { + private[hive] def createClient(): HiveClient = { if (!isolationOn) { - return new ClientWrapper(version, config, baseClassLoader, this) + return new HiveClientImpl(version, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") @@ -244,10 +244,10 @@ private[hive] class IsolatedClientLoader( try { classLoader - .loadClass(classOf[ClientWrapper].getName) + .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head .newInstance(version, config, classLoader, this) - .asInstanceOf[ClientInterface] + .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException => if (e.getCause().isInstanceOf[NoClassDefFoundError]) { http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 56cab1a..d5ed838 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -38,13 +38,13 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.util.sequenceOption import org.apache.spark.sql.hive.HiveShim._ -import org.apache.spark.sql.hive.client.ClientWrapper +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.types._ private[hive] class HiveFunctionRegistry( underlying: analysis.FunctionRegistry, - executionHive: ClientWrapper) + executionHive: HiveClientImpl) extends analysis.FunctionRegistry with HiveInspectors { def getFunctionInfo(name: String): FunctionInfo = { http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a33223a..246108e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.CacheTableCommand import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.client.ClientWrapper +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -458,7 +458,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive) } -private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, 
client: ClientWrapper) +private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl) extends HiveFunctionRegistry(fr, client) { private val removedFunctions = http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index ff10a25..1344a2c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.Utils /** - * A simple set of tests that call the methods of a hive ClientInterface, loading different version + * A simple set of tests that call the methods of a [[HiveClient]], loading different versions * of hive from maven central. These tests are simple in that they are mostly just testing to make * sure that reflective calls are not throwing NoSuchMethod errors, but the actual functionality * is not fully tested. @@ -101,7 +101,7 @@ class VersionsSuite extends SparkFunSuite with Logging { private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0") - private var client: ClientInterface = null + private var client: HiveClient = null versions.foreach { version => test(s"$version: create client") { http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 0d62d79..1ada2e3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -199,7 +199,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { "Extended Usage") checkExistence(sql("describe functioN abcadf"), true, - "Function: abcadf not found.") + "Function: abcadf not found.") checkExistence(sql("describe functioN `~`"), true, "Function: ~",
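
For readers of this diff, the densest part of the new HiveClientImpl.scala is the first-token dispatch inside `runHive`. The sketch below mirrors only the shape of that logic; `CommandProcessor`, `SqlDriver`, and `SetProcessor` here are invented stand-ins, not Hive or Spark APIs. The first whitespace-delimited token of the command selects a processor; a driver-style processor returns actual result rows (capped at `maxRows`), while any other processor runs the remainder of the command and reports only a response code.

// Minimal, self-contained sketch of the first-token dispatch in runHive.
// All types here are invented stand-ins, not Hive classes.
sealed trait CommandProcessor
case class SqlDriver(run: String => Seq[String]) extends CommandProcessor
case class SetProcessor(run: String => Int) extends CommandProcessor

object RunHiveSketch {
  // Choose a processor from the first token, analogous to what
  // shim.getCommandProcessor does for the real command processors.
  private def lookupProcessor(token: String): CommandProcessor =
    if (token.equalsIgnoreCase("set")) SetProcessor(_ => 0)
    else SqlDriver(sql => Seq(s"row for: $sql"))

  def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = {
    val trimmed = cmd.trim
    val tokens = trimmed.split("\\s+")
    val rest = trimmed.substring(tokens(0).length).trim // command minus its first token
    lookupProcessor(tokens(0)) match {
      case SqlDriver(run)    => run(trimmed).take(maxRows) // driver path: real result rows
      case SetProcessor(run) => Seq(run(rest).toString)    // other processors: response code only
    }
  }

  def main(args: Array[String]): Unit = {
    println(runHive("SELECT key FROM src"))                  // List(row for: SELECT key FROM src)
    println(runHive("set hive.exec.dynamic.partition=true")) // List(0)
  }
}

The real method additionally closes the Hive driver and, on failure, logs the captured output buffer before rethrowing; the sketch omits both.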

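Similarly, the classloader contract documented on `createClient` in IsolatedClientLoader.scala reduces to a three-way decision per class name, in which ordering matters: the barrier check has to win over the shared-prefix check, because HiveClientImpl itself sits under the shared `org.apache.spark.` prefix. Below is a rough, self-contained sketch of that decision under assumed prefix lists; the real lists and the matching logic live in IsolatedClientLoader and are only paraphrased here.

// Rough sketch of the three-way classloading decision; prefixes are illustrative.
object LoaderSketch {
  sealed trait LoadDecision
  case object Shared   extends LoadDecision // delegate to baseClassLoader
  case object Barrier  extends LoadDecision // fresh copy per isolated loader
  case object Isolated extends LoadDecision // load from the Hive execJars

  private val barrierPrefixes = Seq(
    "org.apache.spark.sql.hive.client.HiveClientImpl",
    "org.apache.spark.sql.hive.client.Shim")
  private val sharedPrefixes = Seq("java.", "scala.", "org.apache.spark.", "org.slf4j.")

  // Barrier is checked first: HiveClientImpl would otherwise match the
  // shared org.apache.spark. prefix and never be isolated.
  def decide(name: String): LoadDecision =
    if (barrierPrefixes.exists(name.startsWith)) Barrier
    else if (sharedPrefixes.exists(name.startsWith)) Shared
    else Isolated

  def main(args: Array[String]): Unit = {
    println(decide("org.apache.spark.sql.hive.client.HiveClientImpl")) // Barrier
    println(decide("org.apache.hadoop.hive.ql.metadata.Hive"))         // Isolated
    println(decide("scala.collection.Seq"))                            // Shared
  }
}

This ordering is also why the reflectively constructed instance is visible to callers only as the generic HiveClient type: the HiveClientImpl class object inside the isolated loader is distinct from any class the caller can name directly.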