Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2466#discussion_r202576876
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
---
@@ -169,220 +128,45 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
provider) = createTableTuple
val (tableIdentifier, temp, ifNotExists, external) =
visitCreateTableHeader(tableHeader)
-
- // TODO: implement temporary tables
- if (temp) {
- throw new ParseException(
- "CREATE TEMPORARY TABLE is not supported yet. " +
- "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
- }
- if (skewSpecContext != null) {
- operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
- }
- if (bucketSpecContext != null) {
- operationNotAllowed("CREATE TABLE ... CLUSTERED BY",
bucketSpecContext)
- }
-
- val cols = Option(columns).toSeq.flatMap(visitColTypeList)
- val properties = getPropertyKeyValues(tablePropertyList)
-
- // Ensuring whether no duplicate name is used in table definition
- val colNames = cols.map(_.name)
- if (colNames.length != colNames.distinct.length) {
- val duplicateColumns = colNames.groupBy(identity).collect {
- case (x, ys) if ys.length > 1 => "\"" + x + "\""
- }
- operationNotAllowed(s"Duplicated column names found in table
definition of " +
- s"$tableIdentifier:
${duplicateColumns.mkString("[", ",", "]")}", columns)
- }
-
- val tablePath = if (locationSpecContext != null) {
+ val cols: Seq[StructField] =
Option(columns).toSeq.flatMap(visitColTypeList)
+ val colNames: Seq[String] = CarbonSparkSqlParserUtil
+ .validateCreateTableReqAndGetColumns(tableHeader,
+ skewSpecContext,
+ bucketSpecContext,
+ columns,
+ cols,
+ tableIdentifier,
+ temp)
+ val tablePath: Option[String] = if (locationSpecContext != null) {
Some(visitLocationSpec(locationSpecContext))
} else {
None
}
val tableProperties = mutable.Map[String, String]()
+ val properties: Map[String, String] =
getPropertyKeyValues(tablePropertyList)
properties.foreach{property => tableProperties.put(property._1,
property._2)}
// validate partition clause
val (partitionByStructFields, partitionFields) =
validatePartitionFields(partitionColumns, colNames, tableProperties)
- // validate partition clause
- if (partitionFields.nonEmpty) {
- if (!CommonUtil.validatePartitionColumns(tableProperties,
partitionFields)) {
- throw new MalformedCarbonCommandException("Error: Invalid
partition definition")
- }
- // partition columns should not be part of the schema
- val badPartCols = partitionFields
- .map(_.partitionColumn.toLowerCase)
- .toSet
- .intersect(colNames.map(_.toLowerCase).toSet)
-
- if (badPartCols.nonEmpty) {
- operationNotAllowed(s"Partition columns should not be specified in
the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[",
",", "]"),
- partitionColumns)
- }
- }
-
- val options = new CarbonOption(properties)
- // validate streaming property
- validateStreamingProperty(options)
- var fields = parser.getFields(cols ++ partitionByStructFields)
// validate for create table as select
val selectQuery = Option(query).map(plan)
- selectQuery match {
- case Some(q) =>
- // create table as select does not allow creation of partitioned
table
- if (partitionFields.nonEmpty) {
- val errorMessage = "A Create Table As Select (CTAS) statement is
not allowed to " +
- "create a partitioned table using Carbondata
file formats."
- operationNotAllowed(errorMessage, partitionColumns)
- }
- // create table as select does not allow to explicitly specify
schema
- if (fields.nonEmpty) {
- operationNotAllowed(
- "Schema may not be specified in a Create Table As Select
(CTAS) statement", columns)
- }
- // external table is not allow
- if (external) {
- operationNotAllowed("Create external table as select",
tableHeader)
- }
- fields = parser
- .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
- .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
- case _ =>
- // ignore this case
- }
- if (partitionFields.nonEmpty && options.isStreaming) {
- operationNotAllowed("Streaming is not allowed on partitioned table",
partitionColumns)
- }
- // validate tblProperties
- val bucketFields = parser.getBucketFields(tableProperties, fields,
options)
- var isTransactionalTable : Boolean = true
-
- val tableInfo = if (external) {
- // read table info from schema file in the provided table path
- // external table also must convert table name to lower case
- val identifier = AbsoluteTableIdentifier.from(
- tablePath.get,
-
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
- tableIdentifier.table.toLowerCase())
- val table = try {
- val schemaPath =
CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
- if (!FileFactory.isFileExist(schemaPath,
FileFactory.getFileType(schemaPath))) {
- if (provider.equalsIgnoreCase("'carbonfile'")) {
- SchemaReader.inferSchema(identifier, true)
- } else {
- isTransactionalTable = false
- SchemaReader.inferSchema(identifier, false)
- }
- }
- else {
- SchemaReader.getTableInfo(identifier)
- }
- }
- catch {
- case e: Throwable =>
- operationNotAllowed(s"Invalid table path provided:
${tablePath.get} ", tableHeader)
- }
- // set "_external" property, so that DROP TABLE will not delete the
data
- if (provider.equalsIgnoreCase("'carbonfile'")) {
- table.getFactTable.getTableProperties.put("_filelevelformat",
"true")
- table.getFactTable.getTableProperties.put("_external", "false")
- } else {
- table.getFactTable.getTableProperties.put("_external", "true")
- table.getFactTable.getTableProperties.put("_filelevelformat",
"false")
- }
- // setting local dictionary for all string coloumn for external table
- var isLocalDic_enabled = table.getFactTable.getTableProperties
- .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
- if (null == isLocalDic_enabled) {
- table.getFactTable.getTableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
- CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
- }
- isLocalDic_enabled = table.getFactTable.getTableProperties
- .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
- if
(CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
- isLocalDic_enabled.toBoolean) {
- val allcolumns = table.getFactTable.getListOfColumns
- for (i <- 0 until allcolumns.size()) {
- val cols = allcolumns.get(i)
- if (cols.getDataType == DataTypes.STRING || cols.getDataType ==
DataTypes.VARCHAR) {
- cols.setLocalDictColumn(true)
- }
- allcolumns.set(i, cols)
- }
- table.getFactTable.setListOfColumns(allcolumns)
- }
-
- table
- } else {
- // prepare table model of the collected tokens
- val tableModel: TableModel = parser.prepareTableModel(
- ifNotExists,
- convertDbNameToLowerCase(tableIdentifier.database),
- tableIdentifier.table.toLowerCase,
- fields,
- partitionFields,
- tableProperties,
- bucketFields,
- isAlterFlow = false,
- false,
- tableComment)
- TableNewProcessor(tableModel)
- }
- tableInfo.setTransactionalTable(isTransactionalTable)
- selectQuery match {
- case query@Some(q) =>
- CarbonCreateTableAsSelectCommand(
- tableInfo = tableInfo,
- query = query.get,
- ifNotExistsSet = ifNotExists,
- tableLocation = tablePath)
- case _ =>
- CarbonCreateTableCommand(
- tableInfo = tableInfo,
- ifNotExistsSet = ifNotExists,
- tableLocation = tablePath,
- external)
- }
- }
-
- private def validateStreamingProperty(carbonOption: CarbonOption): Unit
= {
- try {
- carbonOption.isStreaming
- } catch {
- case _: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(
- "Table property 'streaming' should be either 'true' or 'false'")
- }
+ val extraTableTuple = (cols, external, tableIdentifier, ifNotExists,
colNames, tablePath,
+ tableProperties, properties, partitionByStructFields,
partitionFields,
+ parser, sparkSession, selectQuery)
+ CarbonSparkSqlParserUtil
+ .createCarbonTable(createTableTuple, extraTableTuple)
}
private def validatePartitionFields(
--- End diff --
Now this method contains only a single line of code, so we can inline that line at the caller and remove this method entirely.
---