Repository: spark Updated Branches: refs/heads/master 200f01c8f -> 890baaca5
[SPARK-15674][SQL] Deprecates "CREATE TEMPORARY TABLE USING...", uses "CREAT TEMPORARY VIEW USING..." instead ## What changes were proposed in this pull request? The current implementation of "CREATE TEMPORARY TABLE USING datasource..." is NOT creating any intermediate temporary data directory like temporary HDFS folder, instead, it only stores a SQL string in memory. Probably we should use "TEMPORARY VIEW" instead. This PR assumes a temporary table has to link with some temporary intermediate data. It follows the definition of temporary table like this (from [hortonworks doc](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_dataintegration/content/temp-tables.html)): > A temporary table is a convenient way for an application to automatically > manage intermediate data generated during a complex query **Example**: ``` scala> spark.sql("CREATE temporary view my_tab7 (c1: String, c2: String) USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')") scala> spark.sql("select c1, c2 from my_tab7").show() +----+-----+ | c1| c2| +----+-----+ |year| make| |2012|Tesla| ... ``` It NOW prints a **deprecation warning** if "CREATE TEMPORARY TABLE USING..." is used. ``` scala> spark.sql("CREATE temporary table my_tab7 (c1: String, c2: String) USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')") 16/05/31 10:39:27 WARN SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE tableName USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead ``` ## How was this patch tested? Unit test. Author: Sean Zhong <[email protected]> Closes #13414 from clockfly/create_temp_view_using. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/890baaca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/890baaca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/890baaca Branch: refs/heads/master Commit: 890baaca5078df0b50c0054f55a2c33023f7fd67 Parents: 200f01c Author: Sean Zhong <[email protected]> Authored: Tue Jun 7 15:21:55 2016 -0700 Committer: Herman van Hovell <[email protected]> Committed: Tue Jun 7 15:21:55 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +++ .../spark/sql/execution/SparkSqlParser.scala | 15 +++++++++++++- .../spark/sql/execution/SparkStrategies.scala | 9 +++++++-- .../spark/sql/execution/datasources/ddl.scala | 5 +++-- .../spark/sql/execution/command/DDLSuite.scala | 21 +++++++++++++++++++- 5 files changed, 47 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/890baaca/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index b0e71c7..2dd3cfa 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -90,6 +90,9 @@ statement identifierCommentList? (COMMENT STRING)? (PARTITIONED ON identifierList)? (TBLPROPERTIES tablePropertyList)? AS query #createView + | CREATE (OR REPLACE)? TEMPORARY VIEW + tableIdentifier ('(' colTypeList ')')? tableProvider + (OPTIONS tablePropertyList)? #createTempViewUsing | ALTER VIEW tableIdentifier AS? query #alterViewQuery | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING (USING resource (',' resource)*)? #createFunction http://git-wip-us.apache.org/repos/asf/spark/blob/890baaca/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c68c8f8..dc74222 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _} import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} import org.apache.spark.sql.types.DataType @@ -347,6 +347,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * Creates a [[CreateTempViewUsing]] logical plan. + */ + override def visitCreateTempViewUsing( + ctx: CreateTempViewUsingContext): LogicalPlan = withOrigin(ctx) { + CreateTempViewUsing( + tableIdent = visitTableIdentifier(ctx.tableIdentifier()), + userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType), + replace = ctx.REPLACE != null, + provider = ctx.tableProvider.qualifiedName.getText, + options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) + } + + /** * Create a [[LoadDataCommand]] command. * * For example: http://git-wip-us.apache.org/repos/asf/spark/blob/890baaca/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b20897e..a36fe78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -376,9 +376,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case c: CreateTableUsing if c.temporary && !c.allowExisting => + logWarning( + s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " + + s"please use CREATE TEMPORARY VIEW viewName USING... instead") ExecutedCommandExec( - CreateTempTableUsing( - c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil + CreateTempViewUsing( + c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil case c: CreateTableUsing if !c.temporary => val cmd = @@ -409,6 +412,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { c.child) ExecutedCommandExec(cmd) :: Nil + case c: CreateTempViewUsing => + ExecutedCommandExec(c) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/spark/blob/890baaca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index bf272e3..aa42eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -64,9 +64,10 @@ case class CreateTableUsingAsSelect( override def output: Seq[Attribute] = Seq.empty[Attribute] } -case class CreateTempTableUsing( +case class CreateTempViewUsing( tableIdent: TableIdentifier, userSpecifiedSchema: Option[StructType], + replace: Boolean, provider: String, options: Map[String, String]) extends RunnableCommand { @@ -84,7 +85,7 @@ case class CreateTempTableUsing( sparkSession.sessionState.catalog.createTempView( tableIdent.table, Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan, - overrideIfExists = true) + replace) Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/890baaca/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 741ea67..a7e6893 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException} +import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} @@ -422,6 +422,25 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("create temporary view using") { + val csvFile = Thread.currentThread().getContextClassLoader.getResource("cars.csv").toString() + withView("testview") { + sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1: String, c2: String) USING " + + "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat " + + s"OPTIONS (PATH '$csvFile')") + + checkAnswer( + sql("select c1, c2 from testview order by c1 limit 1"), + Row("1997", "Ford") :: Nil) + + // Fails if creating a new view with the same name + intercept[TempTableAlreadyExistsException] { + sql(s"CREATE TEMPORARY VIEW testview USING " + + s"org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '$csvFile')") + } + } + } + test("alter table: rename") { val catalog = spark.sessionState.catalog val tableIdent1 = TableIdentifier("tab1", Some("dbx")) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
