This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f89cdec [SPARK-26435][SQL] Support creating partitioned table using
Hive CTAS by specifying partition column names
f89cdec is described below
commit f89cdec8b9a9fcc95ba7458869b4ba9d038560f9
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Dec 27 16:03:14 2018 +0800
[SPARK-26435][SQL] Support creating partitioned table using Hive CTAS by
specifying partition column names
## What changes were proposed in this pull request?
Spark SQL doesn't support creating a partitioned table using Hive CTAS in SQL
syntax. However, it is supported through the DataFrameWriter API:
```scala
val df = Seq(("a", 1)).toDF("part", "id")
df.write.format("hive").partitionBy("part").saveAsTable("t")
```
Hive began supporting this syntax in newer versions
(https://issues.apache.org/jira/browse/HIVE-20241):
```
CREATE TABLE t PARTITIONED BY (part) AS SELECT 1 as id, "a" as part
```
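This patch adds this syntax to Spark SQL. In a CTAS statement, partition columns
are given by name only. Specifying data types for them is rejected, as is
specifying a schema.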
## How was this patch tested?
Added tests.
Closes #23376 from viirya/hive-ctas-partitioned-table.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +-
.../spark/sql/execution/SparkSqlParser.scala | 33 +++++++++------
.../spark/sql/hive/execution/HiveDDLSuite.scala | 48 +++++++++++++++++++++-
.../spark/sql/hive/execution/SQLQuerySuite.scala | 4 +-
4 files changed, 71 insertions(+), 17 deletions(-)
diff --git
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5e732ed..b39681d 100644
---
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -88,7 +88,8 @@ statement
(AS? query)?
#createTable
| createTableHeader ('(' columns=colTypeList ')')?
((COMMENT comment=STRING) |
- (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+ (PARTITIONED BY '(' partitionColumns=colTypeList ')' |
+ PARTITIONED BY partitionColumnNames=identifierList) |
bucketSpec |
skewSpec |
rowFormat |
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 364efea..8deb55b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1196,33 +1196,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends
AstBuilder(conf) {
selectQuery match {
case Some(q) =>
- // Hive does not allow to use a CTAS statement to create a partitioned
table.
- if (tableDesc.partitionColumnNames.nonEmpty) {
- val errorMessage = "A Create Table As Select (CTAS) statement is not
allowed to " +
- "create a partitioned table using Hive's file formats. " +
- "Please use the syntax of \"CREATE TABLE tableName USING
dataSource " +
- "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table
through a " +
- "CTAS statement."
- operationNotAllowed(errorMessage, ctx)
- }
-
// Don't allow explicit specification of schema for CTAS.
- if (schema.nonEmpty) {
+ if (dataCols.nonEmpty) {
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS)
statement",
ctx)
}
+ // When creating partitioned table with CTAS statement, we can't
specify data type for the
+ // partition columns.
+ if (partitionCols.nonEmpty) {
+ val errorMessage = "Create Partitioned Table As Select cannot
specify data type for " +
+ "the partition columns of the target table."
+ operationNotAllowed(errorMessage, ctx)
+ }
+
+ // Hive CTAS supports dynamic partition by specifying partition column
names.
+ val partitionColumnNames =
+ Option(ctx.partitionColumnNames)
+ .map(visitIdentifierList(_).toArray)
+ .getOrElse(Array.empty[String])
+
+ val tableDescWithPartitionColNames =
+ tableDesc.copy(partitionColumnNames = partitionColumnNames)
+
val hasStorageProperties = (ctx.createFileFormat.size != 0) ||
(ctx.rowFormat.size != 0)
if (conf.convertCTAS && !hasStorageProperties) {
// At here, both rowStorage.serdeProperties and
fileStorage.serdeProperties
// are empty Maps.
- val newTableDesc = tableDesc.copy(
+ val newTableDesc = tableDescWithPartitionColNames.copy(
storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
provider = Some(conf.defaultDataSourceName))
CreateTable(newTableDesc, mode, Some(q))
} else {
- CreateTable(tableDesc, mode, Some(q))
+ CreateTable(tableDescWithPartitionColNames, mode, Some(q))
}
case None => CreateTable(tableDesc, mode, None)
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fd38944..6abdc40 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.net.URI
-import java.util.Date
import scala.language.existentials
@@ -33,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest,
Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException,
TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -2370,4 +2370,50 @@ class HiveDDLSuite
))
}
}
+
+ test("Hive CTAS can't create partitioned table by specifying schema") {
+ val err1 = intercept[ParseException] {
+ spark.sql(
+ s"""
+ |CREATE TABLE t (a int)
+ |PARTITIONED BY (b string)
+ |STORED AS parquet
+ |AS SELECT 1 as a, "a" as b
+ """.stripMargin)
+ }.getMessage
+ assert(err1.contains("Schema may not be specified in a Create Table As
Select " +
+ "(CTAS) statement"))
+
+ val err2 = intercept[ParseException] {
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |PARTITIONED BY (b string)
+ |STORED AS parquet
+ |AS SELECT 1 as a, "a" as b
+ """.stripMargin)
+ }.getMessage
+ assert(err2.contains("Create Partitioned Table As Select cannot specify
data type for " +
+ "the partition columns of the target table"))
+ }
+
+ test("Hive CTAS with dynamic partition") {
+ Seq("orc", "parquet").foreach { format =>
+ withTable("t") {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |PARTITIONED BY (b)
+ |STORED AS $format
+ |AS SELECT 1 as a, "a" as b
+ """.stripMargin)
+ checkAnswer(spark.table("t"), Row(1, "a"))
+
+
assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ .partitionColumnNames === Seq("b"))
+ }
+ }
+ }
+ }
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6acf446..70efad1 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -692,8 +692,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils
with TestHiveSingleton {
|AS SELECT key, value FROM mytable1
""".stripMargin)
}.getMessage
- assert(e.contains("A Create Table As Select (CTAS) statement is not
allowed to " +
- "create a partitioned table using Hive's file formats"))
+ assert(e.contains("Create Partitioned Table As Select cannot specify
data type for " +
+ "the partition columns of the target table"))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]