Repository: spark Updated Branches: refs/heads/master 26445c2e4 -> a9b93e073
http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala deleted file mode 100644 index 6fe0475..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.parser._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.types.StructType - -private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) { - import ParserUtils._ - - /** Check if a command should not be explained. */ - protected def isNoExplainCommand(command: String): Boolean = { - "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command - } - - /** - * For each node, extract properties in the form of a list - * ['key_part1', 'key_part2', 'key_part3', 'value'] - * into a pair (key_part1.key_part2.key_part3, value). - * - * Example format: - * - * TOK_TABLEPROPERTY - * :- 'k1' - * +- 'v1' - * TOK_TABLEPROPERTY - * :- 'k2' - * +- 'v2' - * TOK_TABLEPROPERTY - * :- 'k3' - * +- 'v3' - */ - private def extractProps( - props: Seq[ASTNode], - expectedNodeText: String): Seq[(String, String)] = { - props.map { - case Token(x, keysAndValue) if x == expectedNodeText => - val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".") - val value = unquoteString(keysAndValue.last.text) - (key, value) - case p => - parseFailed(s"Expected property '$expectedNodeText' in command", p) - } - } - - protected override def nodeToPlan(node: ASTNode): LogicalPlan = { - node match { - case Token("TOK_SETCONFIG", Nil) => - val keyValueSeparatorIndex = node.remainder.indexOf('=') - if (keyValueSeparatorIndex >= 0) { - val key = node.remainder.substring(0, keyValueSeparatorIndex).trim - val value = node.remainder.substring(keyValueSeparatorIndex + 1).trim - SetCommand(Some(key -> Option(value))) - } else if (node.remainder.nonEmpty) { - SetCommand(Some(node.remainder -> None)) - } else { - SetCommand(None) - } - - // Just fake explain for any of the native commands. - case Token("TOK_EXPLAIN", explainArgs) if isNoExplainCommand(explainArgs.head.text) => - ExplainCommand(OneRowRelation) - - case Token("TOK_EXPLAIN", explainArgs) if "TOK_CREATETABLE" == explainArgs.head.text => - val Some(crtTbl) :: _ :: extended :: Nil = - getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs) - ExplainCommand(nodeToPlan(crtTbl), extended = extended.isDefined) - - case Token("TOK_EXPLAIN", explainArgs) => - // Ignore FORMATTED if present. - val Some(query) :: _ :: extended :: Nil = - getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs) - ExplainCommand(nodeToPlan(query), extended = extended.isDefined) - - case Token("TOK_REFRESHTABLE", nameParts :: Nil) => - val tableIdent = extractTableIdent(nameParts) - RefreshTable(tableIdent) - - // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] - // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]; - case Token("TOK_CREATEDATABASE", Token(dbName, Nil) :: args) => - val databaseName = cleanIdentifier(dbName) - val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq( - "TOK_IFNOTEXISTS", - "TOK_DATABASELOCATION", - "TOK_DATABASECOMMENT", - "TOK_DATABASEPROPERTIES"), args) - val location = dbLocation.map { - case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc) - case _ => parseFailed("Invalid CREATE DATABASE command", node) - } - val comment = databaseComment.map { - case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com) - case _ => parseFailed("Invalid CREATE DATABASE command", node) - } - val props = dbprops.toSeq.flatMap { - case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) => - // Example format: - // - // TOK_DATABASEPROPERTIES - // +- TOK_DBPROPLIST - // :- TOK_TABLEPROPERTY - // : :- 'k1' - // : +- 'v1' - // :- TOK_TABLEPROPERTY - // :- 'k2' - // +- 'v2' - extractProps(propList, "TOK_TABLEPROPERTY") - case _ => parseFailed("Invalid CREATE DATABASE command", node) - }.toMap - CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props) - - // DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; - case Token("TOK_DROPDATABASE", Token(dbName, Nil) :: otherArgs) => - // Example format: - // - // TOK_DROPDATABASE - // :- database_name - // :- TOK_IFEXISTS - // +- TOK_RESTRICT/TOK_CASCADE - val databaseName = cleanIdentifier(dbName) - // The default is RESTRICT - val Seq(ifExists, _, cascade) = getClauses(Seq( - "TOK_IFEXISTS", "TOK_RESTRICT", "TOK_CASCADE"), otherArgs) - DropDatabase(databaseName, ifExists.isDefined, cascade.isDefined) - - // ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) - case Token("TOK_ALTERDATABASE_PROPERTIES", Token(dbName, Nil) :: args) => - val databaseName = cleanIdentifier(dbName) - val dbprops = getClause("TOK_DATABASEPROPERTIES", args) - val props = dbprops match { - case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) => - // Example format: - // - // TOK_DATABASEPROPERTIES - // +- TOK_DBPROPLIST - // :- TOK_TABLEPROPERTY - // : :- 'k1' - // : +- 'v1' - // :- TOK_TABLEPROPERTY - // :- 'k2' - // +- 'v2' - extractProps(propList, "TOK_TABLEPROPERTY") - case _ => parseFailed("Invalid ALTER DATABASE command", node) - } - AlterDatabaseProperties(databaseName, props.toMap) - - // DESCRIBE DATABASE [EXTENDED] db_name - case Token("TOK_DESCDATABASE", Token(dbName, Nil) :: describeArgs) => - val databaseName = cleanIdentifier(dbName) - val extended = getClauseOption("EXTENDED", describeArgs) - DescribeDatabase(databaseName, extended.isDefined) - - // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name - // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ]; - case Token("TOK_CREATEFUNCTION", args) => - // Example format: - // - // TOK_CREATEFUNCTION - // :- db_name - // :- func_name - // :- alias - // +- TOK_RESOURCE_LIST - // :- TOK_RESOURCE_URI - // : :- TOK_JAR - // : +- '/path/to/jar' - // +- TOK_RESOURCE_URI - // :- TOK_FILE - // +- 'path/to/file' - val (funcNameArgs, otherArgs) = args.partition { - case Token("TOK_RESOURCE_LIST", _) => false - case Token("TOK_TEMPORARY", _) => false - case Token(_, Nil) => true - case _ => parseFailed("Invalid CREATE FUNCTION command", node) - } - // If database name is specified, there are 3 tokens, otherwise 2. - val (dbName, funcName, alias) = funcNameArgs match { - case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil => - (Some(unquoteString(dbName)), unquoteString(fname), unquoteString(aname)) - case Token(fname, Nil) :: Token(aname, Nil) :: Nil => - (None, unquoteString(fname), unquoteString(aname)) - case _ => - parseFailed("Invalid CREATE FUNCTION command", node) - } - // Extract other keywords, if they exist - val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs) - val resources: Seq[(String, String)] = rList.toSeq.flatMap { - case Token("TOK_RESOURCE_LIST", resList) => - resList.map { - case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) => - val resourceType = rType match { - case Token("TOK_JAR", Nil) => "jar" - case Token("TOK_FILE", Nil) => "file" - case Token("TOK_ARCHIVE", Nil) => "archive" - case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node) - } - (resourceType, unquoteString(rPath)) - case _ => parseFailed("Invalid CREATE FUNCTION command", node) - } - case _ => parseFailed("Invalid CREATE FUNCTION command", node) - } - CreateFunction(dbName, funcName, alias, resources, temp.isDefined)(node.source) - - // DROP [TEMPORARY] FUNCTION [IF EXISTS] function_name; - case Token("TOK_DROPFUNCTION", args) => - // Example format: - // - // TOK_DROPFUNCTION - // :- db_name - // :- func_name - // :- TOK_IFEXISTS - // +- TOK_TEMPORARY - val (funcNameArgs, otherArgs) = args.partition { - case Token("TOK_IFEXISTS", _) => false - case Token("TOK_TEMPORARY", _) => false - case Token(_, Nil) => true - case _ => parseFailed("Invalid DROP FUNCTION command", node) - } - // If database name is specified, there are 2 tokens, otherwise 1. - val (dbName, funcName) = funcNameArgs match { - case Token(dbName, Nil) :: Token(fname, Nil) :: Nil => - (Some(unquoteString(dbName)), unquoteString(fname)) - case Token(fname, Nil) :: Nil => - (None, unquoteString(fname)) - case _ => - parseFailed("Invalid DROP FUNCTION command", node) - } - - val Seq(ifExists, temp) = getClauses(Seq( - "TOK_IFEXISTS", "TOK_TEMPORARY"), otherArgs) - - DropFunction(dbName, funcName, ifExists.isDefined, temp.isDefined)(node.source) - - case Token("TOK_ALTERTABLE", alterTableArgs) => - AlterTableCommandParser.parse(node) - - case Token("TOK_CREATETABLEUSING", createTableArgs) => - val Seq( - temp, - ifNotExists, - Some(tabName), - tableCols, - Some(Token("TOK_TABLEPROVIDER", providerNameParts)), - tableOpts, - tableAs) = getClauses(Seq( - "TEMPORARY", - "TOK_IFNOTEXISTS", - "TOK_TABNAME", "TOK_TABCOLLIST", - "TOK_TABLEPROVIDER", - "TOK_TABLEOPTIONS", - "TOK_QUERY"), createTableArgs) - val tableIdent: TableIdentifier = extractTableIdent(tabName) - val columns = tableCols.map { - case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField)) - case _ => parseFailed("Invalid CREATE TABLE command", node) - } - val provider = providerNameParts.map { - case Token(name, Nil) => name - case _ => parseFailed("Invalid CREATE TABLE command", node) - }.mkString(".") - val options = tableOpts.toSeq.flatMap { - case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION") - case _ => parseFailed("Invalid CREATE TABLE command", node) - }.toMap - val asClause = tableAs.map(nodeToPlan) - - if (temp.isDefined && ifNotExists.isDefined) { - throw new AnalysisException( - "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.") - } - - if (asClause.isDefined) { - if (columns.isDefined) { - throw new AnalysisException( - "a CREATE TABLE AS SELECT statement does not allow column definitions.") - } - - val mode = if (ifNotExists.isDefined) { - SaveMode.Ignore - } else if (temp.isDefined) { - SaveMode.Overwrite - } else { - SaveMode.ErrorIfExists - } - - CreateTableUsingAsSelect(tableIdent, - provider, - temp.isDefined, - Array.empty[String], - bucketSpec = None, - mode, - options, - asClause.get) - } else { - CreateTableUsing( - tableIdent, - columns, - provider, - temp.isDefined, - options, - ifNotExists.isDefined, - managedIfNoPath = false) - } - - case Token("TOK_SWITCHDATABASE", Token(database, Nil) :: Nil) => - SetDatabaseCommand(cleanIdentifier(database)) - - case Token("TOK_DESCTABLE", describeArgs) => - // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL - val Some(tableType) :: formatted :: extended :: pretty :: Nil = - getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs) - if (formatted.isDefined || pretty.isDefined) { - // FORMATTED and PRETTY are not supported and this statement will be treated as - // a Hive native command. - nodeToDescribeFallback(node) - } else { - tableType match { - case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts) :: Nil) => - nameParts match { - case Token(dbName, Nil) :: Token(tableName, Nil) :: Nil => - // It is describing a table with the format like "describe db.table". - // TODO: Actually, a user may mean tableName.columnName. Need to resolve this - // issue. - val tableIdent = TableIdentifier( - cleanIdentifier(tableName), Some(cleanIdentifier(dbName))) - datasources.DescribeCommand(tableIdent, isExtended = extended.isDefined) - case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil => - // It is describing a column with the format like "describe db.table column". - nodeToDescribeFallback(node) - case tableName :: Nil => - // It is describing a table with the format like "describe table". - datasources.DescribeCommand( - TableIdentifier(cleanIdentifier(tableName.text)), - isExtended = extended.isDefined) - case _ => - nodeToDescribeFallback(node) - } - // All other cases. - case _ => - nodeToDescribeFallback(node) - } - } - - case Token("TOK_CACHETABLE", Token(tableName, Nil) :: args) => - val Seq(lzy, selectAst) = getClauses(Seq("LAZY", "TOK_QUERY"), args) - CacheTableCommand(tableName, selectAst.map(nodeToPlan), lzy.isDefined) - - case Token("TOK_UNCACHETABLE", Token(tableName, Nil) :: Nil) => - UncacheTableCommand(tableName) - - case Token("TOK_CLEARCACHE", Nil) => - ClearCacheCommand - - case Token("TOK_SHOWTABLES", args) => - val databaseName = args match { - case Nil => None - case Token("TOK_FROM", Token(dbName, Nil) :: Nil) :: Nil => Option(dbName) - case _ => noParseRule("SHOW TABLES", node) - } - ShowTablesCommand(databaseName) - - case _ => - super.nodeToPlan(node) - } - } - - protected def nodeToDescribeFallback(node: ASTNode): LogicalPlan = noParseRule("Describe", node) -} http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8333074..b4687c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -20,8 +20,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder, ParseException} -import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._ +import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.execution.command.{DescribeCommand => _, _} import org.apache.spark.sql.execution.datasources._ @@ -37,7 +37,7 @@ object SparkSqlParser extends AbstractSqlParser{ * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier. */ class SparkSqlAstBuilder extends AstBuilder { - import org.apache.spark.sql.catalyst.parser.ng.ParserUtils._ + import org.apache.spark.sql.catalyst.parser.ParserUtils._ /** * Create a [[SetCommand]] logical plan. http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala deleted file mode 100644 index 9fbe6db..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, SortDirection} -import org.apache.spark.sql.catalyst.parser._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.types.StructType - - -/** - * Helper object to parse alter table commands. - */ -object AlterTableCommandParser { - import ParserUtils._ - - /** - * Parse the given node assuming it is an alter table command. - */ - def parse(node: ASTNode): LogicalPlan = { - node.children match { - case (tabName @ Token("TOK_TABNAME", _)) :: otherNodes => - val tableIdent = extractTableIdent(tabName) - val partSpec = getClauseOption("TOK_PARTSPEC", node.children).map(parsePartitionSpec) - matchAlterTableCommands(node, otherNodes, tableIdent, partSpec) - case _ => - parseFailed("Could not parse ALTER TABLE command", node) - } - } - - private def cleanAndUnquoteString(s: String): String = { - cleanIdentifier(unquoteString(s)) - } - - /** - * Extract partition spec from the given [[ASTNode]] as a map, assuming it exists. - * - * Example format: - * - * TOK_PARTSPEC - * :- TOK_PARTVAL - * : :- dt - * : +- '2008-08-08' - * +- TOK_PARTVAL - * :- country - * +- 'us' - */ - private def parsePartitionSpec(node: ASTNode): Map[String, String] = { - node match { - case Token("TOK_PARTSPEC", partitions) => - partitions.map { - // Note: sometimes there's a "=", "<" or ">" between the key and the value - // (e.g. when dropping all partitions with value > than a certain constant) - case Token("TOK_PARTVAL", ident :: conj :: constant :: Nil) => - (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text)) - case Token("TOK_PARTVAL", ident :: constant :: Nil) => - (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text)) - case Token("TOK_PARTVAL", ident :: Nil) => - (cleanAndUnquoteString(ident.text), null) - case _ => - parseFailed("Invalid ALTER TABLE command", node) - }.toMap - case _ => - parseFailed("Expected partition spec in ALTER TABLE command", node) - } - } - - /** - * Extract table properties from the given [[ASTNode]] as a map, assuming it exists. - * - * Example format: - * - * TOK_TABLEPROPERTIES - * +- TOK_TABLEPROPLIST - * :- TOK_TABLEPROPERTY - * : :- 'test' - * : +- 'value' - * +- TOK_TABLEPROPERTY - * :- 'comment' - * +- 'new_comment' - */ - private def extractTableProps(node: ASTNode): Map[String, String] = { - node match { - case Token("TOK_TABLEPROPERTIES", propsList) => - propsList.flatMap { - case Token("TOK_TABLEPROPLIST", props) => - props.map { case Token("TOK_TABLEPROPERTY", key :: value :: Nil) => - val k = cleanAndUnquoteString(key.text) - val v = value match { - case Token("TOK_NULL", Nil) => null - case _ => cleanAndUnquoteString(value.text) - } - (k, v) - } - case _ => - parseFailed("Invalid ALTER TABLE command", node) - }.toMap - case _ => - parseFailed("Expected table properties in ALTER TABLE command", node) - } - } - - /** - * Parse an alter table command from a [[ASTNode]] into a [[LogicalPlan]]. - * This follows https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL. - * - * @param node the original [[ASTNode]] to parse. - * @param otherNodes the other [[ASTNode]]s after the first one containing the table name. - * @param tableIdent identifier of the table, parsed from the first [[ASTNode]]. - * @param partition spec identifying the partition this command is concerned with, if any. - */ - // TODO: This method is massive. Break it down. - private def matchAlterTableCommands( - node: ASTNode, - otherNodes: Seq[ASTNode], - tableIdent: TableIdentifier, - partition: Option[TablePartitionSpec]): LogicalPlan = { - otherNodes match { - // ALTER TABLE table_name RENAME TO new_table_name; - case Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ => - val tableNameClause = getClause("TOK_TABNAME", renameArgs) - val newTableIdent = extractTableIdent(tableNameClause) - AlterTableRename(tableIdent, newTableIdent)(node.source) - - // ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment); - case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ => - val properties = extractTableProps(args.head) - AlterTableSetProperties(tableIdent, properties)(node.source) - - // ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'key'); - case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ => - val properties = extractTableProps(args.head) - val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined - AlterTableUnsetProperties(tableIdent, properties, ifExists)(node.source) - - // ALTER TABLE table_name [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props]; - case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ => - AlterTableSerDeProperties( - tableIdent, - Some(cleanAndUnquoteString(serdeClassName)), - serdeArgs.headOption.map(extractTableProps), - partition)(node.source) - - // ALTER TABLE table_name [PARTITION spec] SET SERDEPROPERTIES serde_properties; - case Token("TOK_ALTERTABLE_SERDEPROPERTIES", args) :: _ => - AlterTableSerDeProperties( - tableIdent, - None, - Some(extractTableProps(args.head)), - partition)(node.source) - - // ALTER TABLE table_name CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS; - case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_ALTERTABLE_BUCKETS", b) :: Nil) :: _ => - val clusterCols: Seq[String] = b.head match { - case Token("TOK_TABCOLNAME", children) => children.map(_.text) - case _ => parseFailed("Invalid ALTER TABLE command", node) - } - // If sort columns are specified, num buckets should be the third arg. - // If sort columns are not specified, num buckets should be the second arg. - // TODO: actually use `sortDirections` once we actually store that in the metastore - val (sortCols: Seq[String], sortDirections: Seq[SortDirection], numBuckets: Int) = { - b.tail match { - case Token("TOK_TABCOLNAME", children) :: numBucketsNode :: Nil => - val (cols, directions) = children.map { - case Token("TOK_TABSORTCOLNAMEASC", Token(col, Nil) :: Nil) => (col, Ascending) - case Token("TOK_TABSORTCOLNAMEDESC", Token(col, Nil) :: Nil) => (col, Descending) - }.unzip - (cols, directions, numBucketsNode.text.toInt) - case numBucketsNode :: Nil => - (Nil, Nil, numBucketsNode.text.toInt) - case _ => - parseFailed("Invalid ALTER TABLE command", node) - } - } - AlterTableStorageProperties( - tableIdent, - BucketSpec(numBuckets, clusterCols, sortCols))(node.source) - - // ALTER TABLE table_name NOT CLUSTERED - case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_CLUSTERED", Nil) :: Nil) :: _ => - AlterTableNotClustered(tableIdent)(node.source) - - // ALTER TABLE table_name NOT SORTED - case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_SORTED", Nil) :: Nil) :: _ => - AlterTableNotSorted(tableIdent)(node.source) - - // ALTER TABLE table_name SKEWED BY (col1, col2) - // ON ((col1_value, col2_value) [, (col1_value, col2_value), ...]) - // [STORED AS DIRECTORIES]; - case Token("TOK_ALTERTABLE_SKEWED", - Token("TOK_TABLESKEWED", - Token("TOK_TABCOLNAME", colNames) :: colValues :: rest) :: Nil) :: _ => - // Example format: - // - // TOK_ALTERTABLE_SKEWED - // :- TOK_TABLESKEWED - // : :- TOK_TABCOLNAME - // : : :- dt - // : : +- country - // :- TOK_TABCOLVALUE_PAIR - // : :- TOK_TABCOLVALUES - // : : :- TOK_TABCOLVALUE - // : : : :- '2008-08-08' - // : : : +- 'us' - // : :- TOK_TABCOLVALUES - // : : :- TOK_TABCOLVALUE - // : : : :- '2009-09-09' - // : : : +- 'uk' - // +- TOK_STOREASDIR - val names = colNames.map { n => cleanAndUnquoteString(n.text) } - val values = colValues match { - case Token("TOK_TABCOLVALUE", vals) => - Seq(vals.map { n => cleanAndUnquoteString(n.text) }) - case Token("TOK_TABCOLVALUE_PAIR", pairs) => - pairs.map { - case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", vals) :: Nil) => - vals.map { n => cleanAndUnquoteString(n.text) } - case _ => - parseFailed("Invalid ALTER TABLE command", node) - } - case _ => - parseFailed("Invalid ALTER TABLE command", node) - } - val storedAsDirs = rest match { - case Token("TOK_STOREDASDIRS", Nil) :: Nil => true - case _ => false - } - AlterTableSkewed( - tableIdent, - names, - values, - storedAsDirs)(node.source) - - // ALTER TABLE table_name NOT SKEWED - case Token("TOK_ALTERTABLE_SKEWED", Nil) :: _ => - AlterTableNotSkewed(tableIdent)(node.source) - - // ALTER TABLE table_name NOT STORED AS DIRECTORIES - case Token("TOK_ALTERTABLE_SKEWED", Token("TOK_STOREDASDIRS", Nil) :: Nil) :: _ => - AlterTableNotStoredAsDirs(tableIdent)(node.source) - - // ALTER TABLE table_name SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] ); - case Token("TOK_ALTERTABLE_SKEWED_LOCATION", - Token("TOK_SKEWED_LOCATIONS", - Token("TOK_SKEWED_LOCATION_LIST", locationMaps) :: Nil) :: Nil) :: _ => - // Example format: - // - // TOK_ALTERTABLE_SKEWED_LOCATION - // +- TOK_SKEWED_LOCATIONS - // +- TOK_SKEWED_LOCATION_LIST - // :- TOK_SKEWED_LOCATION_MAP - // : :- 'col1' - // : +- 'loc1' - // +- TOK_SKEWED_LOCATION_MAP - // :- TOK_TABCOLVALUES - // : +- TOK_TABCOLVALUE - // : :- 'col2' - // : +- 'col3' - // +- 'loc2' - val skewedMaps = locationMaps.flatMap { - case Token("TOK_SKEWED_LOCATION_MAP", col :: loc :: Nil) => - col match { - case Token(const, Nil) => - Seq((cleanAndUnquoteString(const), cleanAndUnquoteString(loc.text))) - case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", keys) :: Nil) => - keys.map { k => (cleanAndUnquoteString(k.text), cleanAndUnquoteString(loc.text)) } - } - case _ => - parseFailed("Invalid ALTER TABLE command", node) - }.toMap - AlterTableSkewedLocation(tableIdent, skewedMaps)(node.source) - - // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1'] - // spec [LOCATION 'loc2'] ...; - case Token("TOK_ALTERTABLE_ADDPARTS", args) :: _ => - val (ifNotExists, parts) = args.head match { - case Token("TOK_IFNOTEXISTS", Nil) => (true, args.tail) - case _ => (false, args) - } - // List of (spec, location) to describe partitions to add - // Each partition spec may or may not be followed by a location - val parsedParts = new ArrayBuffer[(TablePartitionSpec, Option[String])] - parts.foreach { - case t @ Token("TOK_PARTSPEC", _) => - parsedParts += ((parsePartitionSpec(t), None)) - case Token("TOK_PARTITIONLOCATION", loc :: Nil) => - // Update the location of the last partition we just added - if (parsedParts.nonEmpty) { - val (spec, _) = parsedParts.remove(parsedParts.length - 1) - parsedParts += ((spec, Some(unquoteString(loc.text)))) - } - case _ => - parseFailed("Invalid ALTER TABLE command", node) - } - AlterTableAddPartition(tableIdent, parsedParts, ifNotExists)(node.source) - - // ALTER TABLE table_name PARTITION spec1 RENAME TO PARTITION spec2; - case Token("TOK_ALTERTABLE_RENAMEPART", spec :: Nil) :: _ => - val newPartition = parsePartitionSpec(spec) - val oldPartition = partition.getOrElse { - parseFailed("Expected old partition spec in ALTER TABLE rename partition command", node) - } - AlterTableRenamePartition(tableIdent, oldPartition, newPartition)(node.source) - - // ALTER TABLE table_name_1 EXCHANGE PARTITION spec WITH TABLE table_name_2; - case Token("TOK_ALTERTABLE_EXCHANGEPARTITION", spec :: newTable :: Nil) :: _ => - val parsedSpec = parsePartitionSpec(spec) - val newTableIdent = extractTableIdent(newTable) - AlterTableExchangePartition(tableIdent, newTableIdent, parsedSpec)(node.source) - - // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; - case Token("TOK_ALTERTABLE_DROPPARTS", args) :: _ => - val parts = args.collect { case p @ Token("TOK_PARTSPEC", _) => parsePartitionSpec(p) } - val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined - val purge = getClauseOption("PURGE", args).isDefined - AlterTableDropPartition(tableIdent, parts, ifExists, purge)(node.source) - - // ALTER TABLE table_name ARCHIVE PARTITION spec; - case Token("TOK_ALTERTABLE_ARCHIVE", spec :: Nil) :: _ => - AlterTableArchivePartition(tableIdent, parsePartitionSpec(spec))(node.source) - - // ALTER TABLE table_name UNARCHIVE PARTITION spec; - case Token("TOK_ALTERTABLE_UNARCHIVE", spec :: Nil) :: _ => - AlterTableUnarchivePartition(tableIdent, parsePartitionSpec(spec))(node.source) - - // ALTER TABLE table_name [PARTITION spec] SET FILEFORMAT file_format; - case Token("TOK_ALTERTABLE_FILEFORMAT", args) :: _ => - val Seq(fileFormat, genericFormat) = - getClauses(Seq("TOK_TABLEFILEFORMAT", "TOK_FILEFORMAT_GENERIC"), args) - // Note: the AST doesn't contain information about which file format is being set here. - // E.g. we can't differentiate between INPUTFORMAT and OUTPUTFORMAT if either is set. - // Right now this just stores the values, but we should figure out how to get the keys. - val fFormat = fileFormat - .map { _.children.map { n => cleanAndUnquoteString(n.text) }} - .getOrElse(Seq()) - val gFormat = genericFormat.map { f => cleanAndUnquoteString(f.children(0).text) } - AlterTableSetFileFormat(tableIdent, partition, fFormat, gFormat)(node.source) - - // ALTER TABLE table_name [PARTITION spec] SET LOCATION "loc"; - case Token("TOK_ALTERTABLE_LOCATION", Token(loc, Nil) :: Nil) :: _ => - AlterTableSetLocation(tableIdent, partition, cleanAndUnquoteString(loc))(node.source) - - // ALTER TABLE table_name TOUCH [PARTITION spec]; - case Token("TOK_ALTERTABLE_TOUCH", args) :: _ => - // Note: the partition spec, if it exists, comes after TOUCH, so `partition` should - // always be None here. Instead, we need to parse it from the TOUCH node's children. - val part = getClauseOption("TOK_PARTSPEC", args).map(parsePartitionSpec) - AlterTableTouch(tableIdent, part)(node.source) - - // ALTER TABLE table_name [PARTITION spec] COMPACT 'compaction_type'; - case Token("TOK_ALTERTABLE_COMPACT", Token(compactType, Nil) :: Nil) :: _ => - AlterTableCompact(tableIdent, partition, cleanAndUnquoteString(compactType))(node.source) - - // ALTER TABLE table_name [PARTITION spec] CONCATENATE; - case Token("TOK_ALTERTABLE_MERGEFILES", _) :: _ => - AlterTableMerge(tableIdent, partition)(node.source) - - // ALTER TABLE table_name [PARTITION spec] CHANGE [COLUMN] col_old_name col_new_name - // column_type [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT]; - case Token("TOK_ALTERTABLE_RENAMECOL", oldName :: newName :: dataType :: args) :: _ => - val afterColName: Option[String] = - getClauseOption("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", args).map { ap => - ap.children match { - case Token(col, Nil) :: Nil => col - case _ => parseFailed("Invalid ALTER TABLE command", node) - } - } - val restrict = getClauseOption("TOK_RESTRICT", args).isDefined - val cascade = getClauseOption("TOK_CASCADE", args).isDefined - val comment = args.headOption.map { - case Token("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", _) => null - case Token("TOK_RESTRICT", _) => null - case Token("TOK_CASCADE", _) => null - case Token(commentStr, Nil) => cleanAndUnquoteString(commentStr) - case _ => parseFailed("Invalid ALTER TABLE command", node) - } - AlterTableChangeCol( - tableIdent, - partition, - oldName.text, - newName.text, - nodeToDataType(dataType), - comment, - afterColName, - restrict, - cascade)(node.source) - - // ALTER TABLE table_name [PARTITION spec] ADD COLUMNS (name type [COMMENT comment], ...) - // [CASCADE|RESTRICT] - case Token("TOK_ALTERTABLE_ADDCOLS", args) :: _ => - val columnNodes = getClause("TOK_TABCOLLIST", args).children - val columns = StructType(columnNodes.map(nodeToStructField)) - val restrict = getClauseOption("TOK_RESTRICT", args).isDefined - val cascade = getClauseOption("TOK_CASCADE", args).isDefined - AlterTableAddCol(tableIdent, partition, columns, restrict, cascade)(node.source) - - // ALTER TABLE table_name [PARTITION spec] REPLACE COLUMNS (name type [COMMENT comment], ...) - // [CASCADE|RESTRICT] - case Token("TOK_ALTERTABLE_REPLACECOLS", args) :: _ => - val columnNodes = getClause("TOK_TABCOLLIST", args).children - val columns = StructType(columnNodes.map(nodeToStructField)) - val restrict = getClauseOption("TOK_RESTRICT", args).isDefined - val cascade = getClauseOption("TOK_CASCADE", args).isDefined - AlterTableReplaceCol(tableIdent, partition, columns, restrict, cascade)(node.source) - - case _ => - parseFailed("Unsupported ALTER TABLE command", node) - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d06e908..6cc72fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -26,7 +26,6 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.parser.ParserConf import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -500,19 +499,6 @@ object SQLConf { doc = "When true, we could use `datasource`.`path` as table in SQL query." ) - val PARSER_SUPPORT_QUOTEDID = booleanConf("spark.sql.parser.supportQuotedIdentifiers", - defaultValue = Some(true), - isPublic = false, - doc = "Whether to use quoted identifier.\n false: default(past) behavior. Implies only" + - "alphaNumeric and underscore are valid characters in identifiers.\n" + - " true: implies column names can contain any character.") - - val PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS = booleanConf( - "spark.sql.parser.supportSQL11ReservedKeywords", - defaultValue = Some(false), - isPublic = false, - doc = "This flag should be set to true to enable support for SQL2011 reserved keywords.") - val WHOLESTAGE_CODEGEN_ENABLED = booleanConf("spark.sql.codegen.wholeStage", defaultValue = Some(true), doc = "When true, the whole stage (of multiple operators) will be compiled into single java" + @@ -573,7 +559,7 @@ object SQLConf { * * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ -class SQLConf extends Serializable with CatalystConf with ParserConf with Logging { +class SQLConf extends Serializable with CatalystConf with Logging { import SQLConf._ /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */ @@ -674,10 +660,6 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) - def supportQuotedId: Boolean = getConf(PARSER_SUPPORT_QUOTEDID) - - def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS) - override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 47c9a22..f148f2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -21,11 +21,22 @@ import java.io.File import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.catalog.CatalogDatabase -import org.apache.spark.sql.catalyst.parser.ParserUtils._ import org.apache.spark.sql.test.SharedSQLContext class DDLSuite extends QueryTest with SharedSQLContext { + private val escapedIdentifier = "`(.+)`".r + + /** + * Strip backticks, if any, from the string. + */ + def cleanIdentifier(ident: String): String = { + ident match { + case escapedIdentifier(i) => i + case plainIdent => plainIdent + } + } + /** * Drops database `databaseName` after calling `f`. */ http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/pom.xml ---------------------------------------------------------------------- diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 22bad93..58efd80 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -225,25 +225,6 @@ <argLine>-da -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine> </configuration> </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-default-sources</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>v${hive.version.short}/src/main/scala</source> - <source>${project.build.directory/generated-sources/antlr</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> <!-- Deploy datanucleus jars to the spark/lib_managed/jars directory --> <plugin> http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9a5ec98..2cdc931 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -25,7 +25,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse} +import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _} import org.apache.hadoop.hive.ql.plan.TableDesc @@ -988,3 +988,28 @@ private[hive] object HiveMetastoreTypes { case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) } } + +private[hive] case class CreateTableAsSelect( + tableDesc: CatalogTable, + child: LogicalPlan, + allowExisting: Boolean) extends UnaryNode with Command { + + override def output: Seq[Attribute] = Seq.empty[Attribute] + override lazy val resolved: Boolean = + tableDesc.identifier.database.isDefined && + tableDesc.schema.nonEmpty && + tableDesc.storage.serde.isDefined && + tableDesc.storage.inputFormat.isDefined && + tableDesc.storage.outputFormat.isDefined && + childrenResolved +} + +private[hive] case class CreateViewAsSelect( + tableDesc: CatalogTable, + child: LogicalPlan, + allowExisting: Boolean, + replace: Boolean, + sql: String) extends UnaryNode with Command { + override def output: Seq[Attribute] = Seq.empty[Attribute] + override lazy val resolved: Boolean = false +} http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala deleted file mode 100644 index 052c43a..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ /dev/null @@ -1,749 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.util.Locale - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} -import org.apache.hadoop.hive.ql.parse.EximUtil -import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.parser._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.SparkQl -import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper -import org.apache.spark.sql.hive.execution._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.AnalysisException - -/** - * Used when we need to start parsing the AST before deciding that we are going to pass the command - * back for Hive to execute natively. Will be replaced with a native command that contains the - * cmd string. - */ -private[hive] case object NativePlaceholder extends LogicalPlan { - override def children: Seq[LogicalPlan] = Seq.empty - override def output: Seq[Attribute] = Seq.empty -} - -private[hive] case class CreateTableAsSelect( - tableDesc: CatalogTable, - child: LogicalPlan, - allowExisting: Boolean) extends UnaryNode with Command { - - override def output: Seq[Attribute] = Seq.empty[Attribute] - override lazy val resolved: Boolean = - tableDesc.identifier.database.isDefined && - tableDesc.schema.nonEmpty && - tableDesc.storage.serde.isDefined && - tableDesc.storage.inputFormat.isDefined && - tableDesc.storage.outputFormat.isDefined && - childrenResolved -} - -private[hive] case class CreateViewAsSelect( - tableDesc: CatalogTable, - child: LogicalPlan, - allowExisting: Boolean, - replace: Boolean, - sql: String) extends UnaryNode with Command { - override def output: Seq[Attribute] = Seq.empty[Attribute] - override lazy val resolved: Boolean = false -} - -/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ -private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging { - import ParseUtils._ - import ParserUtils._ - - // Token text -> human readable text - private val hiveUnsupportedCommands = Map( - "TOK_CREATEROLE" -> "CREATE ROLE", - "TOK_DROPROLE" -> "DROP ROLE", - "TOK_EXPORT" -> "EXPORT TABLE", - "TOK_GRANT" -> "GRANT", - "TOK_GRANT_ROLE" -> "GRANT", - "TOK_IMPORT" -> "IMPORT TABLE", - "TOK_REVOKE" -> "REVOKE", - "TOK_REVOKE_ROLE" -> "REVOKE", - "TOK_SHOW_COMPACTIONS" -> "SHOW COMPACTIONS", - "TOK_SHOW_CREATETABLE" -> "SHOW CREATE TABLE", - "TOK_SHOW_GRANT" -> "SHOW GRANT", - "TOK_SHOW_ROLE_GRANT" -> "SHOW ROLE GRANT", - "TOK_SHOW_ROLE_PRINCIPALS" -> "SHOW PRINCIPALS", - "TOK_SHOW_ROLES" -> "SHOW ROLES", - "TOK_SHOW_SET_ROLE" -> "SHOW CURRENT ROLES / SET ROLE", - "TOK_SHOW_TRANSACTIONS" -> "SHOW TRANSACTIONS", - "TOK_SHOWINDEXES" -> "SHOW INDEXES", - "TOK_SHOWLOCKS" -> "SHOW LOCKS") - - private val nativeCommands = Set( - "TOK_ALTERDATABASE_OWNER", - "TOK_ALTERINDEX_PROPERTIES", - "TOK_ALTERINDEX_REBUILD", - "TOK_ALTERTABLE_ALTERPARTS", - "TOK_ALTERTABLE_PARTITION", - "TOK_ALTERVIEW_ADDPARTS", - "TOK_ALTERVIEW_AS", - "TOK_ALTERVIEW_DROPPARTS", - "TOK_ALTERVIEW_PROPERTIES", - "TOK_ALTERVIEW_RENAME", - - "TOK_CREATEINDEX", - "TOK_CREATEMACRO", - - "TOK_DROPINDEX", - "TOK_DROPMACRO", - "TOK_DROPTABLE_PROPERTIES", - "TOK_DROPVIEW", - "TOK_DROPVIEW_PROPERTIES", - - "TOK_LOAD", - - "TOK_LOCKTABLE", - - "TOK_MSCK", - - "TOK_SHOW_TABLESTATUS", - "TOK_SHOW_TBLPROPERTIES", - "TOK_SHOWCOLUMNS", - "TOK_SHOWDATABASES", - "TOK_SHOWPARTITIONS", - - "TOK_UNLOCKTABLE" - ) - - // Commands that we do not need to explain. - private val noExplainCommands = Set( - "TOK_DESCTABLE", - "TOK_SHOWTABLES", - "TOK_TRUNCATETABLE", // truncate table" is a NativeCommand, does not need to explain. - "TOK_ALTERTABLE" - ) ++ nativeCommands - - /** - * Returns the HiveConf - */ - private[this] def hiveConf: HiveConf = { - var ss = SessionState.get() - // SessionState is lazy initialization, it can be null here - if (ss == null) { - val original = Thread.currentThread().getContextClassLoader - val conf = new HiveConf(classOf[SessionState]) - conf.setClassLoader(original) - ss = new SessionState(conf) - SessionState.start(ss) - } - ss.getConf - } - - protected def getProperties(node: ASTNode): Seq[(String, String)] = node match { - case Token("TOK_TABLEPROPLIST", list) => - list.map { - case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) => - unquoteString(key) -> unquoteString(value) - } - } - - private def createView( - view: ASTNode, - viewNameParts: ASTNode, - query: ASTNode, - schema: Seq[CatalogColumn], - properties: Map[String, String], - allowExist: Boolean, - replace: Boolean): CreateViewAsSelect = { - val tableIdentifier = extractTableIdent(viewNameParts) - val originalText = query.source - val tableDesc = CatalogTable( - identifier = tableIdentifier, - tableType = CatalogTableType.VIRTUAL_VIEW, - schema = schema, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - serdeProperties = Map.empty[String, String] - ), - properties = properties, - viewOriginalText = Some(originalText), - viewText = Some(originalText)) - - // We need to keep the original SQL string so that if `spark.sql.nativeView` is - // false, we can fall back to use hive native command later. - // We can remove this when parser is configurable(can access SQLConf) in the future. - val sql = view.source - CreateViewAsSelect(tableDesc, nodeToPlan(query), allowExist, replace, sql) - } - - /** Creates LogicalPlan for a given SQL string. */ - override def parsePlan(sql: String): LogicalPlan = { - safeParse(sql, ParseDriver.parsePlan(sql, conf)) { ast => - if (nativeCommands.contains(ast.text)) { - HiveNativeCommand(sql) - } else if (hiveUnsupportedCommands.contains(ast.text)) { - val humanReadableText = hiveUnsupportedCommands(ast.text) - throw new AnalysisException("Unsupported operation: " + humanReadableText) - } else { - nodeToPlan(ast) match { - case NativePlaceholder => HiveNativeCommand(sql) - case plan => plan - } - } - } - } - - protected override def isNoExplainCommand(command: String): Boolean = - noExplainCommands.contains(command) - - protected override def nodeToPlan(node: ASTNode): LogicalPlan = { - node match { - case Token("TOK_DFS", Nil) => - HiveNativeCommand(node.source + " " + node.remainder) - - case Token("TOK_ADDFILE", Nil) => - AddFile(node.remainder) - - case Token("TOK_ADDJAR", Nil) => - AddJar(node.remainder) - - // Special drop table that also uncaches. - case Token("TOK_DROPTABLE", Token("TOK_TABNAME", tableNameParts) :: ifExists) => - val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".") - DropTable(tableName, ifExists.nonEmpty) - - // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan" - case Token("TOK_ANALYZE", - Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) :: isNoscan) => - // Reference: - // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables - if (partitionSpec.nonEmpty) { - // Analyze partitions will be treated as a Hive native command. - NativePlaceholder - } else if (isNoscan.isEmpty) { - // If users do not specify "noscan", it will be treated as a Hive native command. - NativePlaceholder - } else { - val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".") - AnalyzeTable(tableName) - } - - case view @ Token("TOK_ALTERVIEW", children) => - val Some(nameParts) :: maybeQuery :: _ = - getClauses(Seq( - "TOK_TABNAME", - "TOK_QUERY", - "TOK_ALTERVIEW_ADDPARTS", - "TOK_ALTERVIEW_DROPPARTS", - "TOK_ALTERVIEW_PROPERTIES", - "TOK_ALTERVIEW_RENAME"), children) - - // if ALTER VIEW doesn't have query part, let hive to handle it. - maybeQuery.map { query => - createView(view, nameParts, query, Nil, Map(), allowExist = false, replace = true) - }.getOrElse(NativePlaceholder) - - case view @ Token("TOK_CREATEVIEW", children) - if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty => - val Seq( - Some(viewNameParts), - Some(query), - maybeComment, - replace, - allowExisting, - maybeProperties, - maybeColumns, - maybePartCols - ) = getClauses(Seq( - "TOK_TABNAME", - "TOK_QUERY", - "TOK_TABLECOMMENT", - "TOK_ORREPLACE", - "TOK_IFNOTEXISTS", - "TOK_TABLEPROPERTIES", - "TOK_TABCOLNAME", - "TOK_VIEWPARTCOLS"), children) - - // If the view is partitioned, we let hive handle it. - if (maybePartCols.isDefined) { - NativePlaceholder - } else { - val schema = maybeColumns.map { cols => - // We can't specify column types when create view, so fill it with null first, and - // update it after the schema has been resolved later. - nodeToColumns(cols, lowerCase = true).map(_.copy(dataType = null)) - }.getOrElse(Seq.empty[CatalogColumn]) - - val properties = scala.collection.mutable.Map.empty[String, String] - - maybeProperties.foreach { - case Token("TOK_TABLEPROPERTIES", list :: Nil) => - properties ++= getProperties(list) - } - - maybeComment.foreach { - case Token("TOK_TABLECOMMENT", child :: Nil) => - val comment = unescapeSQLString(child.text) - if (comment ne null) { - properties += ("comment" -> comment) - } - } - - createView(view, viewNameParts, query, schema, properties.toMap, - allowExisting.isDefined, replace.isDefined) - } - - case Token("TOK_CREATETABLE", children) - if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty => - // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL - val ( - Some(tableNameParts) :: - _ /* likeTable */ :: - externalTable :: - Some(query) :: - allowExisting +: - _) = - getClauses( - Seq( - "TOK_TABNAME", - "TOK_LIKETABLE", - "EXTERNAL", - "TOK_QUERY", - "TOK_IFNOTEXISTS", - "TOK_TABLECOMMENT", - "TOK_TABCOLLIST", - "TOK_TABLEPARTCOLS", // Partitioned by - "TOK_TABLEBUCKETS", // Clustered by - "TOK_TABLESKEWED", // Skewed by - "TOK_TABLEROWFORMAT", - "TOK_TABLESERIALIZER", - "TOK_FILEFORMAT_GENERIC", - "TOK_TABLEFILEFORMAT", // User-provided InputFormat and OutputFormat - "TOK_STORAGEHANDLER", // Storage handler - "TOK_TABLELOCATION", - "TOK_TABLEPROPERTIES"), - children) - val tableIdentifier = extractTableIdent(tableNameParts) - - // TODO add bucket support - var tableDesc: CatalogTable = CatalogTable( - identifier = tableIdentifier, - tableType = - if (externalTable.isDefined) { - CatalogTableType.EXTERNAL_TABLE - } else { - CatalogTableType.MANAGED_TABLE - }, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - serdeProperties = Map.empty[String, String] - ), - schema = Seq.empty[CatalogColumn]) - - // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.) - val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) - // handle the default format for the storage type abbreviation - val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse { - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - } - - tableDesc = tableDesc.withNewStorage( - inputFormat = hiveSerDe.inputFormat.orElse(tableDesc.storage.inputFormat), - outputFormat = hiveSerDe.outputFormat.orElse(tableDesc.storage.outputFormat), - serde = hiveSerDe.serde.orElse(tableDesc.storage.serde)) - - children.collect { - case list @ Token("TOK_TABCOLLIST", _) => - val cols = nodeToColumns(list, lowerCase = true) - if (cols != null) { - tableDesc = tableDesc.copy(schema = cols) - } - case Token("TOK_TABLECOMMENT", child :: Nil) => - val comment = unescapeSQLString(child.text) - // TODO support the sql text - tableDesc = tableDesc.copy(viewText = Option(comment)) - case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) => - val cols = nodeToColumns(list.head, lowerCase = false) - if (cols != null) { - tableDesc = tableDesc.copy(partitionColumns = cols) - } - case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) => - val serdeParams = new java.util.HashMap[String, String]() - child match { - case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) => - val fieldDelim = unescapeSQLString (rowChild1.text) - serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim) - serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim) - if (rowChild2.length > 1) { - val fieldEscape = unescapeSQLString (rowChild2.head.text) - serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape) - } - case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) => - val collItemDelim = unescapeSQLString(rowChild.text) - serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim) - case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) => - val mapKeyDelim = unescapeSQLString(rowChild.text) - serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim) - case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) => - val lineDelim = unescapeSQLString(rowChild.text) - if (!(lineDelim == "\n") && !(lineDelim == "10")) { - throw new AnalysisException( - s"LINES TERMINATED BY only supports newline '\\n' right now: $rowChild") - } - serdeParams.put(serdeConstants.LINE_DELIM, lineDelim) - case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) => - val nullFormat = unescapeSQLString(rowChild.text) - // TODO support the nullFormat - case _ => assert(false) - } - tableDesc = tableDesc.withNewStorage( - serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams.asScala) - case Token("TOK_TABLELOCATION", child :: Nil) => - val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text)) - tableDesc = tableDesc.withNewStorage(locationUri = Option(location)) - case Token("TOK_TABLESERIALIZER", child :: Nil) => - tableDesc = tableDesc.withNewStorage( - serde = Option(unescapeSQLString(child.children.head.text))) - if (child.numChildren == 2) { - // This is based on the readProps(..) method in - // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java: - val serdeParams = child.children(1).children.head.children.map { - case Token(_, Token(prop, Nil) :: valueNode) => - val value = valueNode.headOption - .map(_.text) - .map(unescapeSQLString) - .orNull - (unescapeSQLString(prop), value) - }.toMap - tableDesc = tableDesc.withNewStorage( - serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams) - } - case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) => - child.text.toLowerCase(Locale.ENGLISH) match { - case "orc" => - tableDesc = tableDesc.withNewStorage( - inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - if (tableDesc.storage.serde.isEmpty) { - tableDesc = tableDesc.withNewStorage( - serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - } - - case "parquet" => - tableDesc = tableDesc.withNewStorage( - inputFormat = - Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) - if (tableDesc.storage.serde.isEmpty) { - tableDesc = tableDesc.withNewStorage( - serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) - } - - case "rcfile" => - tableDesc = tableDesc.withNewStorage( - inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - if (tableDesc.storage.serde.isEmpty) { - tableDesc = tableDesc.withNewStorage( - serde = - Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) - } - - case "textfile" => - tableDesc = tableDesc.withNewStorage( - inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - - case "sequencefile" => - tableDesc = tableDesc.withNewStorage( - inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), - outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")) - - case "avro" => - tableDesc = tableDesc.withNewStorage( - inputFormat = - Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")) - if (tableDesc.storage.serde.isEmpty) { - tableDesc = tableDesc.withNewStorage( - serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")) - } - - case _ => - throw new AnalysisException( - s"Unrecognized file format in STORED AS clause: ${child.text}") - } - - case Token("TOK_TABLESERIALIZER", - Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) => - tableDesc = tableDesc.withNewStorage(serde = Option(unquoteString(serdeName))) - - otherProps match { - case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil => - tableDesc = tableDesc.withNewStorage( - serdeProperties = tableDesc.storage.serdeProperties ++ getProperties(list)) - case _ => - } - - case Token("TOK_TABLEPROPERTIES", list :: Nil) => - tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list)) - case list @ Token("TOK_TABLEFILEFORMAT", _) => - tableDesc = tableDesc.withNewStorage( - inputFormat = Option(unescapeSQLString(list.children.head.text)), - outputFormat = Option(unescapeSQLString(list.children(1).text))) - case Token("TOK_STORAGEHANDLER", _) => - throw new AnalysisException( - "CREATE TABLE AS SELECT cannot be used for a non-native table") - case _ => // Unsupported features - } - - CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting.isDefined) - - // If its not a "CTAS" like above then take it as a native command - case Token("TOK_CREATETABLE", _) => - NativePlaceholder - - // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]" - case Token("TOK_TRUNCATETABLE", Token("TOK_TABLE_PARTITION", table) :: Nil) => - NativePlaceholder - - case _ => - super.nodeToPlan(node) - } - } - - protected override def nodeToDescribeFallback(node: ASTNode): LogicalPlan = NativePlaceholder - - protected override def nodeToTransformation( - node: ASTNode, - child: LogicalPlan): Option[logical.ScriptTransformation] = node match { - case Token("TOK_SELEXPR", - Token("TOK_TRANSFORM", - Token("TOK_EXPLIST", inputExprs) :: - Token("TOK_SERDE", inputSerdeClause) :: - Token("TOK_RECORDWRITER", writerClause) :: - // TODO: Need to support other types of (in/out)put - Token(script, Nil) :: - Token("TOK_SERDE", outputSerdeClause) :: - Token("TOK_RECORDREADER", readerClause) :: - outputClause) :: Nil) => - - val (output, schemaLess) = outputClause match { - case Token("TOK_ALIASLIST", aliases) :: Nil => - (aliases.map { case Token(name, Nil) => - AttributeReference(cleanIdentifier(name), StringType)() }, false) - case Token("TOK_TABCOLLIST", attributes) :: Nil => - (attributes.map { case Token("TOK_TABCOL", Token(name, Nil) :: dataType :: Nil) => - AttributeReference(cleanIdentifier(name), nodeToDataType(dataType))() }, false) - case Nil => - (List(AttributeReference("key", StringType)(), - AttributeReference("value", StringType)()), true) - case _ => - noParseRule("Transform", node) - } - - type SerDeInfo = ( - Seq[(String, String)], // Input row format information - Option[String], // Optional input SerDe class - Seq[(String, String)], // Input SerDe properties - Boolean // Whether to use default record reader/writer - ) - - def matchSerDe(clause: Seq[ASTNode]): SerDeInfo = clause match { - case Token("TOK_SERDEPROPS", propsClause) :: Nil => - val rowFormat = propsClause.map { - case Token(name, Token(value, Nil) :: Nil) => (name, value) - } - (rowFormat, None, Nil, false) - - case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil => - (Nil, Some(unescapeSQLString(serdeClass)), Nil, false) - - case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: - Token("TOK_TABLEPROPERTIES", - Token("TOK_TABLEPROPLIST", propsClause) :: Nil) :: Nil) :: Nil => - val serdeProps = propsClause.map { - case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) => - (unescapeSQLString(name), unescapeSQLString(value)) - } - - // SPARK-10310: Special cases LazySimpleSerDe - // TODO Fully supports user-defined record reader/writer classes - val unescapedSerDeClass = unescapeSQLString(serdeClass) - val useDefaultRecordReaderWriter = - unescapedSerDeClass == classOf[LazySimpleSerDe].getCanonicalName - (Nil, Some(unescapedSerDeClass), serdeProps, useDefaultRecordReaderWriter) - - case Nil => - // Uses default TextRecordReader/TextRecordWriter, sets field delimiter here - val serdeProps = Seq(serdeConstants.FIELD_DELIM -> "\t") - (Nil, Option(hiveConf.getVar(ConfVars.HIVESCRIPTSERDE)), serdeProps, true) - } - - val (inRowFormat, inSerdeClass, inSerdeProps, useDefaultRecordReader) = - matchSerDe(inputSerdeClause) - - val (outRowFormat, outSerdeClass, outSerdeProps, useDefaultRecordWriter) = - matchSerDe(outputSerdeClause) - - val unescapedScript = unescapeSQLString(script) - - // TODO Adds support for user-defined record reader/writer classes - val recordReaderClass = if (useDefaultRecordReader) { - Option(hiveConf.getVar(ConfVars.HIVESCRIPTRECORDREADER)) - } else { - None - } - - val recordWriterClass = if (useDefaultRecordWriter) { - Option(hiveConf.getVar(ConfVars.HIVESCRIPTRECORDWRITER)) - } else { - None - } - - val schema = HiveScriptIOSchema( - inRowFormat, outRowFormat, - inSerdeClass, outSerdeClass, - inSerdeProps, outSerdeProps, - recordReaderClass, recordWriterClass, - schemaLess) - - Some( - logical.ScriptTransformation( - inputExprs.map(nodeToExpr), - unescapedScript, - output, - child, schema)) - case _ => None - } - - protected override def nodeToGenerator(node: ASTNode): Generator = node match { - case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) => - val functionInfo: FunctionInfo = - Option(FunctionRegistry.getFunctionInfo(functionName.toLowerCase)).getOrElse( - sys.error(s"Couldn't find function $functionName")) - val functionClassName = functionInfo.getFunctionClass.getName - HiveGenericUDTF( - functionName, new HiveFunctionWrapper(functionClassName), children.map(nodeToExpr)) - case other => super.nodeToGenerator(node) - } - - // This is based the getColumns methods in - // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java - protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[CatalogColumn] = { - node.children.map(_.children).collect { - case Token(rawColName, Nil) :: colTypeNode :: comment => - val colName = if (!lowerCase) rawColName else rawColName.toLowerCase - CatalogColumn( - name = cleanIdentifier(colName), - dataType = nodeToTypeString(colTypeNode), - nullable = true, - comment.headOption.map(n => unescapeSQLString(n.text))) - } - } - - // This is based on the following methods in - // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java: - // getTypeStringFromAST - // getStructTypeStringFromAST - // getUnionTypeStringFromAST - protected def nodeToTypeString(node: ASTNode): String = node.tokenType match { - case SparkSqlParser.TOK_LIST => - val listType :: Nil = node.children - val listTypeString = nodeToTypeString(listType) - s"${serdeConstants.LIST_TYPE_NAME}<$listTypeString>" - - case SparkSqlParser.TOK_MAP => - val keyType :: valueType :: Nil = node.children - val keyTypeString = nodeToTypeString(keyType) - val valueTypeString = nodeToTypeString(valueType) - s"${serdeConstants.MAP_TYPE_NAME}<$keyTypeString,$valueTypeString>" - - case SparkSqlParser.TOK_STRUCT => - val typeNode = node.children.head - require(typeNode.children.nonEmpty, "Struct must have one or more columns.") - val structColStrings = typeNode.children.map { columnNode => - val Token(colName, Nil) :: colTypeNode :: Nil = columnNode.children - cleanIdentifier(colName) + ":" + nodeToTypeString(colTypeNode) - } - s"${serdeConstants.STRUCT_TYPE_NAME}<${structColStrings.mkString(",")}>" - - case SparkSqlParser.TOK_UNIONTYPE => - val typeNode = node.children.head - val unionTypesString = typeNode.children.map(nodeToTypeString).mkString(",") - s"${serdeConstants.UNION_TYPE_NAME}<$unionTypesString>" - - case SparkSqlParser.TOK_CHAR => - val Token(size, Nil) :: Nil = node.children - s"${serdeConstants.CHAR_TYPE_NAME}($size)" - - case SparkSqlParser.TOK_VARCHAR => - val Token(size, Nil) :: Nil = node.children - s"${serdeConstants.VARCHAR_TYPE_NAME}($size)" - - case SparkSqlParser.TOK_DECIMAL => - val precisionAndScale = node.children match { - case Token(precision, Nil) :: Token(scale, Nil) :: Nil => - precision + "," + scale - case Token(precision, Nil) :: Nil => - precision + "," + HiveDecimal.USER_DEFAULT_SCALE - case Nil => - HiveDecimal.USER_DEFAULT_PRECISION + "," + HiveDecimal.USER_DEFAULT_SCALE - case _ => - noParseRule("Decimal", node) - } - s"${serdeConstants.DECIMAL_TYPE_NAME}($precisionAndScale)" - - // Simple data types. - case SparkSqlParser.TOK_BOOLEAN => serdeConstants.BOOLEAN_TYPE_NAME - case SparkSqlParser.TOK_TINYINT => serdeConstants.TINYINT_TYPE_NAME - case SparkSqlParser.TOK_SMALLINT => serdeConstants.SMALLINT_TYPE_NAME - case SparkSqlParser.TOK_INT => serdeConstants.INT_TYPE_NAME - case SparkSqlParser.TOK_BIGINT => serdeConstants.BIGINT_TYPE_NAME - case SparkSqlParser.TOK_FLOAT => serdeConstants.FLOAT_TYPE_NAME - case SparkSqlParser.TOK_DOUBLE => serdeConstants.DOUBLE_TYPE_NAME - case SparkSqlParser.TOK_STRING => serdeConstants.STRING_TYPE_NAME - case SparkSqlParser.TOK_BINARY => serdeConstants.BINARY_TYPE_NAME - case SparkSqlParser.TOK_DATE => serdeConstants.DATE_TYPE_NAME - case SparkSqlParser.TOK_TIMESTAMP => serdeConstants.TIMESTAMP_TYPE_NAME - case SparkSqlParser.TOK_INTERVAL_YEAR_MONTH => serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME - case SparkSqlParser.TOK_INTERVAL_DAY_TIME => serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME - case SparkSqlParser.TOK_DATETIME => serdeConstants.DATETIME_TYPE_NAME - case _ => null - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index d6a08fc..12e4f49 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -29,8 +29,8 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.parser.ng._ -import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._ +import org.apache.spark.sql.catalyst.parser._ +import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView} @@ -161,18 +161,10 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { } // Create the schema. - val schema = Option(ctx.colTypeList).toSeq.flatMap(_.colType.asScala).map { col => - CatalogColumn( - col.identifier.getText, - col.dataType.getText.toLowerCase, // TODO validate this? - nullable = true, - Option(col.STRING).map(string)) - } + val schema = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns(_, _.toLowerCase)) // Get the column by which the table is partitioned. - val partitionCols = Option(ctx.identifierList).toSeq.flatMap(visitIdentifierList).map { - CatalogColumn(_, null, nullable = true, None) - } + val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns(_)) // Create the storage. def format(fmt: ParserRuleContext): CatalogStorageFormat = { @@ -439,4 +431,19 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { } EmptyStorageFormat.copy(serdeProperties = entries.toMap) } + + /** + * Create a sequence of [[CatalogColumn]]s from a column list + */ + private def visitCatalogColumns( + ctx: ColTypeListContext, + formatter: String => String = identity): Seq[CatalogColumn] = withOrigin(ctx) { + ctx.colType.asScala.map { col => + CatalogColumn( + formatter(col.identifier.getText), + col.dataType.getText.toLowerCase, // TODO validate this? + nullable = true, + Option(col.STRING).map(string)) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala index 4b6da7c..d966468 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala @@ -22,8 +22,8 @@ import scala.util.Try import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalyst.parser.ParseDriver import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.hive.execution.HiveSqlParser import org.apache.spark.sql.hive.test.TestHiveSingleton class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterEach { @@ -131,7 +131,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd * @param token a unique token in the string that should be indicated by the exception */ def positionTest(name: String, query: String, token: String): Unit = { - def ast = ParseDriver.parsePlan(query, hiveContext.conf) + def ast = HiveSqlParser.parsePlan(query) def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>") test(name) { http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 0aaf576..75108c6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -24,11 +24,11 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.JsonTuple -import org.apache.spark.sql.catalyst.parser.SimpleParserConf import org.apache.spark.sql.catalyst.plans.logical.Generate +import org.apache.spark.sql.hive.execution.HiveSqlParser class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { - val parser = new HiveQl(SimpleParserConf()) + val parser = HiveSqlParser private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { http://git-wip-us.apache.org/repos/asf/spark/blob/a9b93e07/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index ae026ed..05318f5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -21,7 +21,6 @@ import scala.reflect.ClassTag import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.parser.SimpleParserConf import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -30,11 +29,9 @@ import org.apache.spark.sql.internal.SQLConf class StatisticsSuite extends QueryTest with TestHiveSingleton { import hiveContext.sql - val parser = new HiveQl(SimpleParserConf()) - test("parse analyze commands") { def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { - val parsed = parser.parsePlan(analyzeCommand) + val parsed = HiveSqlParser.parsePlan(analyzeCommand) val operators = parsed.collect { case a: AnalyzeTable => a case o => o --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
