Repository: carbondata Updated Branches: refs/heads/master a4c2ef5f8 -> 5aada46e7
[CARBONDATA-2710][Spark Integration] Refactor CarbonSparkSqlParser for better code reuse. Refactor CarbonSparkSqlParser for better code reuse This closes #2466 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5aada46e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5aada46e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5aada46e Branch: refs/heads/master Commit: 5aada46e7bb6bcbb11652979862e3ccebaa6e3e8 Parents: a4c2ef5 Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Mon Jul 9 16:08:47 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Wed Jul 18 16:20:30 2018 +0530 ---------------------------------------------------------------------- .../spark/sql/parser/CarbonSparkSqlParser.scala | 293 ++------------- .../sql/parser/CarbonSparkSqlParserUtil.scala | 367 +++++++++++++++++++ .../spark/sql/hive/CarbonSessionState.scala | 4 +- .../spark/sql/hive/CarbonSqlAstBuilder.scala | 4 +- 4 files changed, 397 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 4cc0e1b..39dce3a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -16,35 +16,25 @@ */ package org.apache.spark.sql.parser -import scala.collection.JavaConverters._ import scala.collection.mutable import org.antlr.v4.runtime.tree.TerminalNode -import 
org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession} -import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser} -import org.apache.spark.sql.catalyst.parser.ParserUtils._ +import org.apache.spark.sql.{CarbonSession, SparkSession} +import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor} -import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} +import org.apache.spark.sql.execution.command.PartitionerField import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import org.apache.spark.sql.types.StructField import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.metadata.schema.SchemaReader -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} +import org.apache.carbondata.spark.util.CarbonScalaUtil /** - * Concrete parser for Spark SQL stateENABLE_INMEMORY_MERGE_SORT_DEFAULTments and carbon specific + * Concrete parser for Spark SQL statements and carbon specific * statements */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { @@ -90,60 +80,12 @@ class 
CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) extends SparkSqlAstBuilder(conf) { - - def getFileStorage(createFileFormat: CreateFileFormatContext): String = { - Option(createFileFormat) match { - case Some(value) => - val result = value.children.get(1).getText - if (result.equalsIgnoreCase("by")) { - value.storageHandler().STRING().getSymbol.getText - } else if (result.equalsIgnoreCase("as") && value.children.size() > 1) { - value.children.get(2).getText - } else { - // The case of "STORED AS PARQUET/ORC" - "" - } - case _ => "" - } - } - - /** - * This method will convert the database name to lower case - * - * @param dbName - * @return Option of String - */ - def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = { - dbName match { - case Some(databaseName) => Some(databaseName.toLowerCase) - case None => dbName - } - } - - - - def needToConvertToLowerCase(key: String): Boolean = { - val noConvertList = Array("LIST_INFO", "RANGE_INFO", "PATH") - !noConvertList.exists(x => x.equalsIgnoreCase(key)) - } - /** * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. 
*/ def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { val props = visitTablePropertyList(ctx) - val badKeys = props.filter { case (_, v) => v == null }.keys - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props.map { case (key, value) => - if (needToConvertToLowerCase(key)) { - (key.toLowerCase, value.toLowerCase) - } else { - (key.toLowerCase, value) - } - } + CarbonSparkSqlParserUtil.visitPropertyKeyValues(ctx, props) } def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] @@ -169,222 +111,39 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, provider) = createTableTuple val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader) - - // TODO: implement temporary tables - if (temp) { - throw new ParseException( - "CREATE TEMPORARY TABLE is not supported yet. " + - "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader) - } - if (skewSpecContext != null) { - operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext) - } - if (bucketSpecContext != null) { - operationNotAllowed("CREATE TABLE ... 
CLUSTERED BY", bucketSpecContext) - } - - val cols = Option(columns).toSeq.flatMap(visitColTypeList) - val properties = getPropertyKeyValues(tablePropertyList) - - // Ensuring whether no duplicate name is used in table definition - val colNames = cols.map(_.name) - if (colNames.length != colNames.distinct.length) { - val duplicateColumns = colNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - } - operationNotAllowed(s"Duplicated column names found in table definition of " + - s"$tableIdentifier: ${duplicateColumns.mkString("[", ",", "]")}", columns) - } - - val tablePath = if (locationSpecContext != null) { + val cols: Seq[StructField] = Option(columns).toSeq.flatMap(visitColTypeList) + val colNames: Seq[String] = CarbonSparkSqlParserUtil + .validateCreateTableReqAndGetColumns(tableHeader, + skewSpecContext, + bucketSpecContext, + columns, + cols, + tableIdentifier, + temp) + val tablePath: Option[String] = if (locationSpecContext != null) { Some(visitLocationSpec(locationSpecContext)) } else { None } val tableProperties = mutable.Map[String, String]() + val properties: Map[String, String] = getPropertyKeyValues(tablePropertyList) properties.foreach{property => tableProperties.put(property._1, property._2)} // validate partition clause - val (partitionByStructFields, partitionFields) = - validatePartitionFields(partitionColumns, colNames, tableProperties) - - // validate partition clause - if (partitionFields.nonEmpty) { - if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) { - throw new MalformedCarbonCommandException("Error: Invalid partition definition") - } - // partition columns should not be part of the schema - val badPartCols = partitionFields - .map(_.partitionColumn.toLowerCase) - .toSet - .intersect(colNames.map(_.toLowerCase).toSet) - - if (badPartCols.nonEmpty) { - operationNotAllowed(s"Partition columns should not be specified in the schema: " + - badPartCols.map("\"" + _ + 
"\"").mkString("[", ",", "]"), - partitionColumns) - } - } + val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList) + val partitionFields = CarbonSparkSqlParserUtil. + validatePartitionFields(partitionColumns, colNames, tableProperties, + partitionByStructFields) - val options = new CarbonOption(properties) - // validate streaming property - validateStreamingProperty(options) - var fields = parser.getFields(cols ++ partitionByStructFields) // validate for create table as select val selectQuery = Option(query).map(plan) - selectQuery match { - case Some(q) => - // create table as select does not allow creation of partitioned table - if (partitionFields.nonEmpty) { - val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + - "create a partitioned table using Carbondata file formats." - operationNotAllowed(errorMessage, partitionColumns) - } - // create table as select does not allow to explicitly specify schema - if (fields.nonEmpty) { - operationNotAllowed( - "Schema may not be specified in a Create Table As Select (CTAS) statement", columns) - } - // external table is not allow - if (external) { - operationNotAllowed("Create external table as select", tableHeader) - } - fields = parser - .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore - .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get)) - case _ => - // ignore this case - } - if (partitionFields.nonEmpty && options.isStreaming) { - operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns) - } - // validate tblProperties - val bucketFields = parser.getBucketFields(tableProperties, fields, options) - var isTransactionalTable : Boolean = true - - val tableInfo = if (external) { - // read table info from schema file in the provided table path - // external table also must convert table name to lower case - val identifier = AbsoluteTableIdentifier.from( - tablePath.get, - 
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(), - tableIdentifier.table.toLowerCase()) - val table = try { - val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) - if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { - if (provider.equalsIgnoreCase("'carbonfile'")) { - SchemaReader.inferSchema(identifier, true) - } else { - isTransactionalTable = false - SchemaReader.inferSchema(identifier, false) - } - } - else { - SchemaReader.getTableInfo(identifier) - } - } - catch { - case e: Throwable => - operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader) - } - // set "_external" property, so that DROP TABLE will not delete the data - if (provider.equalsIgnoreCase("'carbonfile'")) { - table.getFactTable.getTableProperties.put("_filelevelformat", "true") - table.getFactTable.getTableProperties.put("_external", "false") - } else { - table.getFactTable.getTableProperties.put("_external", "true") - table.getFactTable.getTableProperties.put("_filelevelformat", "false") - } - // setting local dictionary for all string coloumn for external table - var isLocalDic_enabled = table.getFactTable.getTableProperties - .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) - if (null == isLocalDic_enabled) { - table.getFactTable.getTableProperties - .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, - CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT) - } - isLocalDic_enabled = table.getFactTable.getTableProperties - .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) - if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) && - isLocalDic_enabled.toBoolean) { - val allcolumns = table.getFactTable.getListOfColumns - for (i <- 0 until allcolumns.size()) { - val cols = allcolumns.get(i) - if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) { - cols.setLocalDictColumn(true) - } - allcolumns.set(i, cols) - } - 
table.getFactTable.setListOfColumns(allcolumns) - } - - table - } else { - // prepare table model of the collected tokens - val tableModel: TableModel = parser.prepareTableModel( - ifNotExists, - convertDbNameToLowerCase(tableIdentifier.database), - tableIdentifier.table.toLowerCase, - fields, - partitionFields, - tableProperties, - bucketFields, - isAlterFlow = false, - false, - tableComment) - TableNewProcessor(tableModel) - } - tableInfo.setTransactionalTable(isTransactionalTable) - selectQuery match { - case query@Some(q) => - CarbonCreateTableAsSelectCommand( - tableInfo = tableInfo, - query = query.get, - ifNotExistsSet = ifNotExists, - tableLocation = tablePath) - case _ => - CarbonCreateTableCommand( - tableInfo = tableInfo, - ifNotExistsSet = ifNotExists, - tableLocation = tablePath, - external) - } - } - - private def validateStreamingProperty(carbonOption: CarbonOption): Unit = { - try { - carbonOption.isStreaming - } catch { - case _: IllegalArgumentException => - throw new MalformedCarbonCommandException( - "Table property 'streaming' should be either 'true' or 'false'") - } + val extraTableTuple = (cols, external, tableIdentifier, ifNotExists, colNames, tablePath, + tableProperties, properties, partitionByStructFields, partitionFields, + parser, sparkSession, selectQuery) + CarbonSparkSqlParserUtil + .createCarbonTable(createTableTuple, extraTableTuple) } - - private def validatePartitionFields( - partitionColumns: ColTypeListContext, - colNames: Seq[String], - tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = { - val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList) - val partitionerFields = partitionByStructFields.map { structField => - PartitionerField(structField.name, Some(structField.dataType.toString), null) - } - if (partitionerFields.nonEmpty) { - if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) { - throw new 
MalformedCarbonCommandException("Error: Invalid partition definition") - } - // partition columns should not be part of the schema - val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet) - if (badPartCols.nonEmpty) { - operationNotAllowed(s"Partition columns should not be specified in the schema: " + - badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]") - , partitionColumns: ColTypeListContext) - } - } - (partitionByStructFields, partitionerFields) - } - } trait CarbonAstTrait { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala new file mode 100644 index 0000000..9c0a099 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.parser + +import scala.collection.mutable + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, +CarbonCreateTableCommand} +import org.apache.spark.sql.types.StructField + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.CarbonOption +import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} + +/** + * Utility class to validate the create table and CTAS command, + * and to prepare the logical plan of create table and CTAS command. + */ +object CarbonSparkSqlParserUtil { + /** + * The method validate that the property configured for the streaming attribute is valid. + * + * @param carbonOption Instance of CarbonOption having all the required option for datasource. 
+ */ + private def validateStreamingProperty(carbonOption: CarbonOption): Unit = { + try { + carbonOption.isStreaming + } catch { + case _: IllegalArgumentException => + throw new MalformedCarbonCommandException( + "Table property 'streaming' should be either 'true' or 'false'") + } + } + + /** + * The method validates the create table command and returns the create table or + * ctas table LogicalPlan. + * + * @param createTableTuple a tuple of (CreateTableHeaderContext, SkewSpecContext, + * BucketSpecContext, ColTypeListContext, ColTypeListContext, + * TablePropertyListContext, + * LocationSpecContext, Option[String], TerminalNode, QueryContext, + * String) + * @param extraTableTuple A tuple of (Seq[StructField], Boolean, TableIdentifier, Boolean, + * Seq[String], + * Option[String], mutable.Map[String, String], Map[String, String], + * Seq[StructField], + * Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession, + * Option[LogicalPlan]) + * @return <LogicalPlan> of create table or ctas table + * + */ + def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext, + BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext, + LocationSpecContext, Option[String], TerminalNode, QueryContext, String), + extraTableTuple: (Seq[StructField], Boolean, TableIdentifier, Boolean, Seq[String], + Option[String], mutable.Map[String, String], Map[String, String], Seq[StructField], + Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession, + Option[LogicalPlan])): LogicalPlan = { + val (tableHeader, skewSpecContext, bucketSpecContext, partitionColumns, columns, + tablePropertyList, locationSpecContext, tableComment, ctas, query, provider) = createTableTuple + val (cols, external, tableIdentifier, ifNotExists, colNames, tablePath, + tableProperties, properties, partitionByStructFields, partitionFields, + parser, sparkSession, selectQuery) = extraTableTuple + val options = new CarbonOption(properties) + // validate streaming
property + validateStreamingProperty(options) + var fields = parser.getFields(cols ++ partitionByStructFields) + // validate for create table as select + selectQuery match { + case Some(q) => + // create table as select does not allow creation of partitioned table + if (partitionFields.nonEmpty) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Carbondata file formats." + operationNotAllowed(errorMessage, partitionColumns) + } + // create table as select does not allow to explicitly specify schema + if (fields.nonEmpty) { + operationNotAllowed( + "Schema may not be specified in a Create Table As Select (CTAS) statement", columns) + } + // external table is not allow + if (external) { + operationNotAllowed("Create external table as select", tableHeader) + } + fields = parser + .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore + .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get)) + case _ => + // ignore this case + } + if (partitionFields.nonEmpty && options.isStreaming) { + operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns) + } + // validate tblProperties + val bucketFields = parser.getBucketFields(tableProperties, fields, options) + var isTransactionalTable: Boolean = true + + val tableInfo = if (external) { + // read table info from schema file in the provided table path + // external table also must convert table name to lower case + val identifier = AbsoluteTableIdentifier.from( + tablePath.get, + CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(), + tableIdentifier.table.toLowerCase()) + val table = try { + val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) + if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { + if (provider.equalsIgnoreCase("'carbonfile'")) { + SchemaReader.inferSchema(identifier, true) + } else { + isTransactionalTable = false + 
SchemaReader.inferSchema(identifier, false) + } + } + else { + SchemaReader.getTableInfo(identifier) + } + } + catch { + case e: Throwable => + operationNotAllowed(s"Invalid table path provided: ${ tablePath.get } ", tableHeader) + } + // set "_external" property, so that DROP TABLE will not delete the data + if (provider.equalsIgnoreCase("'carbonfile'")) { + table.getFactTable.getTableProperties.put("_filelevelformat", "true") + table.getFactTable.getTableProperties.put("_external", "false") + } else { + table.getFactTable.getTableProperties.put("_external", "true") + table.getFactTable.getTableProperties.put("_filelevelformat", "false") + } + var isLocalDic_enabled = table.getFactTable.getTableProperties + .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) + if (null == isLocalDic_enabled) { + table.getFactTable.getTableProperties + .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, + CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT) + } + isLocalDic_enabled = table.getFactTable.getTableProperties + .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) + if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) && + isLocalDic_enabled.toBoolean) { + val allcolumns = table.getFactTable.getListOfColumns + for (i <- 0 until allcolumns.size()) { + val cols = allcolumns.get(i) + if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) { + cols.setLocalDictColumn(true) + } + allcolumns.set(i, cols) + } + table.getFactTable.setListOfColumns(allcolumns) + } + table + } else { + // prepare table model of the collected tokens + val tableModel: TableModel = parser.prepareTableModel( + ifNotExists, + convertDbNameToLowerCase(tableIdentifier.database), + tableIdentifier.table.toLowerCase, + fields, + partitionFields, + tableProperties, + bucketFields, + isAlterFlow = false, + false, + tableComment) + TableNewProcessor(tableModel) + } + tableInfo.setTransactionalTable(isTransactionalTable) + selectQuery match { + case query@Some(q) => 
+ CarbonCreateTableAsSelectCommand( + tableInfo = tableInfo, + query = query.get, + ifNotExistsSet = ifNotExists, + tableLocation = tablePath) + case _ => + CarbonCreateTableCommand( + tableInfo = tableInfo, + ifNotExistsSet = ifNotExists, + tableLocation = tablePath, + external) + } + } + + /** + * This method will convert the database name to lower case + * + * @param dbName database name. + * @return Option of String + */ + def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = { + dbName match { + case Some(databaseName) => Some(databaseName.toLowerCase) + case None => dbName + } + } + + /** + * Validates the partition columns and returns the corresponding partitioner + * fields. + * + * @param partitionColumns An instance of ColTypeListContext having parser rules for + * column. + * @param colNames <Seq[String]> Sequence of Table column names. + * @param tableProperties <Map[String, String]> Table property map. + * @param partitionByStructFields Seq[StructField] Sequence of partition fields. + * @return <Seq[PartitionerField]> A Seq of partitioner fields.
+ */ + def validatePartitionFields( + partitionColumns: ColTypeListContext, + colNames: Seq[String], + tableProperties: mutable.Map[String, String], + partitionByStructFields: Seq[StructField]): Seq[PartitionerField] = { + + val partitionerFields = partitionByStructFields.map { structField => + PartitionerField(structField.name, Some(structField.dataType.toString), null) + } + // validate partition clause + if (partitionerFields.nonEmpty) { + if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) { + throw new MalformedCarbonCommandException("Error: Invalid partition definition") + } + // partition columns should not be part of the schema + val badPartCols = partitionerFields.map(_.partitionColumn.toLowerCase).toSet + .intersect(colNames.map(_.toLowerCase).toSet) + if (badPartCols.nonEmpty) { + operationNotAllowed(s"Partition columns should not be specified in the schema: " + + badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]") + , partitionColumns: ColTypeListContext) + } + } + partitionerFields + } + + /** + * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. + * + * @param ctx Instance of TablePropertyListContext defining parser rule for the table + * properties. + * @param props <Map[String, String]> Map of table property list + * @return <Map[String, String]> Map of transformed table property. 
+ */ + def visitPropertyKeyValues(ctx: TablePropertyListContext, + props: Map[String, String]): Map[String, String] = { + val badKeys = props.filter { case (_, v) => v == null }.keys + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx) + } + props.map { case (key, value) => + if (needToConvertToLowerCase(key)) { + (key.toLowerCase, value.toLowerCase) + } else { + (key.toLowerCase, value) + } + } + } + + /** + * Checks whether the value of the given property needs to be converted to lower case. + * + * @param key <String> property key + * @return returns <true> if lower case conversion is needed else <false> + */ + def needToConvertToLowerCase(key: String): Boolean = { + val noConvertList = Array("LIST_INFO", "RANGE_INFO", "PATH") + !noConvertList.exists(x => x.equalsIgnoreCase(key)) + } + + /** + * The method validates the create table command and returns the table's column names. + * + * @param tableHeader An instance of CreateTableHeaderContext having parser rules for + * create table. + * @param skewSpecContext An instance of SkewSpecContext having parser rules for create table. + * @param bucketSpecContext An instance of BucketSpecContext having parser rules for create table. + * @param columns An instance of ColTypeListContext having parser rules for columns + * of the table. + * @param cols Table's columns. + * @param tableIdentifier Instance of table identifier. + * @param isTempTable Flag to identify temp table. + * @return Table's column names <Seq[String]>. + */ + def validateCreateTableReqAndGetColumns(tableHeader: CreateTableHeaderContext, + skewSpecContext: SkewSpecContext, + bucketSpecContext: BucketSpecContext, + columns: ColTypeListContext, + cols: Seq[StructField], + tableIdentifier: TableIdentifier, + isTempTable: Boolean): Seq[String] = { + // TODO: implement temporary tables + if (isTempTable) { + throw new ParseException( + "CREATE TEMPORARY TABLE is not supported yet. 
" + + "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader) + } + if (skewSpecContext != null) { + operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext) + } + if (bucketSpecContext != null) { + operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext) + } + + // Ensuring whether no duplicate name is used in table definition + val colNames: Seq[String] = cols.map(_.name) + if (colNames.length != colNames.distinct.length) { + val duplicateColumns = colNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => "\"" + x + "\"" + } + operationNotAllowed(s"Duplicated column names found in table definition of " + + s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }", + columns) + } + colNames + } + /** + * The method return's the storage type + * @param createFileFormat + * @return + */ + def getFileStorage(createFileFormat: CreateFileFormatContext): String = { + Option(createFileFormat) match { + case Some(value) => + val result = value.children.get(1).getText + if (result.equalsIgnoreCase("by")) { + value.storageHandler().STRING().getSymbol.getText + } else if (result.equalsIgnoreCase("as") && value.children.size() > 1) { + value.children.get(2).getText + } else { + // The case of "STORED AS PARQUET/ORC" + "" + } + case _ => "" + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 2c98ec2..759539b 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -34,7 +34,7 @@ import 
org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStr import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} -import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} +import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil} import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -376,7 +376,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { - val fileStorage = helper.getFileStorage(ctx.createFileFormat) + val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || fileStorage.equalsIgnoreCase("carbondata") || http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala index a533db0..b8bf56d 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterT import 
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand} import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser} +import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil} import org.apache.spark.sql.types.DecimalType import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ -40,7 +40,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { - val fileStorage = helper.getFileStorage(ctx.createFileFormat) + val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || fileStorage.equalsIgnoreCase("carbondata") ||