Repository: carbondata
Updated Branches:
  refs/heads/master a4c2ef5f8 -> 5aada46e7


[CARBONDATA-2710][Spark Integration] Refactor CarbonSparkSqlParser for better code reuse.

Refactor CarbonSparkSqlParser for better code reuse

This closes #2466

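The change follows a plain "extract stateless helpers into an object" refactor:
validation and plan-construction logic that previously lived on
CarbonHelperSqlAstBuilder moves into a new singleton, CarbonSparkSqlParserUtil,
so the Spark 2.1 and Spark 2.2 AST builders can share it without inheritance.
A minimal sketch of the pattern (simplified names, not the actual classes):

    // Before: the helper logic is bound to a builder instance, so every
    // Spark-version-specific builder has to duplicate or inherit it.
    class BuilderBefore {
      def getFileStorage(fmt: String): String = fmt.trim.toLowerCase
    }

    // After: the same logic lives on a stateless object, and each builder
    // delegates with a one-line call.
    object ParserUtilSketch {
      def getFileStorage(fmt: String): String = fmt.trim.toLowerCase
    }

    class BuilderAfter {
      def getFileStorage(fmt: String): String =
        ParserUtilSketch.getFileStorage(fmt)
    }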

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5aada46e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5aada46e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5aada46e

Branch: refs/heads/master
Commit: 5aada46e7bb6bcbb11652979862e3ccebaa6e3e8
Parents: a4c2ef5
Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com>
Authored: Mon Jul 9 16:08:47 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Jul 18 16:20:30 2018 +0530

----------------------------------------------------------------------
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 293 ++-------------
 .../sql/parser/CarbonSparkSqlParserUtil.scala   | 367 +++++++++++++++++++
 .../spark/sql/hive/CarbonSessionState.scala     |   4 +-
 .../spark/sql/hive/CarbonSqlAstBuilder.scala    |   4 +-
 4 files changed, 397 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 4cc0e1b..39dce3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -16,35 +16,25 @@
  */
 package org.apache.spark.sql.parser
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.antlr.v4.runtime.tree.TerminalNode
-import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
-import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.{CarbonSession, SparkSession}
+import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
-import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.command.PartitionerField
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.schema.SchemaReader
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
- * Concrete parser for Spark SQL stateENABLE_INMEMORY_MERGE_SORT_DEFAULTments 
and carbon specific
+ * Concrete parser for Spark SQL statements and carbon specific
  * statements
  */
 class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
@@ -90,60 +80,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     parser: CarbonSpark2SqlParser,
     sparkSession: SparkSession)
   extends SparkSqlAstBuilder(conf) {
-
-  def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
-    Option(createFileFormat) match {
-      case Some(value) =>
-        val result = value.children.get(1).getText
-        if (result.equalsIgnoreCase("by")) {
-          value.storageHandler().STRING().getSymbol.getText
-        } else if (result.equalsIgnoreCase("as") && value.children.size() > 1) {
-          value.children.get(2).getText
-        } else {
-          // The case of "STORED AS PARQUET/ORC"
-          ""
-        }
-      case _ => ""
-    }
-  }
-
-  /**
-   * This method will convert the database name to lower case
-   *
-   * @param dbName
-   * @return Option of String
-   */
-  def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
-    dbName match {
-      case Some(databaseName) => Some(databaseName.toLowerCase)
-      case None => dbName
-    }
-  }
-
-
-
-  def needToConvertToLowerCase(key: String): Boolean = {
-    val noConvertList = Array("LIST_INFO", "RANGE_INFO", "PATH")
-    !noConvertList.exists(x => x.equalsIgnoreCase(key))
-  }
-
   /**
   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
    */
   def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
     val props = visitTablePropertyList(ctx)
-    val badKeys = props.filter { case (_, v) => v == null }.keys
-    if (badKeys.nonEmpty) {
-      operationNotAllowed(
-        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", 
"]")}", ctx)
-    }
-    props.map { case (key, value) =>
-      if (needToConvertToLowerCase(key)) {
-        (key.toLowerCase, value.toLowerCase)
-      } else {
-        (key.toLowerCase, value)
-      }
-    }
+    CarbonSparkSqlParserUtil.visitPropertyKeyValues(ctx, props)
   }
 
   def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String]
@@ -169,222 +111,39 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       provider) = createTableTuple
 
     val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
-
-    // TODO: implement temporary tables
-    if (temp) {
-      throw new ParseException(
-        "CREATE TEMPORARY TABLE is not supported yet. " +
-        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
-    }
-    if (skewSpecContext != null) {
-      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
-    }
-    if (bucketSpecContext != null) {
-      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
-    }
-
-    val cols = Option(columns).toSeq.flatMap(visitColTypeList)
-    val properties = getPropertyKeyValues(tablePropertyList)
-
-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = cols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition 
of " +
-                          s"$tableIdentifier: ${duplicateColumns.mkString("[", 
",", "]")}", columns)
-    }
-
-    val tablePath = if (locationSpecContext != null) {
+    val cols: Seq[StructField] = Option(columns).toSeq.flatMap(visitColTypeList)
+    val colNames: Seq[String] = CarbonSparkSqlParserUtil
+      .validateCreateTableReqAndGetColumns(tableHeader,
+        skewSpecContext,
+        bucketSpecContext,
+        columns,
+        cols,
+        tableIdentifier,
+        temp)
+    val tablePath: Option[String] = if (locationSpecContext != null) {
       Some(visitLocationSpec(locationSpecContext))
     } else {
       None
     }
 
     val tableProperties = mutable.Map[String, String]()
+    val properties: Map[String, String] = getPropertyKeyValues(tablePropertyList)
     properties.foreach{property => tableProperties.put(property._1, property._2)}
 
     // validate partition clause
-    val (partitionByStructFields, partitionFields) =
-      validatePartitionFields(partitionColumns, colNames, tableProperties)
-
-    // validate partition clause
-    if (partitionFields.nonEmpty) {
-      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
-         throw new MalformedCarbonCommandException("Error: Invalid partition definition")
-      }
-      // partition columns should not be part of the schema
-      val badPartCols = partitionFields
-        .map(_.partitionColumn.toLowerCase)
-        .toSet
-        .intersect(colNames.map(_.toLowerCase).toSet)
-
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns should not be specified in the 
schema: " +
-                            badPartCols.map("\"" + _ + "\"").mkString("[", 
",", "]"),
-          partitionColumns)
-      }
-    }
+    val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
+    val partitionFields = CarbonSparkSqlParserUtil.
+      validatePartitionFields(partitionColumns, colNames, tableProperties,
+      partitionByStructFields)
 
-    val options = new CarbonOption(properties)
-    // validate streaming property
-    validateStreamingProperty(options)
-    var fields = parser.getFields(cols ++ partitionByStructFields)
     // validate for create table as select
     val selectQuery = Option(query).map(plan)
-    selectQuery match {
-      case Some(q) =>
-        // create table as select does not allow creation of partitioned table
-        if (partitionFields.nonEmpty) {
-          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-                             "create a partitioned table using Carbondata file formats."
-          operationNotAllowed(errorMessage, partitionColumns)
-        }
-        // create table as select does not allow to explicitly specify schema
-        if (fields.nonEmpty) {
-          operationNotAllowed(
-            "Schema may not be specified in a Create Table As Select (CTAS) 
statement", columns)
-        }
-        // external table is not allow
-        if (external) {
-          operationNotAllowed("Create external table as select", tableHeader)
-        }
-        fields = parser
-          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
-            .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
-      case _ =>
-        // ignore this case
-    }
-    if (partitionFields.nonEmpty && options.isStreaming) {
-      operationNotAllowed("Streaming is not allowed on partitioned table", 
partitionColumns)
-    }
-    // validate tblProperties
-    val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-    var isTransactionalTable : Boolean = true
-
-    val tableInfo = if (external) {
-      // read table info from schema file in the provided table path
-      // external table also must convert table name to lower case
-      val identifier = AbsoluteTableIdentifier.from(
-        tablePath.get,
-        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
-        tableIdentifier.table.toLowerCase())
-      val table = try {
-        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
-        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
-          if (provider.equalsIgnoreCase("'carbonfile'")) {
-            SchemaReader.inferSchema(identifier, true)
-          } else {
-            isTransactionalTable = false
-            SchemaReader.inferSchema(identifier, false)
-          }
-        }
-        else {
-          SchemaReader.getTableInfo(identifier)
-        }
-      }
-        catch {
-        case e: Throwable =>
-          operationNotAllowed(s"Invalid table path provided: ${tablePath.get} 
", tableHeader)
-      }
-      // set "_external" property, so that DROP TABLE will not delete the data
-      if (provider.equalsIgnoreCase("'carbonfile'")) {
-        table.getFactTable.getTableProperties.put("_filelevelformat", "true")
-        table.getFactTable.getTableProperties.put("_external", "false")
-      } else {
-        table.getFactTable.getTableProperties.put("_external", "true")
-        table.getFactTable.getTableProperties.put("_filelevelformat", "false")
-      }
-      // setting local dictionary for all string coloumn for external table
-      var isLocalDic_enabled = table.getFactTable.getTableProperties
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
-      if (null == isLocalDic_enabled) {
-        table.getFactTable.getTableProperties
-          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
-            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
-      }
-      isLocalDic_enabled = table.getFactTable.getTableProperties
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
-      if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
-          isLocalDic_enabled.toBoolean) {
-        val allcolumns = table.getFactTable.getListOfColumns
-        for (i <- 0 until allcolumns.size()) {
-          val cols = allcolumns.get(i)
-          if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) {
-            cols.setLocalDictColumn(true)
-          }
-          allcolumns.set(i, cols)
-        }
-        table.getFactTable.setListOfColumns(allcolumns)
-      }
-
-      table
-    } else {
-      // prepare table model of the collected tokens
-      val tableModel: TableModel = parser.prepareTableModel(
-        ifNotExists,
-        convertDbNameToLowerCase(tableIdentifier.database),
-        tableIdentifier.table.toLowerCase,
-        fields,
-        partitionFields,
-        tableProperties,
-        bucketFields,
-        isAlterFlow = false,
-        false,
-        tableComment)
-      TableNewProcessor(tableModel)
-    }
-    tableInfo.setTransactionalTable(isTransactionalTable)
-    selectQuery match {
-      case query@Some(q) =>
-        CarbonCreateTableAsSelectCommand(
-          tableInfo = tableInfo,
-          query = query.get,
-          ifNotExistsSet = ifNotExists,
-          tableLocation = tablePath)
-      case _ =>
-        CarbonCreateTableCommand(
-          tableInfo = tableInfo,
-          ifNotExistsSet = ifNotExists,
-          tableLocation = tablePath,
-          external)
-    }
-  }
-
-  private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
-    try {
-      carbonOption.isStreaming
-    } catch {
-      case _: IllegalArgumentException =>
-        throw new MalformedCarbonCommandException(
-          "Table property 'streaming' should be either 'true' or 'false'")
-    }
+    val extraTableTuple = (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
+      tableProperties, properties, partitionByStructFields, partitionFields,
+      parser, sparkSession, selectQuery)
+    CarbonSparkSqlParserUtil
+      .createCarbonTable(createTableTuple, extraTableTuple)
   }
-
-  private def validatePartitionFields(
-      partitionColumns: ColTypeListContext,
-      colNames: Seq[String],
-      tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
-    val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
-    val partitionerFields = partitionByStructFields.map { structField =>
-      PartitionerField(structField.name, Some(structField.dataType.toString), null)
-    }
-    if (partitionerFields.nonEmpty) {
-      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
-         throw new MalformedCarbonCommandException("Error: Invalid partition definition")
-      }
-      // partition columns should not be part of the schema
-      val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns should not be specified in the 
schema: " +
-                            badPartCols.map("\"" + _ + "\"").mkString("[", 
",", "]")
-          , partitionColumns: ColTypeListContext)
-      }
-    }
-    (partitionByStructFields, partitionerFields)
-  }
-
 }
 
 trait CarbonAstTrait {

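The builder's visitPropertyKeyValues above now only parses the property list
and delegates validation and case normalization to CarbonSparkSqlParserUtil.
A self-contained sketch of that normalization contract (plain Map in and out;
the real method takes ANTLR parser contexts and reports errors through
operationNotAllowed):

    object PropertyNormalizationSketch {
      // Keys whose values may be case sensitive and must keep their case.
      private val noConvertList = Array("LIST_INFO", "RANGE_INFO", "PATH")

      def normalize(props: Map[String, String]): Map[String, String] =
        props.map { case (key, value) =>
          // Keys are always lowercased; values are lowercased unless the
          // key is in the no-convert list.
          if (noConvertList.exists(_.equalsIgnoreCase(key))) {
            (key.toLowerCase, value)
          } else {
            (key.toLowerCase, value.toLowerCase)
          }
        }

      def main(args: Array[String]): Unit = {
        val in = Map("SORT_COLUMNS" -> "Name,City", "PATH" -> "/Data/T1")
        // prints: Map(sort_columns -> name,city, path -> /Data/T1)
        println(normalize(in))
      }
    }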
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
new file mode 100644
index 0000000..9c0a099
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import scala.collection.mutable
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand,
+CarbonCreateTableCommand}
+import org.apache.spark.sql.types.StructField
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
+
+/**
+ * Utility class to validate the create table and CTAS command,
+ * and to prepare the logical plan of create table and CTAS command.
+ */
+object CarbonSparkSqlParserUtil {
+  /**
+   * The method validates that the property configured for the streaming attribute is valid.
+   *
+   * @param carbonOption Instance of CarbonOption having all the required options for datasource.
+   */
+  private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
+    try {
+      carbonOption.isStreaming
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new MalformedCarbonCommandException(
+          "Table property 'streaming' should be either 'true' or 'false'")
+    }
+  }
+
+  /**
+   * The method validates the create table command and returns the create table or
+   * ctas table LogicalPlan.
+   *
+   * @param createTableTuple a tuple of (CreateTableHeaderContext, SkewSpecContext,
+   *                         BucketSpecContext, ColTypeListContext, ColTypeListContext,
+   *                         TablePropertyListContext,
+   *                         LocationSpecContext, Option[String], TerminalNode, QueryContext,
+   *                         String)
+   * @param extraTableTuple  A tuple of (Seq[StructField], Boolean, TableIdentifier, Boolean,
+   *                         Seq[String],
+   *                         Option[String], mutable.Map[String, String], Map[String, String],
+   *                         Seq[StructField],
+   *                         Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
+   *                         Option[LogicalPlan])
+   * @return <LogicalPlan> of create table or ctas table
+   *
+   */
+  def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
+    BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext,
+    LocationSpecContext, Option[String], TerminalNode, QueryContext, String),
+      extraTableTuple: (Seq[StructField], Boolean, TableIdentifier, Boolean, Seq[String],
+        Option[String], mutable.Map[String, String], Map[String, String], Seq[StructField],
+        Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
+        Option[LogicalPlan])): LogicalPlan = {
+    val (tableHeader, skewSpecContext, bucketSpecContext, partitionColumns, columns,
+    tablePropertyList, locationSpecContext, tableComment, ctas, query, provider) = createTableTuple
+    val (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
+    tableProperties, properties, partitionByStructFields, partitionFields,
+    parser, sparkSession, selectQuery) = extraTableTuple
+    val options = new CarbonOption(properties)
+    // validate streaming property
+    validateStreamingProperty(options)
+    var fields = parser.getFields(cols ++ partitionByStructFields)
+    // validate for create table as select
+    selectQuery match {
+      case Some(q) =>
+        // create table as select does not allow creation of partitioned table
+        if (partitionFields.nonEmpty) {
+          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+                             "create a partitioned table using Carbondata file formats."
+          operationNotAllowed(errorMessage, partitionColumns)
+        }
+        // create table as select does not allow to explicitly specify schema
+        if (fields.nonEmpty) {
+          operationNotAllowed(
+            "Schema may not be specified in a Create Table As Select (CTAS) 
statement", columns)
+        }
+        // external table is not allowed
+        if (external) {
+          operationNotAllowed("Create external table as select", tableHeader)
+        }
+        fields = parser
+          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
+      case _ =>
+      // ignore this case
+    }
+    if (partitionFields.nonEmpty && options.isStreaming) {
+      operationNotAllowed("Streaming is not allowed on partitioned table", 
partitionColumns)
+    }
+    // validate tblProperties
+    val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+    var isTransactionalTable: Boolean = true
+
+    val tableInfo = if (external) {
+      // read table info from schema file in the provided table path
+      // external table also must convert table name to lower case
+      val identifier = AbsoluteTableIdentifier.from(
+        tablePath.get,
+        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
+        tableIdentifier.table.toLowerCase())
+      val table = try {
+        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          if (provider.equalsIgnoreCase("'carbonfile'")) {
+            SchemaReader.inferSchema(identifier, true)
+          } else {
+            isTransactionalTable = false
+            SchemaReader.inferSchema(identifier, false)
+          }
+        }
+        else {
+          SchemaReader.getTableInfo(identifier)
+        }
+      }
+      catch {
+        case e: Throwable =>
+          operationNotAllowed(s"Invalid table path provided: ${ tablePath.get 
} ", tableHeader)
+      }
+      // set "_external" property, so that DROP TABLE will not delete the data
+      if (provider.equalsIgnoreCase("'carbonfile'")) {
+        table.getFactTable.getTableProperties.put("_filelevelformat", "true")
+        table.getFactTable.getTableProperties.put("_external", "false")
+      } else {
+        table.getFactTable.getTableProperties.put("_external", "true")
+        table.getFactTable.getTableProperties.put("_filelevelformat", "false")
+      }
+      var isLocalDic_enabled = table.getFactTable.getTableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+      if (null == isLocalDic_enabled) {
+        table.getFactTable.getTableProperties
+          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+      }
+      isLocalDic_enabled = table.getFactTable.getTableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+      if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
+          isLocalDic_enabled.toBoolean) {
+        val allcolumns = table.getFactTable.getListOfColumns
+        for (i <- 0 until allcolumns.size()) {
+          val cols = allcolumns.get(i)
+          if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) {
+            cols.setLocalDictColumn(true)
+          }
+          allcolumns.set(i, cols)
+        }
+        table.getFactTable.setListOfColumns(allcolumns)
+      }
+      table
+    } else {
+      // prepare table model of the collected tokens
+      val tableModel: TableModel = parser.prepareTableModel(
+        ifNotExists,
+        convertDbNameToLowerCase(tableIdentifier.database),
+        tableIdentifier.table.toLowerCase,
+        fields,
+        partitionFields,
+        tableProperties,
+        bucketFields,
+        isAlterFlow = false,
+        false,
+        tableComment)
+      TableNewProcessor(tableModel)
+    }
+    tableInfo.setTransactionalTable(isTransactionalTable)
+    selectQuery match {
+      case query@Some(q) =>
+        CarbonCreateTableAsSelectCommand(
+          tableInfo = tableInfo,
+          query = query.get,
+          ifNotExistsSet = ifNotExists,
+          tableLocation = tablePath)
+      case _ =>
+        CarbonCreateTableCommand(
+          tableInfo = tableInfo,
+          ifNotExistsSet = ifNotExists,
+          tableLocation = tablePath,
+          external)
+    }
+  }
+
+  /**
+   * This method will convert the database name to lower case
+   *
+   * @param dbName database name.
+   * @return Option of String
+   */
+  def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
+    dbName match {
+      case Some(databaseName) => Some(databaseName.toLowerCase)
+      case None => dbName
+    }
+  }
+
+  /**
+   * Validates the partition columns and returns the partitioner fields.
+   *
+   * @param partitionColumns        An instance of ColTypeListContext having parser rules for
+   *                                column.
+   * @param colNames                <Seq[String]> Sequence of Table column 
names.
+   * @param tableProperties         <Map[String, String]> Table property map.
+   * @param partitionByStructFields Seq[StructField] Sequence of partition fields.
+   * @return <Seq[PartitionerField]> A Seq of partitioner fields.
+   */
+  def validatePartitionFields(
+      partitionColumns: ColTypeListContext,
+      colNames: Seq[String],
+      tableProperties: mutable.Map[String, String],
+      partitionByStructFields: Seq[StructField]): Seq[PartitionerField] = {
+
+    val partitionerFields = partitionByStructFields.map { structField =>
+      PartitionerField(structField.name, Some(structField.dataType.toString), null)
+    }
+    // validate partition clause
+    if (partitionerFields.nonEmpty) {
+      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+      }
+      // partition columns should not be part of the schema
+      val badPartCols = partitionerFields.map(_.partitionColumn.toLowerCase).toSet
+        .intersect(colNames.map(_.toLowerCase).toSet)
+      if (badPartCols.nonEmpty) {
+        operationNotAllowed(s"Partition columns should not be specified in the 
schema: " +
+                            badPartCols.map("\"" + _ + "\"").mkString("[", 
",", "]")
+          , partitionColumns: ColTypeListContext)
+      }
+    }
+    partitionerFields
+  }
+
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   *
+   * @param ctx   Instance of TablePropertyListContext defining parser rule for the table
+   *              properties.
+   * @param props <Map[String, String]> Map of table property list
+   * @return <Map[String, String]> Map of transformed table property.
+   */
+  def visitPropertyKeyValues(ctx: TablePropertyListContext,
+      props: Map[String, String]): Map[String, String] = {
+    val badKeys = props.filter { case (_, v) => v == null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", 
"]") }", ctx)
+    }
+    props.map { case (key, value) =>
+      if (needToConvertToLowerCase(key)) {
+        (key.toLowerCase, value.toLowerCase)
+      } else {
+        (key.toLowerCase, value)
+      }
+    }
+  }
+
+  /**
+   * Checks whether the property value needs to be converted to lower case.
+   *
+   * @param key <String> property key
+   * @return returns <true> if lower case conversion is needed else <false>
+   */
+  def needToConvertToLowerCase(key: String): Boolean = {
+    val noConvertList = Array("LIST_INFO", "RANGE_INFO", "PATH")
+    !noConvertList.exists(x => x.equalsIgnoreCase(key))
+  }
+
+  /**
+   * The method validates the create table command and returns the table's columns.
+   *
+   * @param tableHeader       An instance of CreateTableHeaderContext having parser rules for
+   *                          create table.
+   * @param skewSpecContext   An instance of SkewSpecContext having parser rules for create table.
+   * @param bucketSpecContext An instance of BucketSpecContext having parser rules for create table.
+   * @param columns           An instance of ColTypeListContext having parser rules for columns
+   *                          of the table.
+   * @param cols              Table's columns.
+   * @param tableIdentifier   Instance of table identifier.
+   * @param isTempTable       Flag to identify temp table.
+   * @return Table's column names <Seq[String]>.
+   */
+  def validateCreateTableReqAndGetColumns(tableHeader: CreateTableHeaderContext,
+      skewSpecContext: SkewSpecContext,
+      bucketSpecContext: BucketSpecContext,
+      columns: ColTypeListContext,
+      cols: Seq[StructField],
+      tableIdentifier: TableIdentifier,
+      isTempTable: Boolean): Seq[String] = {
+    // TODO: implement temporary tables
+    if (isTempTable) {
+      throw new ParseException(
+        "CREATE TEMPORARY TABLE is not supported yet. " +
+        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
+    }
+    if (skewSpecContext != null) {
+      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
+    }
+    if (bucketSpecContext != null) {
+      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
+    }
+
+    // Ensuring whether no duplicate name is used in table definition
+    val colNames: Seq[String] = cols.map(_.name)
+    if (colNames.length != colNames.distinct.length) {
+      val duplicateColumns = colNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }
+      operationNotAllowed(s"Duplicated column names found in table definition 
of " +
+                          s"$tableIdentifier: ${ 
duplicateColumns.mkString("[", ",", "]") }",
+        columns)
+    }
+    colNames
+  }
+  /**
+   * The method returns the storage type.
+   * @param createFileFormat Instance of CreateFileFormatContext for the STORED BY/AS clause.
+   * @return The file storage type, or an empty string when none is specified.
+   */
+  def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
+    Option(createFileFormat) match {
+      case Some(value) =>
+        val result = value.children.get(1).getText
+        if (result.equalsIgnoreCase("by")) {
+          value.storageHandler().STRING().getSymbol.getText
+        } else if (result.equalsIgnoreCase("as") && value.children.size() > 1) {
+          value.children.get(2).getText
+        } else {
+          // The case of "STORED AS PARQUET/ORC"
+          ""
+        }
+      case _ => ""
+    }
+  }
+}

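Of the moved helpers, validatePartitionFields carries the subtlest rule: a
partition column may not also appear in the table schema, compared case
insensitively. A hedged, self-contained sketch of just that overlap check
(the real method also runs CommonUtil.validatePartitionColumns and raises
parser errors via operationNotAllowed):

    object PartitionOverlapSketch {
      // Returns the partition columns that illegally duplicate schema
      // columns; empty means the partition clause is valid on this point.
      def badPartitionColumns(partitionCols: Seq[String],
          schemaCols: Seq[String]): Set[String] = {
        partitionCols.map(_.toLowerCase).toSet
          .intersect(schemaCols.map(_.toLowerCase).toSet)
      }

      def main(args: Array[String]): Unit = {
        // "City" clashes with schema column "city" despite the case
        // difference, so it must be rejected. Prints: Set(city)
        println(badPartitionColumns(Seq("City"), Seq("id", "city")))
      }
    }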
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 2c98ec2..759539b 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStr
 import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -376,7 +376,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
   val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("carbondata") ||

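Both call sites (the Spark 2.1 builder here and the Spark 2.2 builder below)
make the same one-line change: the STORED BY / STORED AS clause is now
resolved through the shared utility instead of the per-instance helper. An
illustrative reading of getFileStorage's contract, with the ANTLR context
replaced by a plain token list for the sketch:

    object FileStorageSketch {
      // "STORED BY 'handler'" yields the quoted handler string,
      // "STORED AS format" yields the format name, and anything else
      // (including a bare format Spark handles itself) yields "".
      def fileStorageOf(tokens: Seq[String]): String = tokens match {
        case Seq("STORED", "BY", handler) => handler // e.g. "'carbondata'"
        case Seq("STORED", "AS", format)  => format  // e.g. "carbondata"
        case _                            => ""
      }
    }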
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5aada46e/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index a533db0..b8bf56d 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterT
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,
 CarbonAlterTableDataTypeChangeCommand}
 import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
 import org.apache.spark.sql.types.DecimalType
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -40,7 +40,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
   val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("carbondata") ||
