This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3a48ea1 [SPARK-31184][SQL] Support getTablesByType API of Hive Client
3a48ea1 is described below
commit 3a48ea1fe0fb85253f12d86caea01ffcb7e678d0
Author: Eric Wu <[email protected]>
AuthorDate: Sat Mar 21 17:41:23 2020 -0700
[SPARK-31184][SQL] Support getTablesByType API of Hive Client
### What changes were proposed in this pull request?
Hive 2.3+ supports the `getTablesByType` API, which provides an efficient
way to get Hive tables of a specific type. Now, we have the following mappings
when using `HiveExternalCatalog`.
```
CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE
CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE
CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
```
Without this API, we have to achieve the same result via `getTables` +
`getTablesByName` + filtering by type.
This PR adds `getTablesByType` to `HiveShim`. For Hive versions that don't
support this API, an `UnsupportedOperationException` is thrown, and the upper
logic should catch the exception and fall back to the filter-based solution
mentioned above.
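As a rough sketch, the filter-based fallback amounts to the following
(a minimal sketch using the `HiveClient` trait API; variable names are
illustrative):
```scala
// Fallback path: list matching names, fetch their metadata, filter by type.
val tableNames = client.listTables(dbName, pattern)
val tables = client.getTablesByName(dbName, tableNames)
val views = tables.filter(_.tableType == CatalogTableType.VIEW).map(_.identifier.table)
```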
Since the JDK11-related fix in `Hive` has not been released yet, manual tests
against Hive 2.3.7-SNAPSHOT were done by following the instructions in
SPARK-29245.
### Why are the changes needed?
This API provides better usability and performance when we want to get a
list of Hive tables of a specific type, for example `HiveTableType.VIRTUAL_VIEW`
corresponding to `CatalogTableType.VIEW`.
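For instance, listing the views of a database becomes a single metastore call
instead of fetching all tables and filtering them client-side (a usage sketch,
assuming a `HiveClient` instance named `client`):
```scala
// Server-side filtering by table type, in one metastore round trip.
val views = client.listTablesByType("default", pattern = "*", CatalogTableType.VIEW)
```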
### Does this PR introduce any user-facing change?
No, this is a support function.
### How was this patch tested?
Added tests in `VersionsSuite` and manually ran the JDK11 test with the
following settings:
- Hive 2.3.6 Metastore on JDK8
- Hive 2.3.7-SNAPSHOT library built from source of the Hive 2.3 branch
- Spark built with Hive 2.3.7-SNAPSHOT on jdk-11.0.6
Closes #27952 from Eric5553/GetTableByType.
Authored-by: Eric Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/hive/client/HiveClient.scala | 9 +++++
.../spark/sql/hive/client/HiveClientImpl.scala | 42 ++++++++++++++++------
.../apache/spark/sql/hive/client/HiveShim.scala | 35 +++++++++++++++++-
.../spark/sql/hive/client/VersionsSuite.scala | 22 +++++++++---
4 files changed, 92 insertions(+), 16 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index e31dffa..3ea80ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -61,6 +61,15 @@ private[hive] trait HiveClient {
/** Returns the names of tables in the given database that matches the given pattern. */
def listTables(dbName: String, pattern: String): Seq[String]
+ /**
+ * Returns the names of tables with specific tableType in the given database that matches
+ * the given pattern.
+ */
+ def listTablesByType(
+ dbName: String,
+ pattern: String,
+ tableType: CatalogTableType): Seq[String]
+
/** Sets the name of current database. */
def setCurrentDatabase(databaseName: String): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 4a3e813..6ad5e9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -755,6 +755,22 @@ private[hive] class HiveClientImpl(
client.getTablesByPattern(dbName, pattern).asScala
}
+ override def listTablesByType(
+ dbName: String,
+ pattern: String,
+ tableType: CatalogTableType): Seq[String] = withHiveState {
+ try {
+ // Try with Hive API getTablesByType first, it's supported from Hive 2.3+.
+ shim.getTablesByType(client, dbName, pattern, toHiveTableType(tableType))
+ } catch {
+ case _: UnsupportedOperationException =>
+ // Fallback to filter logic if getTablesByType not supported.
+ val tableNames = client.getTablesByPattern(dbName, pattern).asScala
+ val tables = getTablesByName(dbName, tableNames).filter(_.tableType == tableType)
+ tables.map(_.identifier.table)
+ }
+ }
+
/**
* Runs the specified SQL query using Hive.
*/
@@ -1011,25 +1027,29 @@ private[hive] object HiveClientImpl extends Logging {
private def toOutputFormat(name: String) =
Utils.classForName[org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]](name)
+ def toHiveTableType(catalogTableType: CatalogTableType): HiveTableType = {
+ catalogTableType match {
+ case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE
+ case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE
+ case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
+ case t =>
+ throw new IllegalArgumentException(
+ s"Unknown table type is found at toHiveTableType: $t")
+ }
+ }
+
/**
* Converts the native table metadata representation format CatalogTable to Hive's Table.
*/
def toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable = {
val hiveTable = new HiveTable(table.database, table.identifier.table)
+ hiveTable.setTableType(toHiveTableType(table.tableType))
// For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
// Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
// (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
- hiveTable.setTableType(table.tableType match {
- case CatalogTableType.EXTERNAL =>
- hiveTable.setProperty("EXTERNAL", "TRUE")
- HiveTableType.EXTERNAL_TABLE
- case CatalogTableType.MANAGED =>
- HiveTableType.MANAGED_TABLE
- case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
- case t =>
- throw new IllegalArgumentException(
- s"Unknown table type is found at toHiveTable: $t")
- })
+ if (table.tableType == CatalogTableType.EXTERNAL) {
+ hiveTable.setProperty("EXTERNAL", "TRUE")
+ }
// Note: In Hive the schema and partition columns must be disjoint sets
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
table.partitionColumnNames.contains(c.getName)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 50ce536..2b80660 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.IMetaStoreClient
+import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
@@ -90,6 +91,12 @@ private[client] sealed abstract class Shim {
def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
+ def getTablesByType(
+ hive: Hive,
+ dbName: String,
+ pattern: String,
+ tableType: TableType): Seq[String]
+
def createPartitions(
hive: Hive,
db: String,
@@ -363,6 +370,15 @@ private[client] class Shim_v0_12 extends Shim with Logging {
conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
}
+ override def getTablesByType(
+ hive: Hive,
+ dbName: String,
+ pattern: String,
+ tableType: TableType): Seq[String] = {
+ throw new UnsupportedOperationException("Hive 2.2 and lower versions don't support " +
+ "getTablesByType. Please use Hive 2.3 or higher version.")
+ }
+
override def loadPartition(
hive: Hive,
loadPath: Path,
@@ -1220,7 +1236,24 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
private[client] class Shim_v2_2 extends Shim_v2_1
-private[client] class Shim_v2_3 extends Shim_v2_1
+private[client] class Shim_v2_3 extends Shim_v2_1 {
+ private lazy val getTablesByTypeMethod =
+ findMethod(
+ classOf[Hive],
+ "getTablesByType",
+ classOf[String],
+ classOf[String],
+ classOf[TableType])
+
+ override def getTablesByType(
+ hive: Hive,
+ dbName: String,
+ pattern: String,
+ tableType: TableType): Seq[String] = {
+ getTablesByTypeMethod.invoke(hive, dbName, pattern, tableType)
+ .asInstanceOf[JList[String]].asScala
+ }
+}
private[client] class Shim_v3_0 extends Shim_v2_3 {
// Spark supports only non-ACID operations
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7471142..ba75dcf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -154,10 +154,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
.client.version.fullVersion.startsWith(version))
}
- def table(database: String, tableName: String): CatalogTable = {
+ def table(database: String, tableName: String,
+ tableType: CatalogTableType = CatalogTableType.MANAGED): CatalogTable = {
CatalogTable(
identifier = TableIdentifier(tableName, Some(database)),
- tableType = CatalogTableType.MANAGED,
+ tableType = tableType,
schema = new StructType().add("key", "int"),
storage = CatalogStorageFormat(
locationUri = None,
@@ -273,7 +274,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: createTable") {
client.createTable(table("default", tableName = "src"), ignoreIfExists =
false)
- client.createTable(table("default", "temporary"), ignoreIfExists = false)
+ client.createTable(table("default", tableName = "temporary"),
ignoreIfExists = false)
+ client.createTable(table("default", tableName = "view1", tableType =
CatalogTableType.VIEW),
+ ignoreIfExists = false)
}
test(s"$version: loadTable") {
@@ -389,7 +392,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: listTables(database)") {
- assert(client.listTables("default") === Seq("src", "temporary"))
+ assert(client.listTables("default") === Seq("src", "temporary", "view1"))
}
test(s"$version: listTables(database, pattern)") {
@@ -397,6 +400,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(client.listTables("default", pattern = "nonexist").isEmpty)
}
+ test(s"$version: listTablesByType(database, pattern, tableType)") {
+ assert(client.listTablesByType("default", pattern = "view1",
+ CatalogTableType.VIEW) === Seq("view1"))
+ assert(client.listTablesByType("default", pattern = "nonexist",
+ CatalogTableType.VIEW).isEmpty)
+ }
+
test(s"$version: dropTable") {
val versionsWithoutPurge =
if (versions.contains("0.14")) versions.takeWhile(_ != "0.14") else Nil
@@ -405,12 +415,16 @@ class VersionsSuite extends SparkFunSuite with Logging {
try {
client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
purge = true)
+ client.dropTable("default", tableName = "view1", ignoreIfNotExists = false,
+ purge = true)
assert(!versionsWithoutPurge.contains(version))
} catch {
case _: UnsupportedOperationException =>
assert(versionsWithoutPurge.contains(version))
client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
purge = false)
+ client.dropTable("default", tableName = "view1", ignoreIfNotExists = false,
+ purge = false)
}
assert(client.listTables("default") === Seq("src"))
}