Repository: spark
Updated Branches:
  refs/heads/master f405de87c -> 24bea0004


[SPARK-14954] [SQL] Add PARTITIONED BY and CLUSTERED BY clauses for data source 
CTAS syntax

Currently, we can only create persisted partitioned and/or bucketed data source 
tables using the Dataset API but not using SQL DDL. This PR implements the 
following syntax to add partitioning and bucketing support to the SQL DDL:

```
CREATE TABLE <table-name>
USING <provider> [OPTIONS (<key1> <value1>, <key2> <value2>, ...)]
[PARTITIONED BY (col1, col2, ...)]
[CLUSTERED BY (col1, col2, ...) [SORTED BY (col1, col2, ...)] INTO <n> BUCKETS]
[AS] SELECT ...
```

Test cases are added in `MetastoreDataSourcesSuite` to check the newly added 
syntax.

Author: Cheng Lian <[email protected]>
Author: Yin Huai <[email protected]>

Closes #12734 from liancheng/spark-14954.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24bea000
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24bea000
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24bea000

Branch: refs/heads/master
Commit: 24bea000476cdd0b43be5160a76bc5b170ef0b42
Parents: f405de8
Author: Cheng Lian <[email protected]>
Authored: Wed Apr 27 13:55:07 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Wed Apr 27 13:55:13 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  4 +-
 .../spark/sql/execution/SparkSqlParser.scala    | 12 ++-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 93 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24bea000/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6e04f6e..c356f0c 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -47,7 +47,9 @@ statement
     | createTableHeader ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)?                                   
#createTableUsing
     | createTableHeader tableProvider
-        (OPTIONS tablePropertyList)? AS? query                         
#createTableUsing
+        (OPTIONS tablePropertyList)?
+        (PARTITIONED BY partitionColumnNames=identifierList)?
+        bucketSpec? AS? query                                          
#createTableUsing
     | createTableHeader ('(' columns=colTypeList ')')?
         (COMMENT STRING)?
         (PARTITIONED BY '(' partitionColumns=colTypeList ')')?

http://git-wip-us.apache.org/repos/asf/spark/blob/24bea000/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 79fdf9f..e4c837a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -289,6 +289,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = 
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
     if (ctx.query != null) {
       // Get the backing query.
@@ -302,9 +303,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder 
{
       } else {
         SaveMode.ErrorIfExists
       }
-      CreateTableUsingAsSelect(table, provider, temp, Array.empty, None, mode, 
options, query)
+
+      val partitionColumnNames =
+        Option(ctx.partitionColumnNames)
+          .map(visitIdentifierList(_).toArray)
+          .getOrElse(Array.empty[String])
+
+      CreateTableUsingAsSelect(
+        table, provider, temp, partitionColumnNames, bucketSpec, mode, 
options, query)
     } else {
-      val struct = Option(ctx.colTypeList).map(createStructType)
+      val struct = Option(ctx.colTypeList()).map(createStructType)
       CreateTableUsing(table, struct, provider, temp, options, ifNotExists, 
managedIfNoPath = false)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/24bea000/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b21ca4f..cb10002 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -940,4 +940,97 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
         .schema.forall { c => DataTypeParser.parse(c.dataType) == 
ArrayType(StringType) })
     }
   }
+
+  test("CTAS: persisted partitioned data source table") {
+    withTempDir { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+              |OPTIONS (PATH '$path')
+              |PARTITIONED BY (a)
+              |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt 
=== 1)
+        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets"))
+        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols"))
+        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+
+        checkAnswer(table("t"), Row(2, 1))
+      }
+    }
+  }
+
+  test("CTAS: persisted bucketed data source table") {
+    withTempDir { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+              |OPTIONS (PATH '$path')
+              |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+              |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
+        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt 
=== 2)
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
 === 1)
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt 
=== 1)
+
+        checkAnswer(table("t"), Row(1, 2))
+      }
+
+      withTable("t") {
+        val path = dir.getCanonicalPath
+
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+              |OPTIONS (PATH '$path')
+              |CLUSTERED BY (a) INTO 2 BUCKETS
+              |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
+        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt 
=== 2)
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
 === 1)
+        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+
+        checkAnswer(table("t"), Row(1, 2))
+      }
+    }
+  }
+
+  test("CTAS: persisted partitioned bucketed data source table") {
+    withTempDir { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+              |OPTIONS (PATH '$path')
+              |PARTITIONED BY (a)
+              |CLUSTERED BY (b) SORTED BY (c) INTO 2 BUCKETS
+              |AS SELECT 1 AS a, 2 AS b, 3 AS c
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt 
=== 1)
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt 
=== 2)
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
 === 1)
+        
assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt 
=== 1)
+
+        checkAnswer(table("t"), Row(2, 3, 1))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to