Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2466#discussion_r202576876
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
 ---
    @@ -169,220 +128,45 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
           provider) = createTableTuple
     
         val (tableIdentifier, temp, ifNotExists, external) = 
visitCreateTableHeader(tableHeader)
    -
    -    // TODO: implement temporary tables
    -    if (temp) {
    -      throw new ParseException(
    -        "CREATE TEMPORARY TABLE is not supported yet. " +
    -        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
    -    }
    -    if (skewSpecContext != null) {
    -      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
    -    }
    -    if (bucketSpecContext != null) {
    -      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", 
bucketSpecContext)
    -    }
    -
    -    val cols = Option(columns).toSeq.flatMap(visitColTypeList)
    -    val properties = getPropertyKeyValues(tablePropertyList)
    -
    -    // Ensuring whether no duplicate name is used in table definition
    -    val colNames = cols.map(_.name)
    -    if (colNames.length != colNames.distinct.length) {
    -      val duplicateColumns = colNames.groupBy(identity).collect {
    -        case (x, ys) if ys.length > 1 => "\"" + x + "\""
    -      }
    -      operationNotAllowed(s"Duplicated column names found in table 
definition of " +
    -                          s"$tableIdentifier: 
${duplicateColumns.mkString("[", ",", "]")}", columns)
    -    }
    -
    -    val tablePath = if (locationSpecContext != null) {
    +    val cols: Seq[StructField] = 
Option(columns).toSeq.flatMap(visitColTypeList)
    +    val colNames: Seq[String] = CarbonSparkSqlParserUtil
    +      .validateCreateTableReqAndGetColumns(tableHeader,
    +        skewSpecContext,
    +        bucketSpecContext,
    +        columns,
    +        cols,
    +        tableIdentifier,
    +        temp)
    +    val tablePath: Option[String] = if (locationSpecContext != null) {
           Some(visitLocationSpec(locationSpecContext))
         } else {
           None
         }
     
         val tableProperties = mutable.Map[String, String]()
    +    val properties: Map[String, String] = 
getPropertyKeyValues(tablePropertyList)
         properties.foreach{property => tableProperties.put(property._1, 
property._2)}
     
         // validate partition clause
         val (partitionByStructFields, partitionFields) =
           validatePartitionFields(partitionColumns, colNames, tableProperties)
     
    -    // validate partition clause
    -    if (partitionFields.nonEmpty) {
    -      if (!CommonUtil.validatePartitionColumns(tableProperties, 
partitionFields)) {
    -         throw new MalformedCarbonCommandException("Error: Invalid 
partition definition")
    -      }
    -      // partition columns should not be part of the schema
    -      val badPartCols = partitionFields
    -        .map(_.partitionColumn.toLowerCase)
    -        .toSet
    -        .intersect(colNames.map(_.toLowerCase).toSet)
    -
    -      if (badPartCols.nonEmpty) {
    -        operationNotAllowed(s"Partition columns should not be specified in 
the schema: " +
    -                            badPartCols.map("\"" + _ + "\"").mkString("[", 
",", "]"),
    -          partitionColumns)
    -      }
    -    }
    -
    -    val options = new CarbonOption(properties)
    -    // validate streaming property
    -    validateStreamingProperty(options)
    -    var fields = parser.getFields(cols ++ partitionByStructFields)
         // validate for create table as select
         val selectQuery = Option(query).map(plan)
    -    selectQuery match {
    -      case Some(q) =>
    -        // create table as select does not allow creation of partitioned 
table
    -        if (partitionFields.nonEmpty) {
    -          val errorMessage = "A Create Table As Select (CTAS) statement is 
not allowed to " +
    -                             "create a partitioned table using Carbondata 
file formats."
    -          operationNotAllowed(errorMessage, partitionColumns)
    -        }
    -        // create table as select does not allow to explicitly specify 
schema
    -        if (fields.nonEmpty) {
    -          operationNotAllowed(
    -            "Schema may not be specified in a Create Table As Select 
(CTAS) statement", columns)
    -        }
    -        // external table is not allow
    -        if (external) {
    -          operationNotAllowed("Create external table as select", 
tableHeader)
    -        }
    -        fields = parser
    -          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
    -            .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
    -      case _ =>
    -        // ignore this case
    -    }
    -    if (partitionFields.nonEmpty && options.isStreaming) {
    -      operationNotAllowed("Streaming is not allowed on partitioned table", 
partitionColumns)
    -    }
    -    // validate tblProperties
    -    val bucketFields = parser.getBucketFields(tableProperties, fields, 
options)
    -    var isTransactionalTable : Boolean = true
    -
    -    val tableInfo = if (external) {
    -      // read table info from schema file in the provided table path
    -      // external table also must convert table name to lower case
    -      val identifier = AbsoluteTableIdentifier.from(
    -        tablePath.get,
    -        
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
    -        tableIdentifier.table.toLowerCase())
    -      val table = try {
    -        val schemaPath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
    -        if (!FileFactory.isFileExist(schemaPath, 
FileFactory.getFileType(schemaPath))) {
    -          if (provider.equalsIgnoreCase("'carbonfile'")) {
    -            SchemaReader.inferSchema(identifier, true)
    -          } else {
    -            isTransactionalTable = false
    -            SchemaReader.inferSchema(identifier, false)
    -          }
    -        }
    -        else {
    -          SchemaReader.getTableInfo(identifier)
    -        }
    -      }
    -        catch {
    -        case e: Throwable =>
    -          operationNotAllowed(s"Invalid table path provided: 
${tablePath.get} ", tableHeader)
    -      }
    -      // set "_external" property, so that DROP TABLE will not delete the 
data
    -      if (provider.equalsIgnoreCase("'carbonfile'")) {
    -        table.getFactTable.getTableProperties.put("_filelevelformat", 
"true")
    -        table.getFactTable.getTableProperties.put("_external", "false")
    -      } else {
    -        table.getFactTable.getTableProperties.put("_external", "true")
    -        table.getFactTable.getTableProperties.put("_filelevelformat", 
"false")
    -      }
    -      // setting local dictionary for all string coloumn for external table
    -      var isLocalDic_enabled = table.getFactTable.getTableProperties
    -        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
    -      if (null == isLocalDic_enabled) {
    -        table.getFactTable.getTableProperties
    -          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
    -            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
    -      }
    -      isLocalDic_enabled = table.getFactTable.getTableProperties
    -        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
    -      if 
(CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
    -          isLocalDic_enabled.toBoolean) {
    -        val allcolumns = table.getFactTable.getListOfColumns
    -        for (i <- 0 until allcolumns.size()) {
    -          val cols = allcolumns.get(i)
    -          if (cols.getDataType == DataTypes.STRING || cols.getDataType == 
DataTypes.VARCHAR) {
    -            cols.setLocalDictColumn(true)
    -          }
    -          allcolumns.set(i, cols)
    -        }
    -        table.getFactTable.setListOfColumns(allcolumns)
    -      }
    -
    -      table
    -    } else {
    -      // prepare table model of the collected tokens
    -      val tableModel: TableModel = parser.prepareTableModel(
    -        ifNotExists,
    -        convertDbNameToLowerCase(tableIdentifier.database),
    -        tableIdentifier.table.toLowerCase,
    -        fields,
    -        partitionFields,
    -        tableProperties,
    -        bucketFields,
    -        isAlterFlow = false,
    -        false,
    -        tableComment)
    -      TableNewProcessor(tableModel)
    -    }
    -    tableInfo.setTransactionalTable(isTransactionalTable)
    -    selectQuery match {
    -      case query@Some(q) =>
    -        CarbonCreateTableAsSelectCommand(
    -          tableInfo = tableInfo,
    -          query = query.get,
    -          ifNotExistsSet = ifNotExists,
    -          tableLocation = tablePath)
    -      case _ =>
    -        CarbonCreateTableCommand(
    -          tableInfo = tableInfo,
    -          ifNotExistsSet = ifNotExists,
    -          tableLocation = tablePath,
    -          external)
    -    }
    -  }
    -
    -  private def validateStreamingProperty(carbonOption: CarbonOption): Unit 
= {
    -    try {
    -      carbonOption.isStreaming
    -    } catch {
    -      case _: IllegalArgumentException =>
    -        throw new MalformedCarbonCommandException(
    -          "Table property 'streaming' should be either 'true' or 'false'")
    -    }
    +    val extraTableTuple = (cols, external, tableIdentifier, ifNotExists, 
colNames, tablePath,
    +      tableProperties, properties, partitionByStructFields, 
partitionFields,
    +      parser, sparkSession, selectQuery)
    +    CarbonSparkSqlParserUtil
    +      .createCarbonTable(createTableTuple, extraTableTuple)
       }
     
       private def validatePartitionFields(
    --- End diff --
    
    Now this method has only 1 line of code. So, we can move this line to the 
caller and avoid this method completely.


---

Reply via email to