Repository: spark
Updated Branches:
  refs/heads/master 6c29b3de7 -> 2fb12b0a3


[SPARK-17903][SQL] MetastoreRelation should talk to external catalog instead of 
hive client

## What changes were proposed in this pull request?

`HiveExternalCatalog` should be the only interface to talk to the hive 
metastore. In `MetastoreRelation` we can just use `ExternalCatalog` instead of 
`HiveClient` to interact with the hive metastore, and add the missing API to 
`ExternalCatalog`.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #15460 from cloud-fan/relation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fb12b0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fb12b0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fb12b0a

Branch: refs/heads/master
Commit: 2fb12b0a33deeeadfac451095f64dea6c967caac
Parents: 6c29b3d
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Oct 14 15:53:50 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Oct 14 15:53:50 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala   | 13 +++++++++++++
 .../sql/catalyst/catalog/InMemoryCatalog.scala   |  8 ++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala     |  8 ++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala    |  7 +++----
 .../spark/sql/hive/MetastoreRelation.scala       | 19 ++++++++++++-------
 .../org/apache/spark/sql/hive/TableReader.scala  |  3 +--
 .../spark/sql/hive/client/HiveClient.scala       | 15 +++------------
 .../spark/sql/hive/client/HiveClientImpl.scala   | 10 ++++++----
 .../sql/hive/HiveExternalCatalogSuite.scala      |  9 +++++++++
 .../spark/sql/hive/MetastoreRelationSuite.scala  |  2 +-
 .../spark/sql/hive/client/VersionsSuite.scala    |  4 ++--
 11 files changed, 66 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index dd93b46..348d3d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
NoSuchDatabaseException, NoSuchFunctionException}
+import org.apache.spark.sql.catalyst.expressions.Expression
 
 
 /**
@@ -196,6 +197,18 @@ abstract class ExternalCatalog {
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): 
Seq[CatalogTablePartition]
 
+  /**
+   * List the metadata of selected partitions according to the given partition 
predicates.
+   *
+   * @param db database name
+   * @param table table name
+   * @param predicates partition predicates
+   */
+  def listPartitionsByFilter(
+      db: String,
+      table: String,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition]
+
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 3e31127..49280f8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
 
 /**
@@ -477,6 +478,13 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.values.toSeq
   }
 
+  override def listPartitionsByFilter(
+      db: String,
+      table: String,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+    throw new UnsupportedOperationException("listPartitionsByFilter is not 
implemented.")
+  }
+
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 237b829..b5d93c3 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
 import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -646,6 +647,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     client.getPartitions(db, table, partialSpec)
   }
 
+  override def listPartitionsByFilter(
+      db: String,
+      table: String,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+    client.getPartitionsByFilter(db, table, predicates)
+  }
+
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8410a2e..c44f0ad 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -44,8 +44,6 @@ import org.apache.spark.sql.types._
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends 
Logging {
   private val sessionState = 
sparkSession.sessionState.asInstanceOf[HiveSessionState]
-  private val client =
-    
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
 
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String)
@@ -104,7 +102,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    new Path(new Path(client.getDatabase(dbName).locationUri), 
tblName).toString
+    val dbLocation = 
sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
+    new Path(new Path(dbLocation), tblName).toString
   }
 
   def lookupRelation(
@@ -129,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     } else {
       val qualifiedTable =
         MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name)(table, client, 
sparkSession)
+          qualifiedTableName.database, qualifiedTableName.name)(table, 
sparkSession)
       alias.map(a => SubqueryAlias(a, qualifiedTable, 
None)).getOrElse(qualifiedTable)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 33f0ecf..da809cf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -43,7 +43,6 @@ private[hive] case class MetastoreRelation(
     databaseName: String,
     tableName: String)
     (val catalogTable: CatalogTable,
-     @transient private val client: HiveClient,
      @transient private val sparkSession: SparkSession)
   extends LeafNode with MultiInstanceRelation with FileRelation with 
CatalogRelation {
 
@@ -59,7 +58,7 @@ private[hive] case class MetastoreRelation(
     Objects.hashCode(databaseName, tableName, output)
   }
 
-  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: client 
:: sparkSession :: Nil
+  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: 
sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
     new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
@@ -146,11 +145,18 @@ private[hive] case class MetastoreRelation(
 
   // When metastore partition pruning is turned off, we cache the list of all 
partitions to
   // mimic the behavior of Spark < 1.5
-  private lazy val allPartitions: Seq[CatalogTablePartition] = 
client.getPartitions(catalogTable)
+  private lazy val allPartitions: Seq[CatalogTablePartition] = {
+    sparkSession.sharedState.externalCatalog.listPartitions(
+      catalogTable.database,
+      catalogTable.identifier.table)
+  }
 
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = 
{
     val rawPartitions = if 
(sparkSession.sessionState.conf.metastorePartitionPruning) {
-      client.getPartitionsByFilter(catalogTable, predicates)
+      sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
+        catalogTable.database,
+        catalogTable.identifier.table,
+        predicates)
     } else {
       allPartitions
     }
@@ -234,8 +240,7 @@ private[hive] case class MetastoreRelation(
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def inputFiles: Array[String] = {
-    val partLocations = client
-      .getPartitionsByFilter(catalogTable, Nil)
+    val partLocations = allPartitions
       .flatMap(_.storage.locationUri)
       .toArray
     if (partLocations.nonEmpty) {
@@ -248,6 +253,6 @@ private[hive] case class MetastoreRelation(
   }
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName)(catalogTable, client, 
sparkSession)
+    MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 2a54163..aaf30f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -149,8 +149,7 @@ class HadoopTableReader(
    *     subdirectory of each partition being read. If None, then all files 
are accepted.
    */
   def makeRDDForPartitionedTable(
-      partitionToDeserializer: Map[HivePartition,
-      Class[_ <: Deserializer]],
+      partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[InternalRow] = {
 
     // SPARK-5068:get FileStatus and do the filtering locally when the path is 
not exists

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 984d23b..9ee3d62 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -172,24 +172,15 @@ private[hive] trait HiveClient {
    * Returns the partitions for the given table that match the supplied 
partition spec.
    * If no partition spec is specified, all partitions are returned.
    */
-  final def getPartitions(
+  def getPartitions(
       db: String,
       table: String,
-      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
-    getPartitions(getTable(db, table), partialSpec)
-  }
-
-  /**
-   * Returns the partitions for the given table that match the supplied 
partition spec.
-   * If no partition spec is specified, all partitions are returned.
-   */
-  def getPartitions(
-      table: CatalogTable,
       partialSpec: Option[TablePartitionSpec] = None): 
Seq[CatalogTablePartition]
 
   /** Returns partitions filtered by predicates for the given table. */
   def getPartitionsByFilter(
-      table: CatalogTable,
+      db: String,
+      table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition]
 
   /** Loads a static partition into an existing table. */

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dd33d75..5c8f7ff 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -525,9 +525,10 @@ private[hive] class HiveClientImpl(
    * If no partition spec is specified, all partitions are returned.
    */
   override def getPartitions(
-      table: CatalogTable,
+      db: String,
+      table: String,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = 
withHiveState {
-    val hiveTable = toHiveTable(table)
+    val hiveTable = toHiveTable(getTable(db, table))
     spec match {
       case None => shim.getAllPartitions(client, 
hiveTable).map(fromHivePartition)
       case Some(s) => client.getPartitions(hiveTable, 
s.asJava).asScala.map(fromHivePartition)
@@ -535,9 +536,10 @@ private[hive] class HiveClientImpl(
   }
 
   override def getPartitionsByFilter(
-      table: CatalogTable,
+      db: String,
+      table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState 
{
-    val hiveTable = toHiveTable(table)
+    val hiveTable = toHiveTable(getTable(db, table))
     shim.getPartitionsByFilter(client, hiveTable, 
predicates).map(fromHivePartition)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 26c2549..efa0beb 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.dsl.expressions._
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -43,4 +44,12 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     externalCatalog.client.reset()
   }
 
+  import utils._
+
+  test("list partitions by filter") {
+    val catalog = newBasicCatalog()
+    val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", 
Seq('a.int === 1))
+    assert(selectedPartitions.length == 1)
+    assert(selectedPartitions.head.spec == part1.spec)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
index 2f3055d..c28e41a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
@@ -29,7 +29,7 @@ class MetastoreRelationSuite extends SparkFunSuite {
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = StructType(StructField("a", IntegerType, true) :: Nil))
-    val relation = MetastoreRelation("db", "test")(table, null, null)
+    val relation = MetastoreRelation("db", "test")(table, null)
 
     // No exception should be thrown
     relation.makeCopy(Array("db", "test"))

http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a10957..c158bf1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -295,12 +295,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
 
     test(s"$version: getPartitions(catalogTable)") {
-      assert(2 == client.getPartitions(client.getTable("default", 
"src_part")).size)
+      assert(2 == client.getPartitions("default", "src_part").size)
     }
 
     test(s"$version: getPartitionsByFilter") {
       // Only one partition [1, 1] for key2 == 1
-      val result = client.getPartitionsByFilter(client.getTable("default", 
"src_part"),
+      val result = client.getPartitionsByFilter("default", "src_part",
         Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
 
       // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the 
filter condition.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to