This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 122e9f8a7465 [SPARK-45358][SQL] Remove shim classes for Hive prior to 2.0.0
122e9f8a7465 is described below
commit 122e9f8a7465b59a075a292adacf48b1029aaaaa
Author: Cheng Pan <[email protected]>
AuthorDate: Sat Sep 30 20:40:13 2023 +0900
[SPARK-45358][SQL] Remove shim classes for Hive prior to 2.0.0
### What changes were proposed in this pull request?
This PR cleans up the Hive shim classes that became unused after dropping support for Hive versions prior to 2.0.0. It is a pure refactor that collapses the obsolete shim hierarchy, conceptually similar to a `git squash`.
### Why are the changes needed?
SPARK-45328 removed support for Hive versions prior to 2.0.0; this PR is the next step of that code cleanup. See
https://github.com/apache/spark/pull/43116#discussion_r1336612849
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43151 from pan3793/SPARK-45358.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/sql/hive/client/HiveShim.scala | 1052 ++++++--------------
.../spark/sql/hive/client/FiltersSuite.scala | 2 +-
2 files changed, 327 insertions(+), 727 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 01945b3c6f73..90b39375a61e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
-import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit
@@ -236,7 +235,7 @@ private[client] sealed abstract class Shim {
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = findMethod(klass, name, args: _*)
- require(Modifier.isStatic(method.getModifiers()),
+ require(Modifier.isStatic(method.getModifiers),
s"Method $name of class $klass is not static.")
method
}
@@ -255,11 +254,17 @@ private[client] sealed abstract class Shim {
}
}
-private class Shim_v0_12 extends Shim with Logging {
- // See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed
- protected lazy val holdDDLTime = JBoolean.FALSE
+private[client] class Shim_v2_0 extends Shim with Logging {
// deletes the underlying data along with metadata
protected lazy val deleteDataInDropIndex = JBoolean.TRUE
+ // true if this is an ACID operation
+ protected lazy val isAcid = JBoolean.FALSE
+ // true if list bucketing enabled
+ protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
+ // throws an exception if the index does not exist
+ protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
+ // txnId can be 0 unless isAcid == true
+ protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
protected lazy val getMSCMethod = {
// Since getMSC() in Hive 0.12 is private, findMethod() could not work here
@@ -272,49 +277,45 @@ private class Shim_v0_12 extends Shim with Logging {
getMSCMethod.invoke(hive).asInstanceOf[IMetaStoreClient]
}
- private lazy val startMethod =
+ private lazy val setCurrentSessionStateMethod =
findStaticMethod(
classOf[SessionState],
- "start",
+ "setCurrentSessionState",
classOf[SessionState])
private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation")
private lazy val setDataLocationMethod =
findMethod(
classOf[Table],
"setDataLocation",
- classOf[URI])
+ classOf[Path])
private lazy val getAllPartitionsMethod =
findMethod(
classOf[Hive],
- "getAllPartitionsForPruner",
+ "getAllPartitionsOf",
classOf[Table])
+ private lazy val getPartitionsByFilterMethod =
+ findMethod(
+ classOf[Hive],
+ "getPartitionsByFilter",
+ classOf[Table],
+ classOf[String])
private lazy val getCommandProcessorMethod =
findStaticMethod(
classOf[CommandProcessorFactory],
"get",
- classOf[String],
+ classOf[Array[String]],
classOf[HiveConf])
private lazy val getDriverResultsMethod =
findMethod(
classOf[Driver],
"getResults",
- classOf[JArrayList[String]])
- private lazy val createPartitionMethod =
- findMethod(
- classOf[Hive],
- "createPartition",
- classOf[Table],
- classOf[JMap[String, String]],
- classOf[Path],
- classOf[JMap[String, String]],
- classOf[String],
- classOf[String],
- JInteger.TYPE,
- classOf[JList[Object]],
- classOf[String],
- classOf[JMap[String, String]],
- classOf[JList[Object]],
classOf[JList[Object]])
+ private lazy val getTimeVarMethod =
+ findMethod(
+ classOf[HiveConf],
+ "getTimeVar",
+ classOf[HiveConf.ConfVars],
+ classOf[TimeUnit])
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
@@ -325,6 +326,7 @@ private class Shim_v0_12 extends Shim with Logging {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
+ JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
@@ -333,6 +335,8 @@ private class Shim_v0_12 extends Shim with Logging {
classOf[Path],
classOf[String],
JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
@@ -344,7 +348,8 @@ private class Shim_v0_12 extends Shim with Logging {
JBoolean.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
- JBoolean.TYPE)
+ JBoolean.TYPE,
+ JLong.TYPE)
private lazy val dropIndexMethod =
findMethod(
classOf[Hive],
@@ -352,6 +357,16 @@ private class Shim_v0_12 extends Shim with Logging {
classOf[String],
classOf[String],
classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val dropTableMethod =
+ findMethod(
+ classOf[Hive],
+ "dropTable",
+ classOf[String],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
JBoolean.TYPE)
private lazy val alterTableMethod =
findMethod(
@@ -365,62 +380,53 @@ private class Shim_v0_12 extends Shim with Logging {
"alterPartitions",
classOf[String],
classOf[JList[Partition]])
+ private lazy val dropOptionsClass =
+ Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
+ private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
+ private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
+ private lazy val dropPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "dropPartition",
+ classOf[String],
+ classOf[String],
+ classOf[JList[String]],
+ dropOptionsClass)
+ private lazy val getDatabaseOwnerNameMethod =
+ findMethod(
+ classOf[Database],
+ "getOwnerName")
+ private lazy val setDatabaseOwnerNameMethod =
+ findMethod(
+ classOf[Database],
+ "setOwnerName",
+ classOf[String])
- override def setCurrentSessionState(state: SessionState): Unit = {
- // Starting from Hive 0.13, setCurrentSessionState will internally override
- // the context class loader of the current thread by the class loader set in
- // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same
- // behavior and make shim.setCurrentSessionState of all Hive versions have the
- // consistent behavior.
- Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
- startMethod.invoke(null, state)
- }
+ override def setCurrentSessionState(state: SessionState): Unit =
+ setCurrentSessionStateMethod.invoke(null, state)
override def getDataLocation(table: Table): Option[String] =
Option(getDataLocationMethod.invoke(table)).map(_.toString())
override def setDataLocation(table: Table, loc: String): Unit =
- setDataLocationMethod.invoke(table, new URI(loc))
+ setDataLocationMethod.invoke(table, new Path(loc))
- // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
override def createPartitions(
hive: Hive,
- database: String,
+ dbName: String,
tableName: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
- recordHiveCall()
- val table = hive.getTable(database, tableName)
- parts.foreach { s =>
- val location = s.storage.locationUri.map(
- uri => new Path(table.getPath, new Path(uri))).orNull
- val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
- val spec = s.spec.asJava
- recordHiveCall()
- if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
- // Ignore this partition since it already exists and ignoreIfExists == true
- } else {
- if (location == null && table.isView()) {
- throw QueryExecutionErrors.illegalLocationClauseForViewPartitionError()
- }
-
- recordHiveCall()
- createPartitionMethod.invoke(
- hive,
- table,
- spec,
- location,
- params, // partParams
- null, // inputFormat
- null, // outputFormat
- -1: JInteger, // numBuckets
- null, // cols
- null, // serializationLib
- null, // serdeParams
- null, // bucketCols
- null) // sortCols
+ val addPartitionDesc = new AddPartitionDesc(dbName, tableName, ignoreIfExists)
+ parts.zipWithIndex.foreach { case (s, i) =>
+ addPartitionDesc.addPartition(
+ s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString).orNull)
+ if (s.parameters.nonEmpty) {
+ addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
}
}
+ recordHiveCall()
+ hive.createPartitions(addPartitionDesc)
}
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = {
@@ -433,24 +439,129 @@ private class Shim_v0_12 extends Shim with Logging {
table: Table,
predicates: Seq[Expression],
catalogTable: CatalogTable): Seq[Partition] = {
- // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
- // See HIVE-4888.
- logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
- "Please use Hive 0.13 or higher.")
- getAllPartitions(hive, table)
+ // Hive getPartitionsByFilter() takes a string that represents partition
+ // predicates like "str_key=\"value\" and int_key=1 ..."
+ val filter = convertFilters(table, predicates)
+
+ val partitions =
+ if (filter.isEmpty) {
+ prunePartitionsFastFallback(hive, table, catalogTable, predicates)
+ } else {
+ logDebug(s"Hive metastore filter is '$filter'.")
+ val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+ val shouldFallback = SQLConf.get.metastorePartitionPruningFallbackOnException
+ try {
+ // Hive may throw an exception when calling this method in some circumstances, such as
+ // when filtering on a non-string partition column when the hive config key
+ // hive.metastore.try.direct.sql is false. In some cases the remote metastore will throw
+ // exceptions even if the config is true, due to various reasons including the
+ // underlying RDBMS, Hive bugs when generating the filter, etc.
+ //
+ // Because of the above we'll fallback to use `Hive.getAllPartitionsOf` when the exception
+ // occurs and the config `spark.sql.hive.metastorePartitionPruningFallbackOnException` is
+ // enabled.
+ recordHiveCall()
+ getPartitionsByFilterMethod.invoke(hive, table, filter)
+ .asInstanceOf[JArrayList[Partition]]
+ } catch {
+ case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+ shouldFallback =>
+ logWarning("Caught Hive MetaException attempting to get partition
metadata by " +
+ "filter from Hive. Falling back to fetching all partition
metadata, which will " +
+ "degrade performance. Modifying your Hive metastore
configuration to set " +
+ s"${tryDirectSqlConfVar.varname} to true (if it is not true
already) may resolve " +
+ "this problem. Or you can enable " +
+ s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key}
" +
+ "to alleviate performance downgrade. " +
+ "Otherwise, to avoid degraded performance you can set " +
+
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
+ " to false and let the query fail instead.", ex)
+ // HiveShim clients are expected to handle a superset of the requested partitions
+ prunePartitionsFastFallback(hive, table, catalogTable, predicates)
+ case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
+ throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
+ }
+ }
+
+ partitions.asScala.toSeq
+ }
+
+ private def prunePartitionsFastFallback(
+ hive: Hive,
+ table: Table,
+ catalogTable: CatalogTable,
+ predicates: Seq[Expression]): java.util.Collection[Partition] = {
+ val timeZoneId = SQLConf.get.sessionLocalTimeZone
+
+ // Because there is no way to know whether the partition properties has timeZone,
+ // client-side filtering cannot be used with TimeZoneAwareExpression.
+ def hasTimeZoneAwareExpression(e: Expression): Boolean = {
+ e.exists {
+ case cast: Cast => cast.needsTimeZone
+ case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
+ case _ => false
+ }
+ }
+
+ if (!SQLConf.get.metastorePartitionPruningFastFallback ||
+ predicates.isEmpty ||
+ predicates.exists(hasTimeZoneAwareExpression)) {
+ recordHiveCall()
+ getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+ } else {
+ try {
+ val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+ catalogTable.partitionSchema)
+ val boundPredicate = ExternalCatalogUtils.generatePartitionPredicateByFilter(
+ catalogTable, partitionSchema, predicates)
+
+ def toRow(spec: TablePartitionSpec): InternalRow = {
+ InternalRow.fromSeq(partitionSchema.map { field =>
+ val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+ null
+ } else {
+ spec(field.name)
+ }
+ Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
+ })
+ }
+
+ recordHiveCall()
+ val allPartitionNames = hive.getPartitionNames(
+ table.getDbName, table.getTableName, -1).asScala
+ val partNames = allPartitionNames.filter { p =>
+ val spec = PartitioningUtils.parsePathFragment(p)
+ boundPredicate.eval(toRow(spec))
+ }
+ recordHiveCall()
+ hive.getPartitionsByNames(table, partNames.asJava)
+ } catch {
+ case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
+ logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+ "filter from client side. Falling back to fetching all partition metadata", ex)
+ recordHiveCall()
+ getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+ }
+ }
}
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
- getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
+ getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
override def getDriverResults(driver: Driver): Seq[String] = {
- val res = new JArrayList[String]()
+ val res = new JArrayList[Object]()
getDriverResultsMethod.invoke(driver, res)
- res.asScala.toSeq
+ res.asScala.map {
+ case s: String => s
+ case a: Array[Object] => a(0).asInstanceOf[String]
+ }.toSeq
}
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
- conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
+ getTimeVarMethod.invoke(
+ conf,
+ HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
+ TimeUnit.MILLISECONDS).asInstanceOf[Long]
}
override def getTablesByType(
@@ -472,7 +583,8 @@ private class Shim_v0_12 extends Shim with Logging {
isSrcLocal: Boolean): Unit = {
recordHiveCall()
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid)
}
override def loadTable(
@@ -482,7 +594,8 @@ private class Shim_v0_12 extends Shim with Logging {
replace: Boolean,
isSrcLocal: Boolean): Unit = {
recordHiveCall()
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime)
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir, isAcid)
}
override def loadDynamicPartitions(
@@ -495,12 +608,13 @@ private class Shim_v0_12 extends Shim with Logging {
listBucketingEnabled: Boolean): Unit = {
recordHiveCall()
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean)
+ numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
}
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
recordHiveCall()
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, deleteDataInDropIndex)
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, throwExceptionInDropIndex,
+ deleteDataInDropIndex)
}
override def dropTable(
@@ -510,11 +624,9 @@ private class Shim_v0_12 extends Shim with Logging {
deleteData: Boolean,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
- if (purge) {
- throw QueryExecutionErrors.dropTableWithPurgeUnsupportedError()
- }
recordHiveCall()
- hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
+ dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
+ ignoreIfNotExists: JBoolean, purge: JBoolean)
}
override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
@@ -534,297 +646,65 @@ private class Shim_v0_12 extends Shim with Logging {
part: JList[String],
deleteData: Boolean,
purge: Boolean): Unit = {
- if (purge) {
- throw QueryExecutionErrors.alterTableWithDropPartitionAndPurgeUnsupportedError()
- }
+ val dropOptions = dropOptionsClass.getConstructor().newInstance().asInstanceOf[Object]
+ dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
+ dropOptionsPurge.setBoolean(dropOptions, purge)
recordHiveCall()
- hive.dropPartition(dbName, tableName, part, deleteData)
+ dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
}
- override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
- throw QueryCompilationErrors.hiveCreatePermanentFunctionsUnsupportedError()
+ private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+ val resourceUris = f.resources.map { resource =>
+ new ResourceUri(ResourceType.valueOf(
+ resource.resourceType.resourceType.toUpperCase(Locale.ROOT)), resource.uri)
+ }
+ new HiveFunction(
+ f.identifier.funcName,
+ db,
+ f.className,
+ null,
+ PrincipalType.USER,
+ TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
+ FunctionType.JAVA,
+ resourceUris.asJava)
}
- def dropFunction(hive: Hive, db: String, name: String): Unit = {
- throw new NoSuchPermanentFunctionException(db, name)
+ override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+ recordHiveCall()
+ hive.createFunction(toHiveFunction(func, db))
}
- def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
- throw new NoSuchPermanentFunctionException(db, oldName)
+ override def dropFunction(hive: Hive, db: String, name: String): Unit = {
+ recordHiveCall()
+ hive.dropFunction(db, name)
}
- def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
- throw new NoSuchPermanentFunctionException(db, func.identifier.funcName)
+ override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
+ val catalogFunc = getFunctionOption(hive, db, oldName)
+ .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
+ .copy(identifier = FunctionIdentifier(newName, Some(db)))
+ val hiveFunc = toHiveFunction(catalogFunc, db)
+ recordHiveCall()
+ hive.alterFunction(db, oldName, hiveFunc)
}
- def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
- None
+ override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+ recordHiveCall()
+ hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
}
- def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
- Seq.empty[String]
- }
-
- override def getDatabaseOwnerName(db: Database): String = ""
-
- override def setDatabaseOwnerName(db: Database, owner: String): Unit = {}
-
- override def createDatabase(hive: Hive, db: Database, ignoreIfExists: Boolean): Unit = {
- recordHiveCall()
- hive.createDatabase(db, ignoreIfExists)
- }
-
- override def dropDatabase(
- hive: Hive,
- dbName: String,
- deleteData: Boolean,
- ignoreUnknownDb: Boolean,
- cascade: Boolean): Unit = {
- recordHiveCall()
- hive.dropDatabase(dbName, deleteData, ignoreUnknownDb, cascade)
- }
-
- override def alterDatabase(hive: Hive, dbName: String, d: Database): Unit = {
- recordHiveCall()
- hive.alterDatabase(dbName, d)
- }
-
- override def getDatabase(hive: Hive, dbName: String): Database = {
- recordHiveCall()
- hive.getDatabase(dbName)
- }
-
- override def getAllDatabases(hive: Hive): Seq[String] = {
- recordHiveCall()
- hive.getAllDatabases.asScala.toSeq
- }
-
- override def getDatabasesByPattern(hive: Hive, pattern: String): Seq[String] = {
- recordHiveCall()
- hive.getDatabasesByPattern(pattern).asScala.toSeq
- }
-
- override def databaseExists(hive: Hive, dbName: String): Boolean = {
- recordHiveCall()
- hive.databaseExists(dbName)
- }
-
- override def createTable(hive: Hive, table: Table, ifNotExists: Boolean): Unit = {
- recordHiveCall()
- hive.createTable(table, ifNotExists)
- }
-
- override def getTable(
- hive: Hive,
- dbName: String,
- tableName: String,
- throwException: Boolean): Table = {
- recordHiveCall()
- val table = hive.getTable(dbName, tableName, throwException)
- if (table != null) {
- table.getTTable.setTableName(tableName)
- table.getTTable.setDbName(dbName)
- }
- table
- }
-
- override def getTablesByPattern(hive: Hive, dbName: String, pattern: String): Seq[String] = {
- recordHiveCall()
- hive.getTablesByPattern(dbName, pattern).asScala.toSeq
- }
-
- override def getAllTables(hive: Hive, dbName: String): Seq[String] = {
- recordHiveCall()
- hive.getAllTables(dbName).asScala.toSeq
- }
-
- override def dropTable(hive: Hive, dbName: String, tableName: String): Unit = {
- recordHiveCall()
- hive.dropTable(dbName, tableName)
- }
-
- override def getPartition(
- hive: Hive,
- table: Table,
- partSpec: JMap[String, String],
- forceCreate: Boolean): Partition = {
- recordHiveCall()
- hive.getPartition(table, partSpec, forceCreate)
- }
-
- override def getPartitions(
- hive: Hive,
- table: Table,
- partSpec: JMap[String, String]): Seq[Partition] = {
- recordHiveCall()
- hive.getPartitions(table, partSpec).asScala.toSeq
- }
-
- override def getPartitionNames(
- hive: Hive,
- dbName: String,
- tableName: String,
- max: Short): Seq[String] = {
- recordHiveCall()
- hive.getPartitionNames(dbName, tableName, max).asScala.toSeq
- }
-
- override def getPartitionNames(
- hive: Hive,
- dbName: String,
- tableName: String,
- partSpec: JMap[String, String],
- max: Short): Seq[String] = {
- recordHiveCall()
- hive.getPartitionNames(dbName, tableName, partSpec, max).asScala.toSeq
- }
-
- override def renamePartition(
- hive: Hive,
- table: Table,
- oldPartSpec: JMap[String, String],
- newPart: Partition): Unit = {
- recordHiveCall()
- hive.renamePartition(table, oldPartSpec, newPart)
- }
-
- override def getIndexes(
- hive: Hive,
- dbName: String,
- tableName: String,
- max: Short): Seq[Index] = {
- recordHiveCall()
- hive.getIndexes(dbName, tableName, max).asScala.toSeq
- }
-}
-
-private class Shim_v0_13 extends Shim_v0_12 {
-
- private lazy val setCurrentSessionStateMethod =
- findStaticMethod(
- classOf[SessionState],
- "setCurrentSessionState",
- classOf[SessionState])
- private lazy val setDataLocationMethod =
- findMethod(
- classOf[Table],
- "setDataLocation",
- classOf[Path])
- private lazy val getAllPartitionsMethod =
- findMethod(
- classOf[Hive],
- "getAllPartitionsOf",
- classOf[Table])
- private lazy val getPartitionsByFilterMethod =
- findMethod(
- classOf[Hive],
- "getPartitionsByFilter",
- classOf[Table],
- classOf[String])
- private lazy val getCommandProcessorMethod =
- findStaticMethod(
- classOf[CommandProcessorFactory],
- "get",
- classOf[Array[String]],
- classOf[HiveConf])
- private lazy val getDriverResultsMethod =
- findMethod(
- classOf[Driver],
- "getResults",
- classOf[JList[Object]])
-
- private lazy val getDatabaseOwnerNameMethod =
- findMethod(
- classOf[Database],
- "getOwnerName")
-
- private lazy val setDatabaseOwnerNameMethod =
- findMethod(
- classOf[Database],
- "setOwnerName",
- classOf[String])
-
- override def setCurrentSessionState(state: SessionState): Unit =
- setCurrentSessionStateMethod.invoke(null, state)
-
- override def setDataLocation(table: Table, loc: String): Unit =
- setDataLocationMethod.invoke(table, new Path(loc))
-
- override def createPartitions(
- hive: Hive,
- db: String,
- table: String,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = {
- val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
- parts.zipWithIndex.foreach { case (s, i) =>
- addPartitionDesc.addPartition(
- s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString(_)).orNull)
- if (s.parameters.nonEmpty) {
- addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
- }
- }
- recordHiveCall()
- hive.createPartitions(addPartitionDesc)
- }
-
- override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = {
- recordHiveCall()
- getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
- }
-
- private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
- val resourceUris = f.resources.map { resource =>
- new ResourceUri(ResourceType.valueOf(
- resource.resourceType.resourceType.toUpperCase(Locale.ROOT)), resource.uri)
- }
- new HiveFunction(
- f.identifier.funcName,
- db,
- f.className,
- null,
- PrincipalType.USER,
- TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
- FunctionType.JAVA,
- resourceUris.asJava)
- }
-
- override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
- recordHiveCall()
- hive.createFunction(toHiveFunction(func, db))
- }
-
- override def dropFunction(hive: Hive, db: String, name: String): Unit = {
- recordHiveCall()
- hive.dropFunction(db, name)
- }
-
- override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
- val catalogFunc = getFunctionOption(hive, db, oldName)
- .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
- .copy(identifier = FunctionIdentifier(newName, Some(db)))
- val hiveFunc = toHiveFunction(catalogFunc, db)
- recordHiveCall()
- hive.alterFunction(db, oldName, hiveFunc)
- }
-
- override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
- recordHiveCall()
- hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
- }
-
- private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
- val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
- val resources = hf.getResourceUris.asScala.map { uri =>
- val resourceType = uri.getResourceType() match {
- case ResourceType.ARCHIVE => "archive"
- case ResourceType.FILE => "file"
- case ResourceType.JAR => "jar"
- case r => throw QueryCompilationErrors.unknownHiveResourceTypeError(r.toString)
- }
- FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
- }
- CatalogFunction(name, hf.getClassName, resources.toSeq)
+ private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+ val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
+ val resources = hf.getResourceUris.asScala.map { uri =>
+ val resourceType = uri.getResourceType match {
+ case ResourceType.ARCHIVE => "archive"
+ case ResourceType.FILE => "file"
+ case ResourceType.JAR => "jar"
+ case r => throw QueryCompilationErrors.unknownHiveResourceTypeError(r.toString)
+ }
+ FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri)
+ }
+ CatalogFunction(name, hf.getClassName, resources.toSeq)
}
override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
@@ -900,10 +780,10 @@ private class Shim_v0_13 extends Shim_v0_12 {
// pushing down these predicates, then this optimization will become incorrect and need
// to be changed.
val extractables = exprs
- .filter {
- case Literal(null, _) => false
- case _ => true
- }.map(ExtractableLiteral.unapply)
+ .filter {
+ case Literal(null, _) => false
+ case _ => true
+ }.map(ExtractableLiteral.unapply)
if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
Some(extractables.map(_.get))
} else {
@@ -1006,7 +886,7 @@ private class Shim_v0_13 extends Shim_v0_12 {
Some(convertInToOr(name, values))
case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
- if useAdvanced =>
+ if useAdvanced =>
Some(convertNotInToAnd(name, values))
case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
@@ -1030,7 +910,7 @@ private class Shim_v0_13 extends Shim_v0_12 {
Some(convertInToOr(name, values))
case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
- if useAdvanced =>
+ if useAdvanced =>
Some(convertNotInToAnd(name, values))
case op @ SpecialBinaryComparison(
@@ -1088,421 +968,141 @@ private class Shim_v0_13 extends Shim_v0_12 {
}
}
- override def getPartitionsByFilter(
- hive: Hive,
- table: Table,
- predicates: Seq[Expression],
- catalogTable: CatalogTable): Seq[Partition] = {
- // Hive getPartitionsByFilter() takes a string that represents partition
- // predicates like "str_key=\"value\" and int_key=1 ..."
- val filter = convertFilters(table, predicates)
+ override def getDatabaseOwnerName(db: Database): String = {
+ Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("")
+ }
- val partitions =
- if (filter.isEmpty) {
- prunePartitionsFastFallback(hive, table, catalogTable, predicates)
- } else {
- logDebug(s"Hive metastore filter is '$filter'.")
- val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
- val shouldFallback = SQLConf.get.metastorePartitionPruningFallbackOnException
- try {
- // Hive may throw an exception when calling this method in some circumstances, such as
- // when filtering on a non-string partition column when the hive config key
- // hive.metastore.try.direct.sql is false. In some cases the remote metastore will throw
- // exceptions even if the config is true, due to various reasons including the
- // underlying RDBMS, Hive bugs when generating the filter, etc.
- //
- // Because of the above we'll fallback to use `Hive.getAllPartitionsOf` when the exception
- // occurs and the config `spark.sql.hive.metastorePartitionPruningFallbackOnException` is
- // enabled.
- recordHiveCall()
- getPartitionsByFilterMethod.invoke(hive, table, filter)
- .asInstanceOf[JArrayList[Partition]]
- } catch {
- case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
- shouldFallback =>
- logWarning("Caught Hive MetaException attempting to get partition metadata by " +
- "filter from Hive. Falling back to fetching all partition metadata, which will " +
- "degrade performance. Modifying your Hive metastore configuration to set " +
- s"${tryDirectSqlConfVar.varname} to true (if it is not true already) may resolve " +
- "this problem. Or you can enable " +
- s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} " +
- "to alleviate performance downgrade. " +
- "Otherwise, to avoid degraded performance you can set " +
- s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
- " to false and let the query fail instead.", ex)
- // HiveShim clients are expected to handle a superset of the requested partitions
- prunePartitionsFastFallback(hive, table, catalogTable, predicates)
- case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
- throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
- }
- }
+ override def setDatabaseOwnerName(db: Database, owner: String): Unit = {
+ setDatabaseOwnerNameMethod.invoke(db, owner)
+ }
- partitions.asScala.toSeq
+ override def createDatabase(hive: Hive, db: Database, ignoreIfExists: Boolean): Unit = {
+ recordHiveCall()
+ hive.createDatabase(db, ignoreIfExists)
}
- private def prunePartitionsFastFallback(
+ override def dropDatabase(
hive: Hive,
- table: Table,
- catalogTable: CatalogTable,
- predicates: Seq[Expression]): java.util.Collection[Partition] = {
- val timeZoneId = SQLConf.get.sessionLocalTimeZone
-
- // Because there is no way to know whether the partition properties has timeZone,
- // client-side filtering cannot be used with TimeZoneAwareExpression.
- def hasTimeZoneAwareExpression(e: Expression): Boolean = {
- e.exists {
- case cast: Cast => cast.needsTimeZone
- case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
- case _ => false
- }
- }
-
- if (!SQLConf.get.metastorePartitionPruningFastFallback ||
- predicates.isEmpty ||
- predicates.exists(hasTimeZoneAwareExpression)) {
- recordHiveCall()
- getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
- } else {
- try {
- val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
- catalogTable.partitionSchema)
- val boundPredicate = ExternalCatalogUtils.generatePartitionPredicateByFilter(
- catalogTable, partitionSchema, predicates)
-
- def toRow(spec: TablePartitionSpec): InternalRow = {
- InternalRow.fromSeq(partitionSchema.map { field =>
- val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
- null
- } else {
- spec(field.name)
- }
- Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
- })
- }
-
- recordHiveCall()
- val allPartitionNames = hive.getPartitionNames(
- table.getDbName, table.getTableName, -1).asScala
- val partNames = allPartitionNames.filter { p =>
- val spec = PartitioningUtils.parsePathFragment(p)
- boundPredicate.eval(toRow(spec))
- }
- recordHiveCall()
- hive.getPartitionsByNames(table, partNames.asJava)
- } catch {
- case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
- logWarning("Caught Hive MetaException attempting to get partition metadata by " +
- "filter from client side. Falling back to fetching all partition metadata", ex)
- recordHiveCall()
- getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
- }
- }
+ dbName: String,
+ deleteData: Boolean,
+ ignoreUnknownDb: Boolean,
+ cascade: Boolean): Unit = {
+ recordHiveCall()
+ hive.dropDatabase(dbName, deleteData, ignoreUnknownDb, cascade)
}
- override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
- getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
-
- override def getDriverResults(driver: Driver): Seq[String] = {
- val res = new JArrayList[Object]()
- getDriverResultsMethod.invoke(driver, res)
- res.asScala.map { r =>
- r match {
- case s: String => s
- case a: Array[Object] => a(0).asInstanceOf[String]
- }
- }.toSeq
+ override def alterDatabase(hive: Hive, dbName: String, d: Database): Unit = {
+ recordHiveCall()
+ hive.alterDatabase(dbName, d)
}
- override def getDatabaseOwnerName(db: Database): String = {
- Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("")
+ override def getDatabase(hive: Hive, dbName: String): Database = {
+ recordHiveCall()
+ hive.getDatabase(dbName)
}
- override def setDatabaseOwnerName(db: Database, owner: String): Unit = {
- setDatabaseOwnerNameMethod.invoke(db, owner)
+ override def getAllDatabases(hive: Hive): Seq[String] = {
+ recordHiveCall()
+ hive.getAllDatabases.asScala.toSeq
}
-}
-
-private class Shim_v0_14 extends Shim_v0_13 {
-
- // true if this is an ACID operation
- protected lazy val isAcid = JBoolean.FALSE
- // true if list bucketing enabled
- protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
-
- private lazy val loadPartitionMethod =
- findMethod(
- classOf[Hive],
- "loadPartition",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val loadTableMethod =
- findMethod(
- classOf[Hive],
- "loadTable",
- classOf[Path],
- classOf[String],
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val loadDynamicPartitionsMethod =
- findMethod(
- classOf[Hive],
- "loadDynamicPartitions",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JInteger.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val dropTableMethod =
- findMethod(
- classOf[Hive],
- "dropTable",
- classOf[String],
- classOf[String],
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val getTimeVarMethod =
- findMethod(
- classOf[HiveConf],
- "getTimeVar",
- classOf[HiveConf.ConfVars],
- classOf[TimeUnit])
- override def loadPartition(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean,
- isSrcLocal: Boolean): Unit = {
+ override def getDatabasesByPattern(hive: Hive, pattern: String): Seq[String] = {
recordHiveCall()
- loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal: JBoolean, isAcid)
+ hive.getDatabasesByPattern(pattern).asScala.toSeq
}
- override def loadTable(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- replace: Boolean,
- isSrcLocal: Boolean): Unit = {
+ override def databaseExists(hive: Hive, dbName: String): Boolean = {
recordHiveCall()
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime,
- isSrcLocal: JBoolean, isSkewedStoreAsSubdir, isAcid)
+ hive.databaseExists(dbName)
}
- override def loadDynamicPartitions(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- numDP: Int,
- listBucketingEnabled: Boolean): Unit = {
+ override def createTable(hive: Hive, table: Table, ifNotExists: Boolean): Unit = {
recordHiveCall()
- loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid)
+ hive.createTable(table, ifNotExists)
}
- override def dropTable(
+ override def getTable(
hive: Hive,
dbName: String,
tableName: String,
- deleteData: Boolean,
- ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = {
+ throwException: Boolean): Table = {
recordHiveCall()
- dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
- ignoreIfNotExists: JBoolean, purge: JBoolean)
+ val table = hive.getTable(dbName, tableName, throwException)
+ if (table != null) {
+ table.getTTable.setTableName(tableName)
+ table.getTTable.setDbName(dbName)
+ }
+ table
}
- override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
- getTimeVarMethod.invoke(
- conf,
- HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
- TimeUnit.MILLISECONDS).asInstanceOf[Long]
+ override def getTablesByPattern(hive: Hive, dbName: String, pattern: String): Seq[String] = {
+ recordHiveCall()
+ hive.getTablesByPattern(dbName, pattern).asScala.toSeq
}
-}
-
-private class Shim_v1_0 extends Shim_v0_14
-
-private class Shim_v1_1 extends Shim_v1_0 {
-
- // throws an exception if the index does not exist
- protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
-
- private lazy val dropIndexMethod =
- findMethod(
- classOf[Hive],
- "dropIndex",
- classOf[String],
- classOf[String],
- classOf[String],
- JBoolean.TYPE,
- JBoolean.TYPE)
-
- override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
+ override def getAllTables(hive: Hive, dbName: String): Seq[String] = {
recordHiveCall()
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, throwExceptionInDropIndex,
- deleteDataInDropIndex)
+ hive.getAllTables(dbName).asScala.toSeq
}
-}
-
-private class Shim_v1_2 extends Shim_v1_1 {
-
- // txnId can be 0 unless isAcid == true
- protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
-
- private lazy val loadDynamicPartitionsMethod =
- findMethod(
- classOf[Hive],
- "loadDynamicPartitions",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JInteger.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JLong.TYPE)
-
- private lazy val dropOptionsClass =
- Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
- private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
- private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
- private lazy val dropPartitionMethod =
- findMethod(
- classOf[Hive],
- "dropPartition",
- classOf[String],
- classOf[String],
- classOf[JList[String]],
- dropOptionsClass)
+ override def dropTable(hive: Hive, dbName: String, tableName: String): Unit = {
+ recordHiveCall()
+ hive.dropTable(dbName, tableName)
+ }
- override def loadDynamicPartitions(
+ override def getPartition(
hive: Hive,
- loadPath: Path,
- tableName: String,
+ table: Table,
partSpec: JMap[String, String],
- replace: Boolean,
- numDP: Int,
- listBucketingEnabled: Boolean): Unit = {
+ forceCreate: Boolean): Partition = {
recordHiveCall()
- loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid,
- txnIdInLoadDynamicPartitions)
+ hive.getPartition(table, partSpec, forceCreate)
}
- override def dropPartition(
+ override def getPartitions(
+ hive: Hive,
+ table: Table,
+ partSpec: JMap[String, String]): Seq[Partition] = {
+ recordHiveCall()
+ hive.getPartitions(table, partSpec).asScala.toSeq
+ }
+
+ override def getPartitionNames(
hive: Hive,
dbName: String,
tableName: String,
- part: JList[String],
- deleteData: Boolean,
- purge: Boolean): Unit = {
- val dropOptions = dropOptionsClass.getConstructor().newInstance().asInstanceOf[Object]
- dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
- dropOptionsPurge.setBoolean(dropOptions, purge)
+ max: Short): Seq[String] = {
recordHiveCall()
- dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
+ hive.getPartitionNames(dbName, tableName, max).asScala.toSeq
}
-}
-
-private[client] class Shim_v2_0 extends Shim_v1_2 {
- private lazy val loadPartitionMethod =
- findMethod(
- classOf[Hive],
- "loadPartition",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val loadTableMethod =
- findMethod(
- classOf[Hive],
- "loadTable",
- classOf[Path],
- classOf[String],
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val loadDynamicPartitionsMethod =
- findMethod(
- classOf[Hive],
- "loadDynamicPartitions",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JInteger.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JLong.TYPE)
-
- override def loadPartition(
+ override def getPartitionNames(
hive: Hive,
- loadPath: Path,
+ dbName: String,
tableName: String,
partSpec: JMap[String, String],
- replace: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean,
- isSrcLocal: Boolean): Unit = {
+ max: Short): Seq[String] = {
recordHiveCall()
- loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal: JBoolean, isAcid)
+ hive.getPartitionNames(dbName, tableName, partSpec, max).asScala.toSeq
}
- override def loadTable(
+ override def renamePartition(
hive: Hive,
- loadPath: Path,
- tableName: String,
- replace: Boolean,
- isSrcLocal: Boolean): Unit = {
+ table: Table,
+ oldPartSpec: JMap[String, String],
+ newPart: Partition): Unit = {
recordHiveCall()
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
- isSkewedStoreAsSubdir, isAcid)
+ hive.renamePartition(table, oldPartSpec, newPart)
}
- override def loadDynamicPartitions(
+ override def getIndexes(
hive: Hive,
- loadPath: Path,
+ dbName: String,
tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- numDP: Int,
- listBucketingEnabled: Boolean): Unit = {
+ max: Short): Seq[Index] = {
recordHiveCall()
- loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
+ hive.getIndexes(dbName, tableName, max).asScala.toSeq
}
-
}
private[client] class Shim_v2_1 extends Shim_v2_0 {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 627206c583e8..da4f193101a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String
* metastore
*/
class FiltersSuite extends SparkFunSuite with PlanTest {
- private val shim = new Shim_v0_13
+ private val shim = new Shim_v2_0
private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
private val varCharCol = new FieldSchema()