This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 122e9f8a7465 [SPARK-45358][SQL] Remove shim classes for Hive prior 
2.0.0
122e9f8a7465 is described below

commit 122e9f8a7465b59a075a292adacf48b1029aaaaa
Author: Cheng Pan <[email protected]>
AuthorDate: Sat Sep 30 20:40:13 2023 +0900

    [SPARK-45358][SQL] Remove shim classes for Hive prior 2.0.0
    
    ### What changes were proposed in this pull request?
    
    This PR cleans up unused Hive shim classes for Hive versions prior to 2.0.0. 
It is a pure refactor, similar in spirit to a `git squash`.
    
    ### Why are the changes needed?
    
    SPARK-45328 removed support for Hive versions prior to 2.0.0. This PR is 
the next step of the code cleanup; see 
https://github.com/apache/spark/pull/43116#discussion_r1336612849
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA (GitHub Actions CI).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43151 from pan3793/SPARK-45358.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../apache/spark/sql/hive/client/HiveShim.scala    | 1052 ++++++--------------
 .../spark/sql/hive/client/FiltersSuite.scala       |    2 +-
 2 files changed, 327 insertions(+), 727 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 01945b3c6f73..90b39375a61e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.client
 
 import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
 import java.lang.reflect.{InvocationTargetException, Method, Modifier}
-import java.net.URI
 import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, 
Set => JSet}
 import java.util.concurrent.TimeUnit
 
@@ -236,7 +235,7 @@ private[client] sealed abstract class Shim {
 
   protected def findStaticMethod(klass: Class[_], name: String, args: 
Class[_]*): Method = {
     val method = findMethod(klass, name, args: _*)
-    require(Modifier.isStatic(method.getModifiers()),
+    require(Modifier.isStatic(method.getModifiers),
       s"Method $name of class $klass is not static.")
     method
   }
@@ -255,11 +254,17 @@ private[client] sealed abstract class Shim {
   }
 }
 
-private class Shim_v0_12 extends Shim with Logging {
-  // See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed
-  protected lazy val holdDDLTime = JBoolean.FALSE
+private[client] class Shim_v2_0 extends Shim with Logging {
   // deletes the underlying data along with metadata
   protected lazy val deleteDataInDropIndex = JBoolean.TRUE
+  // true if this is an ACID operation
+  protected lazy val isAcid = JBoolean.FALSE
+  // true if list bucketing enabled
+  protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
+  // throws an exception if the index does not exist
+  protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
+  // txnId can be 0 unless isAcid == true
+  protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
 
   protected lazy val getMSCMethod = {
     // Since getMSC() in Hive 0.12 is private, findMethod() could not work here
@@ -272,49 +277,45 @@ private class Shim_v0_12 extends Shim with Logging {
     getMSCMethod.invoke(hive).asInstanceOf[IMetaStoreClient]
   }
 
-  private lazy val startMethod =
+  private lazy val setCurrentSessionStateMethod =
     findStaticMethod(
       classOf[SessionState],
-      "start",
+      "setCurrentSessionState",
       classOf[SessionState])
   private lazy val getDataLocationMethod = findMethod(classOf[Table], 
"getDataLocation")
   private lazy val setDataLocationMethod =
     findMethod(
       classOf[Table],
       "setDataLocation",
-      classOf[URI])
+      classOf[Path])
   private lazy val getAllPartitionsMethod =
     findMethod(
       classOf[Hive],
-      "getAllPartitionsForPruner",
+      "getAllPartitionsOf",
       classOf[Table])
+  private lazy val getPartitionsByFilterMethod =
+    findMethod(
+      classOf[Hive],
+      "getPartitionsByFilter",
+      classOf[Table],
+      classOf[String])
   private lazy val getCommandProcessorMethod =
     findStaticMethod(
       classOf[CommandProcessorFactory],
       "get",
-      classOf[String],
+      classOf[Array[String]],
       classOf[HiveConf])
   private lazy val getDriverResultsMethod =
     findMethod(
       classOf[Driver],
       "getResults",
-      classOf[JArrayList[String]])
-  private lazy val createPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "createPartition",
-      classOf[Table],
-      classOf[JMap[String, String]],
-      classOf[Path],
-      classOf[JMap[String, String]],
-      classOf[String],
-      classOf[String],
-      JInteger.TYPE,
-      classOf[JList[Object]],
-      classOf[String],
-      classOf[JMap[String, String]],
-      classOf[JList[Object]],
       classOf[JList[Object]])
+  private lazy val getTimeVarMethod =
+    findMethod(
+      classOf[HiveConf],
+      "getTimeVar",
+      classOf[HiveConf.ConfVars],
+      classOf[TimeUnit])
   private lazy val loadPartitionMethod =
     findMethod(
       classOf[Hive],
@@ -325,6 +326,7 @@ private class Shim_v0_12 extends Shim with Logging {
       JBoolean.TYPE,
       JBoolean.TYPE,
       JBoolean.TYPE,
+      JBoolean.TYPE,
       JBoolean.TYPE)
   private lazy val loadTableMethod =
     findMethod(
@@ -333,6 +335,8 @@ private class Shim_v0_12 extends Shim with Logging {
       classOf[Path],
       classOf[String],
       JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
       JBoolean.TYPE)
   private lazy val loadDynamicPartitionsMethod =
     findMethod(
@@ -344,7 +348,8 @@ private class Shim_v0_12 extends Shim with Logging {
       JBoolean.TYPE,
       JInteger.TYPE,
       JBoolean.TYPE,
-      JBoolean.TYPE)
+      JBoolean.TYPE,
+      JLong.TYPE)
   private lazy val dropIndexMethod =
     findMethod(
       classOf[Hive],
@@ -352,6 +357,16 @@ private class Shim_v0_12 extends Shim with Logging {
       classOf[String],
       classOf[String],
       classOf[String],
+      JBoolean.TYPE,
+      JBoolean.TYPE)
+  private lazy val dropTableMethod =
+    findMethod(
+      classOf[Hive],
+      "dropTable",
+      classOf[String],
+      classOf[String],
+      JBoolean.TYPE,
+      JBoolean.TYPE,
       JBoolean.TYPE)
   private lazy val alterTableMethod =
     findMethod(
@@ -365,62 +380,53 @@ private class Shim_v0_12 extends Shim with Logging {
       "alterPartitions",
       classOf[String],
       classOf[JList[Partition]])
+  private lazy val dropOptionsClass =
+    Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
+  private lazy val dropOptionsDeleteData = 
dropOptionsClass.getField("deleteData")
+  private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
+  private lazy val dropPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "dropPartition",
+      classOf[String],
+      classOf[String],
+      classOf[JList[String]],
+      dropOptionsClass)
+  private lazy val getDatabaseOwnerNameMethod =
+    findMethod(
+      classOf[Database],
+      "getOwnerName")
+  private lazy val setDatabaseOwnerNameMethod =
+    findMethod(
+      classOf[Database],
+      "setOwnerName",
+      classOf[String])
 
-  override def setCurrentSessionState(state: SessionState): Unit = {
-    // Starting from Hive 0.13, setCurrentSessionState will internally override
-    // the context class loader of the current thread by the class loader set 
in
-    // the conf of the SessionState. So, for this Hive 0.12 shim, we add the 
same
-    // behavior and make shim.setCurrentSessionState of all Hive versions have 
the
-    // consistent behavior.
-    Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
-    startMethod.invoke(null, state)
-  }
+  override def setCurrentSessionState(state: SessionState): Unit =
+    setCurrentSessionStateMethod.invoke(null, state)
 
   override def getDataLocation(table: Table): Option[String] =
     Option(getDataLocationMethod.invoke(table)).map(_.toString())
 
   override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new URI(loc))
+    setDataLocationMethod.invoke(table, new Path(loc))
 
-  // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
   override def createPartitions(
       hive: Hive,
-      database: String,
+      dbName: String,
       tableName: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
-    recordHiveCall()
-    val table = hive.getTable(database, tableName)
-    parts.foreach { s =>
-      val location = s.storage.locationUri.map(
-        uri => new Path(table.getPath, new Path(uri))).orNull
-      val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
-      val spec = s.spec.asJava
-      recordHiveCall()
-      if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
-        // Ignore this partition since it already exists and ignoreIfExists == 
true
-      } else {
-        if (location == null && table.isView()) {
-          throw 
QueryExecutionErrors.illegalLocationClauseForViewPartitionError()
-        }
-
-        recordHiveCall()
-        createPartitionMethod.invoke(
-          hive,
-          table,
-          spec,
-          location,
-          params, // partParams
-          null, // inputFormat
-          null, // outputFormat
-          -1: JInteger, // numBuckets
-          null, // cols
-          null, // serializationLib
-          null, // serdeParams
-          null, // bucketCols
-          null) // sortCols
+    val addPartitionDesc = new AddPartitionDesc(dbName, tableName, 
ignoreIfExists)
+    parts.zipWithIndex.foreach { case (s, i) =>
+      addPartitionDesc.addPartition(
+        s.spec.asJava, 
s.storage.locationUri.map(CatalogUtils.URIToString).orNull)
+      if (s.parameters.nonEmpty) {
+        addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }
     }
+    recordHiveCall()
+    hive.createPartitions(addPartitionDesc)
   }
 
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = {
@@ -433,24 +439,129 @@ private class Shim_v0_12 extends Shim with Logging {
       table: Table,
       predicates: Seq[Expression],
       catalogTable: CatalogTable): Seq[Partition] = {
-    // getPartitionsByFilter() doesn't support binary comparison ops in Hive 
0.12.
-    // See HIVE-4888.
-    logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
-      "Please use Hive 0.13 or higher.")
-    getAllPartitions(hive, table)
+    // Hive getPartitionsByFilter() takes a string that represents partition
+    // predicates like "str_key=\"value\" and int_key=1 ..."
+    val filter = convertFilters(table, predicates)
+
+    val partitions =
+      if (filter.isEmpty) {
+        prunePartitionsFastFallback(hive, table, catalogTable, predicates)
+      } else {
+        logDebug(s"Hive metastore filter is '$filter'.")
+        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+        val shouldFallback = 
SQLConf.get.metastorePartitionPruningFallbackOnException
+        try {
+          // Hive may throw an exception when calling this method in some 
circumstances, such as
+          // when filtering on a non-string partition column when the hive 
config key
+          // hive.metastore.try.direct.sql is false. In some cases the remote 
metastore will throw
+          // exceptions even if the config is true, due to various reasons 
including the
+          // underlying RDBMS, Hive bugs when generating the filter, etc.
+          //
+          // Because of the above we'll fallback to use 
`Hive.getAllPartitionsOf` when the exception
+          // occurs and the 
config`spark.sql.hive.metastorePartitionPruningFallbackOnException` is
+          // enabled.
+          recordHiveCall()
+          getPartitionsByFilterMethod.invoke(hive, table, filter)
+            .asInstanceOf[JArrayList[Partition]]
+        } catch {
+          case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
+            shouldFallback =>
+            logWarning("Caught Hive MetaException attempting to get partition 
metadata by " +
+              "filter from Hive. Falling back to fetching all partition 
metadata, which will " +
+              "degrade performance. Modifying your Hive metastore 
configuration to set " +
+              s"${tryDirectSqlConfVar.varname} to true (if it is not true 
already) may resolve " +
+              "this problem. Or you can enable " +
+              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} 
" +
+              "to alleviate performance downgrade. " +
+              "Otherwise, to avoid degraded performance you can set " +
+              
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
+              " to false and let the query fail instead.", ex)
+            // HiveShim clients are expected to handle a superset of the 
requested partitions
+            prunePartitionsFastFallback(hive, table, catalogTable, predicates)
+          case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
+            throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
+        }
+      }
+
+    partitions.asScala.toSeq
+  }
+
+  private def prunePartitionsFastFallback(
+      hive: Hive,
+      table: Table,
+      catalogTable: CatalogTable,
+      predicates: Seq[Expression]): java.util.Collection[Partition] = {
+    val timeZoneId = SQLConf.get.sessionLocalTimeZone
+
+    // Because there is no way to know whether the partition properties has 
timeZone,
+    // client-side filtering cannot be used with TimeZoneAwareExpression.
+    def hasTimeZoneAwareExpression(e: Expression): Boolean = {
+      e.exists {
+        case cast: Cast => cast.needsTimeZone
+        case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
+        case _ => false
+      }
+    }
+
+    if (!SQLConf.get.metastorePartitionPruningFastFallback ||
+      predicates.isEmpty ||
+      predicates.exists(hasTimeZoneAwareExpression)) {
+      recordHiveCall()
+      getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+    } else {
+      try {
+        val partitionSchema = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+          catalogTable.partitionSchema)
+        val boundPredicate = 
ExternalCatalogUtils.generatePartitionPredicateByFilter(
+          catalogTable, partitionSchema, predicates)
+
+        def toRow(spec: TablePartitionSpec): InternalRow = {
+          InternalRow.fromSeq(partitionSchema.map { field =>
+            val partValue = if (spec(field.name) == 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+              null
+            } else {
+              spec(field.name)
+            }
+            Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
+          })
+        }
+
+        recordHiveCall()
+        val allPartitionNames = hive.getPartitionNames(
+          table.getDbName, table.getTableName, -1).asScala
+        val partNames = allPartitionNames.filter { p =>
+          val spec = PartitioningUtils.parsePathFragment(p)
+          boundPredicate.eval(toRow(spec))
+        }
+        recordHiveCall()
+        hive.getPartitionsByNames(table, partNames.asJava)
+      } catch {
+        case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
+          logWarning("Caught Hive MetaException attempting to get partition 
metadata by " +
+            "filter from client side. Falling back to fetching all partition 
metadata", ex)
+          recordHiveCall()
+          getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
+      }
+    }
   }
 
   override def getCommandProcessor(token: String, conf: HiveConf): 
CommandProcessor =
-    getCommandProcessorMethod.invoke(null, token, 
conf).asInstanceOf[CommandProcessor]
+    getCommandProcessorMethod.invoke(null, Array(token), 
conf).asInstanceOf[CommandProcessor]
 
   override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[String]()
+    val res = new JArrayList[Object]()
     getDriverResultsMethod.invoke(driver, res)
-    res.asScala.toSeq
+    res.asScala.map {
+      case s: String => s
+      case a: Array[Object] => a(0).asInstanceOf[String]
+    }.toSeq
   }
 
   override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long 
= {
-    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 
1000L
+    getTimeVarMethod.invoke(
+      conf,
+      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
+      TimeUnit.MILLISECONDS).asInstanceOf[Long]
   }
 
   override def getTablesByType(
@@ -472,7 +583,8 @@ private class Shim_v0_12 extends Shim with Logging {
       isSrcLocal: Boolean): Unit = {
     recordHiveCall()
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: 
JBoolean,
-      JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: 
JBoolean)
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, isAcid)
   }
 
   override def loadTable(
@@ -482,7 +594,8 @@ private class Shim_v0_12 extends Shim with Logging {
       replace: Boolean,
       isSrcLocal: Boolean): Unit = {
     recordHiveCall()
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, 
holdDDLTime)
+    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, 
isSrcLocal: JBoolean,
+      isSkewedStoreAsSubdir, isAcid)
   }
 
   override def loadDynamicPartitions(
@@ -495,12 +608,13 @@ private class Shim_v0_12 extends Shim with Logging {
       listBucketingEnabled: Boolean): Unit = {
     recordHiveCall()
     loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, 
replace: JBoolean,
-      numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean)
+      numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, 
txnIdInLoadDynamicPartitions)
   }
 
   override def dropIndex(hive: Hive, dbName: String, tableName: String, 
indexName: String): Unit = {
     recordHiveCall()
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, 
deleteDataInDropIndex)
+    dropIndexMethod.invoke(hive, dbName, tableName, indexName, 
throwExceptionInDropIndex,
+      deleteDataInDropIndex)
   }
 
   override def dropTable(
@@ -510,11 +624,9 @@ private class Shim_v0_12 extends Shim with Logging {
       deleteData: Boolean,
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = {
-    if (purge) {
-      throw QueryExecutionErrors.dropTableWithPurgeUnsupportedError()
-    }
     recordHiveCall()
-    hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
+    dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
+      ignoreIfNotExists: JBoolean, purge: JBoolean)
   }
 
   override def alterTable(hive: Hive, tableName: String, table: Table): Unit = 
{
@@ -534,297 +646,65 @@ private class Shim_v0_12 extends Shim with Logging {
       part: JList[String],
       deleteData: Boolean,
       purge: Boolean): Unit = {
-    if (purge) {
-      throw 
QueryExecutionErrors.alterTableWithDropPartitionAndPurgeUnsupportedError()
-    }
+    val dropOptions = 
dropOptionsClass.getConstructor().newInstance().asInstanceOf[Object]
+    dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
+    dropOptionsPurge.setBoolean(dropOptions, purge)
     recordHiveCall()
-    hive.dropPartition(dbName, tableName, part, deleteData)
+    dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
   }
 
-  override def createFunction(hive: Hive, db: String, func: CatalogFunction): 
Unit = {
-    throw QueryCompilationErrors.hiveCreatePermanentFunctionsUnsupportedError()
+  private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+    val resourceUris = f.resources.map { resource =>
+      new ResourceUri(ResourceType.valueOf(
+        resource.resourceType.resourceType.toUpperCase(Locale.ROOT)), 
resource.uri)
+    }
+    new HiveFunction(
+      f.identifier.funcName,
+      db,
+      f.className,
+      null,
+      PrincipalType.USER,
+      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
+      FunctionType.JAVA,
+      resourceUris.asJava)
   }
 
-  def dropFunction(hive: Hive, db: String, name: String): Unit = {
-    throw new NoSuchPermanentFunctionException(db, name)
+  override def createFunction(hive: Hive, db: String, func: CatalogFunction): 
Unit = {
+    recordHiveCall()
+    hive.createFunction(toHiveFunction(func, db))
   }
 
-  def renameFunction(hive: Hive, db: String, oldName: String, newName: 
String): Unit = {
-    throw new NoSuchPermanentFunctionException(db, oldName)
+  override def dropFunction(hive: Hive, db: String, name: String): Unit = {
+    recordHiveCall()
+    hive.dropFunction(db, name)
   }
 
-  def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
-    throw new NoSuchPermanentFunctionException(db, func.identifier.funcName)
+  override def renameFunction(hive: Hive, db: String, oldName: String, 
newName: String): Unit = {
+    val catalogFunc = getFunctionOption(hive, db, oldName)
+      .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
+      .copy(identifier = FunctionIdentifier(newName, Some(db)))
+    val hiveFunc = toHiveFunction(catalogFunc, db)
+    recordHiveCall()
+    hive.alterFunction(db, oldName, hiveFunc)
   }
 
-  def getFunctionOption(hive: Hive, db: String, name: String): 
Option[CatalogFunction] = {
-    None
+  override def alterFunction(hive: Hive, db: String, func: CatalogFunction): 
Unit = {
+    recordHiveCall()
+    hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
   }
 
-  def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
-    Seq.empty[String]
-  }
-
-  override def getDatabaseOwnerName(db: Database): String = ""
-
-  override def setDatabaseOwnerName(db: Database, owner: String): Unit = {}
-
-  override def createDatabase(hive: Hive, db: Database, ignoreIfExists: 
Boolean): Unit = {
-    recordHiveCall()
-    hive.createDatabase(db, ignoreIfExists)
-  }
-
-  override def dropDatabase(
-      hive: Hive,
-      dbName: String,
-      deleteData: Boolean,
-      ignoreUnknownDb: Boolean,
-      cascade: Boolean): Unit = {
-    recordHiveCall()
-    hive.dropDatabase(dbName, deleteData, ignoreUnknownDb, cascade)
-  }
-
-  override def alterDatabase(hive: Hive, dbName: String, d: Database): Unit = {
-    recordHiveCall()
-    hive.alterDatabase(dbName, d)
-  }
-
-  override def getDatabase(hive: Hive, dbName: String): Database = {
-    recordHiveCall()
-    hive.getDatabase(dbName)
-  }
-
-  override def getAllDatabases(hive: Hive): Seq[String] = {
-    recordHiveCall()
-    hive.getAllDatabases.asScala.toSeq
-  }
-
-  override def getDatabasesByPattern(hive: Hive, pattern: String): Seq[String] 
= {
-    recordHiveCall()
-    hive.getDatabasesByPattern(pattern).asScala.toSeq
-  }
-
-  override def databaseExists(hive: Hive, dbName: String): Boolean = {
-    recordHiveCall()
-    hive.databaseExists(dbName)
-  }
-
-  override def createTable(hive: Hive, table: Table, ifNotExists: Boolean): 
Unit = {
-    recordHiveCall()
-    hive.createTable(table, ifNotExists)
-  }
-
-  override def getTable(
-      hive: Hive,
-      dbName: String,
-      tableName: String,
-      throwException: Boolean): Table = {
-    recordHiveCall()
-    val table = hive.getTable(dbName, tableName, throwException)
-    if (table != null) {
-      table.getTTable.setTableName(tableName)
-      table.getTTable.setDbName(dbName)
-    }
-    table
-  }
-
-  override def getTablesByPattern(hive: Hive, dbName: String, pattern: 
String): Seq[String] = {
-    recordHiveCall()
-    hive.getTablesByPattern(dbName, pattern).asScala.toSeq
-  }
-
-  override def getAllTables(hive: Hive, dbName: String): Seq[String] = {
-    recordHiveCall()
-    hive.getAllTables(dbName).asScala.toSeq
-  }
-
-  override def dropTable(hive: Hive, dbName: String, tableName: String): Unit 
= {
-    recordHiveCall()
-    hive.dropTable(dbName, tableName)
-  }
-
-  override def getPartition(
-      hive: Hive,
-      table: Table,
-      partSpec: JMap[String, String],
-      forceCreate: Boolean): Partition = {
-    recordHiveCall()
-    hive.getPartition(table, partSpec, forceCreate)
-  }
-
-  override def getPartitions(
-      hive: Hive,
-      table: Table,
-      partSpec: JMap[String, String]): Seq[Partition] = {
-    recordHiveCall()
-    hive.getPartitions(table, partSpec).asScala.toSeq
-  }
-
-  override def getPartitionNames(
-      hive: Hive,
-      dbName: String,
-      tableName: String,
-      max: Short): Seq[String] = {
-    recordHiveCall()
-    hive.getPartitionNames(dbName, tableName, max).asScala.toSeq
-  }
-
-  override def getPartitionNames(
-      hive: Hive,
-      dbName: String,
-      tableName: String,
-      partSpec: JMap[String, String],
-      max: Short): Seq[String] = {
-    recordHiveCall()
-    hive.getPartitionNames(dbName, tableName, partSpec, max).asScala.toSeq
-  }
-
-  override def renamePartition(
-      hive: Hive,
-      table: Table,
-      oldPartSpec: JMap[String, String],
-      newPart: Partition): Unit = {
-    recordHiveCall()
-    hive.renamePartition(table, oldPartSpec, newPart)
-  }
-
-  override def getIndexes(
-      hive: Hive,
-      dbName: String,
-      tableName: String,
-      max: Short): Seq[Index] = {
-    recordHiveCall()
-    hive.getIndexes(dbName, tableName, max).asScala.toSeq
-  }
-}
-
-private class Shim_v0_13 extends Shim_v0_12 {
-
-  private lazy val setCurrentSessionStateMethod =
-    findStaticMethod(
-      classOf[SessionState],
-      "setCurrentSessionState",
-      classOf[SessionState])
-  private lazy val setDataLocationMethod =
-    findMethod(
-      classOf[Table],
-      "setDataLocation",
-      classOf[Path])
-  private lazy val getAllPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "getAllPartitionsOf",
-      classOf[Table])
-  private lazy val getPartitionsByFilterMethod =
-    findMethod(
-      classOf[Hive],
-      "getPartitionsByFilter",
-      classOf[Table],
-      classOf[String])
-  private lazy val getCommandProcessorMethod =
-    findStaticMethod(
-      classOf[CommandProcessorFactory],
-      "get",
-      classOf[Array[String]],
-      classOf[HiveConf])
-  private lazy val getDriverResultsMethod =
-    findMethod(
-      classOf[Driver],
-      "getResults",
-      classOf[JList[Object]])
-
-  private lazy val getDatabaseOwnerNameMethod =
-    findMethod(
-      classOf[Database],
-      "getOwnerName")
-
-  private lazy val setDatabaseOwnerNameMethod =
-    findMethod(
-      classOf[Database],
-      "setOwnerName",
-      classOf[String])
-
-  override def setCurrentSessionState(state: SessionState): Unit =
-    setCurrentSessionStateMethod.invoke(null, state)
-
-  override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new Path(loc))
-
-  override def createPartitions(
-      hive: Hive,
-      db: String,
-      table: String,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
-    parts.zipWithIndex.foreach { case (s, i) =>
-      addPartitionDesc.addPartition(
-        s.spec.asJava, 
s.storage.locationUri.map(CatalogUtils.URIToString(_)).orNull)
-      if (s.parameters.nonEmpty) {
-        addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
-      }
-    }
-    recordHiveCall()
-    hive.createPartitions(addPartitionDesc)
-  }
-
-  override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = {
-    recordHiveCall()
-    getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]].asScala.toSeq
-  }
-
-  private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
-    val resourceUris = f.resources.map { resource =>
-      new ResourceUri(ResourceType.valueOf(
-        resource.resourceType.resourceType.toUpperCase(Locale.ROOT)), 
resource.uri)
-    }
-    new HiveFunction(
-      f.identifier.funcName,
-      db,
-      f.className,
-      null,
-      PrincipalType.USER,
-      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
-      FunctionType.JAVA,
-      resourceUris.asJava)
-  }
-
-  override def createFunction(hive: Hive, db: String, func: CatalogFunction): 
Unit = {
-    recordHiveCall()
-    hive.createFunction(toHiveFunction(func, db))
-  }
-
-  override def dropFunction(hive: Hive, db: String, name: String): Unit = {
-    recordHiveCall()
-    hive.dropFunction(db, name)
-  }
-
-  override def renameFunction(hive: Hive, db: String, oldName: String, 
newName: String): Unit = {
-    val catalogFunc = getFunctionOption(hive, db, oldName)
-      .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
-      .copy(identifier = FunctionIdentifier(newName, Some(db)))
-    val hiveFunc = toHiveFunction(catalogFunc, db)
-    recordHiveCall()
-    hive.alterFunction(db, oldName, hiveFunc)
-  }
-
-  override def alterFunction(hive: Hive, db: String, func: CatalogFunction): 
Unit = {
-    recordHiveCall()
-    hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
-  }
-
-  private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
-    val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
-    val resources = hf.getResourceUris.asScala.map { uri =>
-      val resourceType = uri.getResourceType() match {
-        case ResourceType.ARCHIVE => "archive"
-        case ResourceType.FILE => "file"
-        case ResourceType.JAR => "jar"
-        case r => throw 
QueryCompilationErrors.unknownHiveResourceTypeError(r.toString)
-      }
-      FunctionResource(FunctionResourceType.fromString(resourceType), 
uri.getUri())
-    }
-    CatalogFunction(name, hf.getClassName, resources.toSeq)
+  private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+    val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
+    val resources = hf.getResourceUris.asScala.map { uri =>
+      val resourceType = uri.getResourceType match {
+        case ResourceType.ARCHIVE => "archive"
+        case ResourceType.FILE => "file"
+        case ResourceType.JAR => "jar"
+        case r => throw 
QueryCompilationErrors.unknownHiveResourceTypeError(r.toString)
+      }
+      FunctionResource(FunctionResourceType.fromString(resourceType), 
uri.getUri)
+    }
+    CatalogFunction(name, hf.getClassName, resources.toSeq)
   }
 
   override def getFunctionOption(hive: Hive, db: String, name: String): 
Option[CatalogFunction] = {
@@ -900,10 +780,10 @@ private class Shim_v0_13 extends Shim_v0_12 {
         // pushing down these predicates, then this optimization will become 
incorrect and need
         // to be changed.
         val extractables = exprs
-            .filter {
-              case Literal(null, _) => false
-              case _ => true
-            }.map(ExtractableLiteral.unapply)
+          .filter {
+            case Literal(null, _) => false
+            case _ => true
+          }.map(ExtractableLiteral.unapply)
         if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
           Some(extractables.map(_.get))
         } else {
@@ -1006,7 +886,7 @@ private class Shim_v0_13 extends Shim_v0_12 {
         Some(convertInToOr(name, values))
 
       case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
-        if useAdvanced =>
+          if useAdvanced =>
         Some(convertNotInToAnd(name, values))
 
       case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
@@ -1030,7 +910,7 @@ private class Shim_v0_13 extends Shim_v0_12 {
         Some(convertInToOr(name, values))
 
       case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
-        if useAdvanced =>
+          if useAdvanced =>
         Some(convertNotInToAnd(name, values))
 
       case op @ SpecialBinaryComparison(
@@ -1088,421 +968,141 @@ private class Shim_v0_13 extends Shim_v0_12 {
     }
   }
 
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression],
-      catalogTable: CatalogTable): Seq[Partition] = {
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = convertFilters(table, predicates)
+  override def getDatabaseOwnerName(db: Database): String = {
+    Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("")
+  }
 
-    val partitions =
-      if (filter.isEmpty) {
-        prunePartitionsFastFallback(hive, table, catalogTable, predicates)
-      } else {
-        logDebug(s"Hive metastore filter is '$filter'.")
-        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-        val shouldFallback = SQLConf.get.metastorePartitionPruningFallbackOnException
-        try {
-          // Hive may throw an exception when calling this method in some circumstances, such as
-          // when filtering on a non-string partition column when the hive config key
-          // hive.metastore.try.direct.sql is false. In some cases the remote metastore will throw
-          // exceptions even if the config is true, due to various reasons including the
-          // underlying RDBMS, Hive bugs when generating the filter, etc.
-          //
-          // Because of the above we'll fallback to use `Hive.getAllPartitionsOf` when the exception
-          // occurs and the config`spark.sql.hive.metastorePartitionPruningFallbackOnException` is
-          // enabled.
-          recordHiveCall()
-          getPartitionsByFilterMethod.invoke(hive, table, filter)
-            .asInstanceOf[JArrayList[Partition]]
-        } catch {
-          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
-              shouldFallback =>
-            logWarning("Caught Hive MetaException attempting to get partition metadata by " +
-              "filter from Hive. Falling back to fetching all partition metadata, which will " +
-              "degrade performance. Modifying your Hive metastore configuration to set " +
-              s"${tryDirectSqlConfVar.varname} to true (if it is not true already) may resolve " +
-              "this problem. Or you can enable " +
-              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} " +
-              "to alleviate performance downgrade. " +
-              "Otherwise, to avoid degraded performance you can set " +
-              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
-              " to false and let the query fail instead.", ex)
-            // HiveShim clients are expected to handle a superset of the requested partitions
-            prunePartitionsFastFallback(hive, table, catalogTable, predicates)
-          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
-            throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
-        }
-      }
+  override def setDatabaseOwnerName(db: Database, owner: String): Unit = {
+    setDatabaseOwnerNameMethod.invoke(db, owner)
+  }
 
-    partitions.asScala.toSeq
+  override def createDatabase(hive: Hive, db: Database, ignoreIfExists: Boolean): Unit = {
+    recordHiveCall()
+    hive.createDatabase(db, ignoreIfExists)
   }
 
-  private def prunePartitionsFastFallback(
+  override def dropDatabase(
       hive: Hive,
-      table: Table,
-      catalogTable: CatalogTable,
-      predicates: Seq[Expression]): java.util.Collection[Partition] = {
-    val timeZoneId = SQLConf.get.sessionLocalTimeZone
-
-    // Because there is no way to know whether the partition properties has timeZone,
-    // client-side filtering cannot be used with TimeZoneAwareExpression.
-    def hasTimeZoneAwareExpression(e: Expression): Boolean = {
-      e.exists {
-        case cast: Cast => cast.needsTimeZone
-        case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
-        case _ => false
-      }
-    }
-
-    if (!SQLConf.get.metastorePartitionPruningFastFallback ||
-        predicates.isEmpty ||
-        predicates.exists(hasTimeZoneAwareExpression)) {
-      recordHiveCall()
-      getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-    } else {
-      try {
-        val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
-          catalogTable.partitionSchema)
-        val boundPredicate = ExternalCatalogUtils.generatePartitionPredicateByFilter(
-          catalogTable, partitionSchema, predicates)
-
-        def toRow(spec: TablePartitionSpec): InternalRow = {
-          InternalRow.fromSeq(partitionSchema.map { field =>
-            val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
-              null
-            } else {
-              spec(field.name)
-            }
-            Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
-          })
-        }
-
-        recordHiveCall()
-        val allPartitionNames = hive.getPartitionNames(
-          table.getDbName, table.getTableName, -1).asScala
-        val partNames = allPartitionNames.filter { p =>
-          val spec = PartitioningUtils.parsePathFragment(p)
-          boundPredicate.eval(toRow(spec))
-        }
-        recordHiveCall()
-        hive.getPartitionsByNames(table, partNames.asJava)
-      } catch {
-        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
-          logWarning("Caught Hive MetaException attempting to get partition metadata by " +
-            "filter from client side. Falling back to fetching all partition metadata", ex)
-          recordHiveCall()
-          getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-      }
-    }
+      dbName: String,
+      deleteData: Boolean,
+      ignoreUnknownDb: Boolean,
+      cascade: Boolean): Unit = {
+    recordHiveCall()
+    hive.dropDatabase(dbName, deleteData, ignoreUnknownDb, cascade)
   }
 
-  override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
-    getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
-
-  override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[Object]()
-    getDriverResultsMethod.invoke(driver, res)
-    res.asScala.map { r =>
-      r match {
-        case s: String => s
-        case a: Array[Object] => a(0).asInstanceOf[String]
-      }
-    }.toSeq
+  override def alterDatabase(hive: Hive, dbName: String, d: Database): Unit = {
+    recordHiveCall()
+    hive.alterDatabase(dbName, d)
   }
 
-  override def getDatabaseOwnerName(db: Database): String = {
-    Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("")
+  override def getDatabase(hive: Hive, dbName: String): Database = {
+    recordHiveCall()
+    hive.getDatabase(dbName)
   }
 
-  override def setDatabaseOwnerName(db: Database, owner: String): Unit = {
-    setDatabaseOwnerNameMethod.invoke(db, owner)
+  override def getAllDatabases(hive: Hive): Seq[String] = {
+    recordHiveCall()
+    hive.getAllDatabases.asScala.toSeq
   }
-}
-
-private class Shim_v0_14 extends Shim_v0_13 {
-
-  // true if this is an ACID operation
-  protected lazy val isAcid = JBoolean.FALSE
-  // true if list bucketing enabled
-  protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
-
-  private lazy val loadPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "loadPartition",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadTableMethod =
-    findMethod(
-      classOf[Hive],
-      "loadTable",
-      classOf[Path],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val dropTableMethod =
-    findMethod(
-      classOf[Hive],
-      "dropTable",
-      classOf[String],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val getTimeVarMethod =
-    findMethod(
-      classOf[HiveConf],
-      "getTimeVar",
-      classOf[HiveConf.ConfVars],
-      classOf[TimeUnit])
 
-  override def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean,
-      isSrcLocal: Boolean): Unit = {
+  override def getDatabasesByPattern(hive: Hive, pattern: String): Seq[String] = {
     recordHiveCall()
-    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
-      isSrcLocal: JBoolean, isAcid)
+    hive.getDatabasesByPattern(pattern).asScala.toSeq
   }
 
-  override def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      isSrcLocal: Boolean): Unit = {
+  override def databaseExists(hive: Hive, dbName: String): Boolean = {
     recordHiveCall()
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime,
-      isSrcLocal: JBoolean, isSkewedStoreAsSubdir, isAcid)
+    hive.databaseExists(dbName)
   }
 
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      listBucketingEnabled: Boolean): Unit = {
+  override def createTable(hive: Hive, table: Table, ifNotExists: Boolean): Unit = {
     recordHiveCall()
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid)
+    hive.createTable(table, ifNotExists)
   }
 
-  override def dropTable(
+  override def getTable(
       hive: Hive,
       dbName: String,
       tableName: String,
-      deleteData: Boolean,
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = {
+      throwException: Boolean): Table = {
     recordHiveCall()
-    dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
-      ignoreIfNotExists: JBoolean, purge: JBoolean)
+    val table = hive.getTable(dbName, tableName, throwException)
+    if (table != null) {
+      table.getTTable.setTableName(tableName)
+      table.getTTable.setDbName(dbName)
+    }
+    table
   }
 
-  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    getTimeVarMethod.invoke(
-      conf,
-      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
-      TimeUnit.MILLISECONDS).asInstanceOf[Long]
+  override def getTablesByPattern(hive: Hive, dbName: String, pattern: String): Seq[String] = {
+    recordHiveCall()
+    hive.getTablesByPattern(dbName, pattern).asScala.toSeq
   }
 
-}
-
-private class Shim_v1_0 extends Shim_v0_14
-
-private class Shim_v1_1 extends Shim_v1_0 {
-
-  // throws an exception if the index does not exist
-  protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
-
-  private lazy val dropIndexMethod =
-    findMethod(
-      classOf[Hive],
-      "dropIndex",
-      classOf[String],
-      classOf[String],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-
-  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
+  override def getAllTables(hive: Hive, dbName: String): Seq[String] = {
     recordHiveCall()
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, throwExceptionInDropIndex,
-      deleteDataInDropIndex)
+    hive.getAllTables(dbName).asScala.toSeq
   }
 
-}
-
-private class Shim_v1_2 extends Shim_v1_1 {
-
-  // txnId can be 0 unless isAcid == true
-  protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
-
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JLong.TYPE)
-
-  private lazy val dropOptionsClass =
-      Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
-  private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
-  private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
-  private lazy val dropPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "dropPartition",
-      classOf[String],
-      classOf[String],
-      classOf[JList[String]],
-      dropOptionsClass)
+  override def dropTable(hive: Hive, dbName: String, tableName: String): Unit = {
+    recordHiveCall()
+    hive.dropTable(dbName, tableName)
+  }
 
-  override def loadDynamicPartitions(
+  override def getPartition(
       hive: Hive,
-      loadPath: Path,
-      tableName: String,
+      table: Table,
       partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      listBucketingEnabled: Boolean): Unit = {
+      forceCreate: Boolean): Partition = {
     recordHiveCall()
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid,
-      txnIdInLoadDynamicPartitions)
+    hive.getPartition(table, partSpec, forceCreate)
   }
 
-  override def dropPartition(
+  override def getPartitions(
+      hive: Hive,
+      table: Table,
+      partSpec: JMap[String, String]): Seq[Partition] = {
+    recordHiveCall()
+    hive.getPartitions(table, partSpec).asScala.toSeq
+  }
+
+  override def getPartitionNames(
       hive: Hive,
       dbName: String,
       tableName: String,
-      part: JList[String],
-      deleteData: Boolean,
-      purge: Boolean): Unit = {
-    val dropOptions = dropOptionsClass.getConstructor().newInstance().asInstanceOf[Object]
-    dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
-    dropOptionsPurge.setBoolean(dropOptions, purge)
+      max: Short): Seq[String] = {
     recordHiveCall()
-    dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
+    hive.getPartitionNames(dbName, tableName, max).asScala.toSeq
   }
 
-}
-
-private[client] class Shim_v2_0 extends Shim_v1_2 {
-  private lazy val loadPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "loadPartition",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadTableMethod =
-    findMethod(
-      classOf[Hive],
-      "loadTable",
-      classOf[Path],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JLong.TYPE)
-
-  override def loadPartition(
+  override def getPartitionNames(
       hive: Hive,
-      loadPath: Path,
+      dbName: String,
       tableName: String,
       partSpec: JMap[String, String],
-      replace: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean,
-      isSrcLocal: Boolean): Unit = {
+      max: Short): Seq[String] = {
     recordHiveCall()
-    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
-      isSrcLocal: JBoolean, isAcid)
+    hive.getPartitionNames(dbName, tableName, partSpec, max).asScala.toSeq
   }
 
-  override def loadTable(
+  override def renamePartition(
       hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      isSrcLocal: Boolean): Unit = {
+      table: Table,
+      oldPartSpec: JMap[String, String],
+      newPart: Partition): Unit = {
     recordHiveCall()
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
-      isSkewedStoreAsSubdir, isAcid)
+    hive.renamePartition(table, oldPartSpec, newPart)
   }
 
-  override def loadDynamicPartitions(
+  override def getIndexes(
       hive: Hive,
-      loadPath: Path,
+      dbName: String,
       tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      listBucketingEnabled: Boolean): Unit = {
+      max: Short): Seq[Index] = {
     recordHiveCall()
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
+    hive.getIndexes(dbName, tableName, max).asScala.toSeq
   }
-
 }
 
 private[client] class Shim_v2_1 extends Shim_v2_0 {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 627206c583e8..da4f193101a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * metastore
  */
 class FiltersSuite extends SparkFunSuite with PlanTest {
-  private val shim = new Shim_v0_13
+  private val shim = new Shim_v2_0
 
   private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
   private val varCharCol = new FieldSchema()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to