Repository: spark
Updated Branches:
  refs/heads/branch-2.0 86acb5efd -> beda3938c


[SPARK-15160][SQL] support data source table in InMemoryCatalog

## What changes were proposed in this pull request?

This PR adds a new rule to convert `SimpleCatalogRelation` to a data source table
if its table properties contain data source information.

## How was this patch tested?

Added a new test in `SQLQuerySuite`.

Author: Wenchen Fan <[email protected]>

Closes #12935 from cloud-fan/ds-table.

(cherry picked from commit 46991448aa6f78f413a761059d7d7bb586f9d63e)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/beda3938
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/beda3938
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/beda3938

Branch: refs/heads/branch-2.0
Commit: beda3938c2901de81a1df9ed802b136b7abe29f4
Parents: 86acb5e
Author: Wenchen Fan <[email protected]>
Authored: Wed May 11 23:55:42 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Wed May 11 23:55:58 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  2 +
 .../command/createDataSourceTables.scala        |  4 +-
 .../spark/sql/execution/command/ddl.scala       | 76 ++++++++------------
 .../spark/sql/execution/command/tables.scala    | 27 ++++---
 .../datasources/DataSourceStrategy.scala        | 47 +++++++++++-
 .../spark/sql/internal/SessionState.scala       |  7 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 +++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +
 8 files changed, 114 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fc2068c..d215655 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -185,6 +185,8 @@ case class SimpleCatalogRelation(
 
   override def catalogTable: CatalogTable = metadata
 
+  override lazy val resolved: Boolean = false
+
   override val output: Seq[Attribute] = {
     val cols = catalogTable.schema
       .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index de3c868..7d3c5257 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, 
CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
@@ -200,6 +200,8 @@ case class CreateDataSourceTableAsSelectCommand(
                     s"doesn't match the data schema[${query.schema}]'s")
               }
               existingSchema = Some(l.schema)
+            case s: SimpleCatalogRelation if 
DDLUtils.isDatasourceTable(s.metadata) =>
+              existingSchema = 
DDLUtils.getSchemaFromTableProperties(s.metadata)
             case o =>
               throw new AnalysisException(s"Saving data in ${o.toString} is 
not supported.")
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0b0b618..1c1716f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,6 +25,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
 
@@ -492,33 +493,28 @@ private[sql] object DDLUtils {
       table.properties.contains("spark.sql.sources.schema.numPartCols")
   }
 
-  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] 
= {
-    getSchemaFromTableProperties(metadata.properties)
-  }
-
   // A persisted data source table may not store its schema in the catalog. In 
this case, its schema
   // will be inferred at runtime when the table is referenced.
-  def getSchemaFromTableProperties(props: Map[String, String]): 
Option[StructType] = {
-    require(isDatasourceTable(props))
+  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] 
= {
+    require(isDatasourceTable(metadata))
 
-    val schemaParts = for {
-      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
-      index <- 0 until numParts.toInt
-    } yield props.getOrElse(
-      s"spark.sql.sources.schema.part.$index",
-      throw new AnalysisException(
-        s"Corrupted schema in catalog: $numParts parts expected, but part 
$index is missing."
-      )
-    )
+    metadata.properties.get("spark.sql.sources.schema.numParts").map { 
numParts =>
+      val parts = (0 until numParts.toInt).map { index =>
+        val part = 
metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+        if (part == null) {
+          throw new AnalysisException(
+            "Could not read schema from the metastore because it is corrupted 
" +
+              s"(missing part $index of the schema, $numParts parts are 
expected).")
+        }
 
-    if (schemaParts.isEmpty) {
-      None
-    } else {
-      Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
+        part
+      }
+      // Stick all parts back to a single schema string.
+      DataType.fromJson(parts.mkString).asInstanceOf[StructType]
     }
   }
 
-  private def getColumnNamesByTypeFromTableProperties(
+  private def getColumnNamesByType(
       props: Map[String, String], colType: String, typeName: String): 
Seq[String] = {
     require(isDatasourceTable(props))
 
@@ -534,35 +530,19 @@ private[sql] object DDLUtils {
   }
 
   def getPartitionColumnsFromTableProperties(metadata: CatalogTable): 
Seq[String] = {
-    getPartitionColumnsFromTableProperties(metadata.properties)
-  }
-
-  def getPartitionColumnsFromTableProperties(props: Map[String, String]): 
Seq[String] = {
-    getColumnNamesByTypeFromTableProperties(props, "part", "partitioning 
columns")
+    getColumnNamesByType(metadata.properties, "part", "partitioning columns")
   }
 
-  def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
-    getNumBucketFromTableProperties(metadata.properties)
-  }
-
-  def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] 
= {
-    require(isDatasourceTable(props))
-    props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
-  }
-
-  def getBucketingColumnsFromTableProperties(metadata: CatalogTable): 
Seq[String] = {
-    getBucketingColumnsFromTableProperties(metadata.properties)
-  }
-
-  def getBucketingColumnsFromTableProperties(props: Map[String, String]): 
Seq[String] = {
-    getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing 
columns")
-  }
-
-  def getSortingColumnsFromTableProperties(metadata: CatalogTable): 
Seq[String] = {
-    getSortingColumnsFromTableProperties(metadata.properties)
-  }
-
-  def getSortingColumnsFromTableProperties(props: Map[String, String]): 
Seq[String] = {
-    getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
+  def getBucketSpecFromTableProperties(metadata: CatalogTable): 
Option[BucketSpec] = {
+    if (isDatasourceTable(metadata)) {
+      metadata.properties.get("spark.sql.sources.schema.numBuckets").map { 
numBuckets =>
+        BucketSpec(
+          numBuckets.toInt,
+          getColumnNamesByType(metadata.properties, "bucket", "bucketing 
columns"),
+          getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
+      }
+    } else {
+      None
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e6dcd1e..bb4f1ff 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -385,17 +385,22 @@ case class DescribeTableCommand(table: TableIdentifier, 
isExtended: Boolean, isF
   }
 
   private def describeBucketingInfo(metadata: CatalogTable, buffer: 
ArrayBuffer[Row]): Unit = {
-    if (DDLUtils.isDatasourceTable(metadata)) {
-      val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
-      val bucketCols = 
DDLUtils.getBucketingColumnsFromTableProperties(metadata)
-      val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
-      append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), 
"")
-      append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), 
"")
-      append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
-    } else {
-      append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
-      append(buffer, "Bucket Columns:", 
metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
-      append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", 
", ", "]"), "")
+    def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], 
sortColumns: Seq[String]) = {
+      append(buffer, "Num Buckets:", numBuckets.toString, "")
+      append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", 
"]"), "")
+      append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
+    }
+
+    DDLUtils.getBucketSpecFromTableProperties(metadata).map { bucketSpec =>
+      appendBucketInfo(
+        bucketSpec.numBuckets,
+        bucketSpec.bucketColumnNames,
+        bucketSpec.sortColumnNames)
+    }.getOrElse {
+      appendBucketInfo(
+        metadata.numBuckets,
+        metadata.bucketColumnNames,
+        metadata.sortColumnNames)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bc249f4..0494faf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -32,8 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, 
PUSHED_FILTERS}
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, 
DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -77,6 +78,48 @@ private[sql] object DataSourceAnalysis extends 
Rule[LogicalPlan] {
   }
 }
 
+
+/**
+ * Replaces [[SimpleCatalogRelation]] with data source table if its table 
property contains data
+ * source information.
+ */
+private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
+  private def readDataSourceTable(sparkSession: SparkSession, table: 
CatalogTable): LogicalPlan = {
+    val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
+
+    // We only need names at here since userSpecifiedSchema we loaded from the 
metastore
+    // contains partition columns. We can always get datatypes of partitioning 
columns
+    // from userSpecifiedSchema.
+    val partitionColumns = 
DDLUtils.getPartitionColumnsFromTableProperties(table)
+
+    val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table)
+
+    val options = table.storage.serdeProperties
+    val dataSource =
+      DataSource(
+        sparkSession,
+        userSpecifiedSchema = userSpecifiedSchema,
+        partitionColumns = partitionColumns,
+        bucketSpec = bucketSpec,
+        className = table.properties("spark.sql.sources.provider"),
+        options = options)
+
+    LogicalRelation(
+      dataSource.resolveRelation(),
+      metastoreTableIdentifier = Some(table.identifier))
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+        if DDLUtils.isDatasourceTable(s.metadata) =>
+      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+
+    case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+      readDataSourceTable(sparkSession, s.metadata)
+  }
+}
+
+
 /**
  * A Strategy for planning scans over data sources defined using the sources 
API.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ebff756..f0b8a83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -18,14 +18,10 @@
 package org.apache.spark.sql.internal
 
 import java.io.File
-import java.util.Properties
-
-import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _}
@@ -34,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.AnalyzeTable
-import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, 
PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, 
FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
@@ -114,6 +110,7 @@ private[sql] class SessionState(sparkSession: SparkSession) 
{
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
         PreInsertCastAndRename ::
+        new FindDataSourceTable(sparkSession) ::
         DataSourceAnalysis ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil 
else Nil)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4ef4b48..3bbe87a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2480,4 +2480,20 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
       Row(null) :: Nil
     )
   }
+
+  test("data source table created in InMemoryCatalog should be able to 
read/write") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      checkAnswer(sql("SELECT i, j FROM tbl"), Nil)
+
+      Seq(1 -> "a", 2 -> "b").toDF("i", 
"j").write.mode("overwrite").insertInto("tbl")
+      checkAnswer(sql("SELECT i, j FROM tbl"), Row(1, "a") :: Row(2, "b") :: 
Nil)
+
+      Seq(3 -> "c", 4 -> "d").toDF("i", 
"j").write.mode("append").saveAsTable("tbl")
+      checkAnswer(
+        sql("SELECT i, j FROM tbl"),
+        Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/beda3938/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8cfcec7..2f20cde 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -71,6 +71,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         logDebug(s"Creating new cached data source for $in")
         val table = client.getTable(in.database, in.name)
 
+        // TODO: the following code is duplicated with 
FindDataSourceTable.readDataSourceTable
+
         def schemaStringFromParts: Option[String] = {
           table.properties.get("spark.sql.sources.schema.numParts").map { 
numParts =>
             val parts = (0 until numParts.toInt).map { index =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to