    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
    +import org.apache.spark.sql.catalyst.catalog.{
    +  CatalogRelation, CatalogTable, CatalogTableType,
    +  SimpleCatalogRelation
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.command.{
    +  AlterTableRecoverPartitionsCommand, DDLUtils,
    +  RunnableCommand
    +import org.apache.spark.sql.execution.datasources.{DataSource, 
HadoopFsRelation, LogicalRelation}
    +import org.apache.spark.sql.sources.InsertableRelation
    +import org.apache.spark.sql.types.StructType
    + * Create table 'using carbondata' and insert the query result into it.
    + *
    + * @param table the Catalog Table
    + * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
    + * @param query the query whose result will be insert into the new relation
    + *
    + */
    +case class CreateCarbonSourceTableAsSelectCommand(
    +    table: CatalogTable,
    +    mode: SaveMode,
    +    query: LogicalPlan)
    +  extends RunnableCommand {
    +  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
    +  override def run(sparkSession: SparkSession): Seq[Row] = {
    +    assert(table.tableType != CatalogTableType.VIEW)
    +    assert(table.provider.isDefined)
    +    assert(table.schema.isEmpty)
    +    val provider = table.provider.get
    +    val sessionState = sparkSession.sessionState
    +    val db = 
    +    val tableIdentWithDB = table.identifier.copy(database = Some(db))
    +    val tableName = tableIdentWithDB.unquotedString
    +    var createMetastoreTable = false
    +    var existingSchema = Option.empty[StructType]
    +    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
    +      // Check if we need to throw an exception or just return.
    +      mode match {
    +        case SaveMode.ErrorIfExists =>
    +          throw new AnalysisException(s"Table $tableName already exists. " 
    +                                      s"If you are using saveAsTable, you 
can set SaveMode to " +
    +                                      s"SaveMode.Append to " +
    +                                      s"insert data into the table or set 
SaveMode to SaveMode" +
    +                                      s".Overwrite to overwrite" +
    +                                      s"the existing data. " +
    +                                      s"Or, if you are using SQL CREATE 
TABLE, you need to drop " +
    +                                      s"$tableName first.")
    +        case SaveMode.Ignore =>
    +          // Since the table already exists and the save mode is Ignore, 
we will just return.
    +          return Seq.empty[Row]
    +        case SaveMode.Append =>
    +          // Check if the specified data source match the data source of 
the existing table.
    +          val existingProvider = DataSource.lookupDataSource(provider)
    +          // TODO: Check that options from the resolved relation match the 
relation that we are
    +          // inserting into (i.e. using the same compression).
    +          // Pass a table identifier with database part, so that 
`lookupRelation` won't get temp
    +          // views unexpectedly.
match {
    +            case l@LogicalRelation(_: InsertableRelation | _: 
HadoopFsRelation, _, _) =>
    +              // check if the file formats match
    +              l.relation match {
    +                case r: HadoopFsRelation if r.fileFormat.getClass != 
existingProvider =>
    +                  throw new AnalysisException(
    +                    s"The file format of the existing table $tableName is 
" +
    +                    s"`${ r.fileFormat.getClass.getName }`. It doesn't 
match the specified " +
    +                    s"format `$provider`")
    +                case _ =>
    +              }
    +              if (query.schema.size != l.schema.size) {
    +                throw new AnalysisException(
    +                  s"The column number of the existing schema[${ l.schema 
}] " +
    +                  s"doesn't match the data schema[${ query.schema }]'s")
    +              }
    +              existingSchema = Some(l.schema)
    +            case s: SimpleCatalogRelation if 
DDLUtils.isDatasourceTable(s.metadata) =>
    +              existingSchema = Some(s.metadata.schema)
    +            case c: CatalogRelation if c.catalogTable.provider == 
    +              throw new AnalysisException("Saving data in the Hive serde 
table " +
    +                                          s"${
    +                                            c.catalogTable
    please keep {} in one line


