Github user Indhumathi27 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185494171
--- Diff:
integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
---
@@ -0,0 +1,170 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{
+ CatalogRelation, CatalogTable, CatalogTableType,
+ SimpleCatalogRelation
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{
+ AlterTableRecoverPartitionsCommand, DDLUtils,
+ RunnableCommand
+}
+import org.apache.spark.sql.execution.datasources.{DataSource,
HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Create a table `USING carbondata` and insert the query result into it.
+ *
+ * @param table the catalog table to create
+ * @param mode SaveMode: Ignore, Overwrite, ErrorIfExists or Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+ table: CatalogTable,
+ mode: SaveMode,
+ query: LogicalPlan)
+ extends RunnableCommand {
+
+ override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ assert(table.tableType != CatalogTableType.VIEW)
+ assert(table.provider.isDefined)
+ assert(table.schema.isEmpty)
+
+ val provider = table.provider.get
+ val sessionState = sparkSession.sessionState
+ val db =
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = table.identifier.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
+
+ var createMetastoreTable = false
+ var existingSchema = Option.empty[StructType]
+ if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+ // Check if we need to throw an exception or just return.
+ mode match {
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(s"Table $tableName already exists. "
+
+ s"If you are using saveAsTable, you
can set SaveMode to " +
+ s"SaveMode.Append to " +
+ s"insert data into the table or set
SaveMode to SaveMode" +
+ s".Overwrite to overwrite" +
+ s"the existing data. " +
+ s"Or, if you are using SQL CREATE
TABLE, you need to drop " +
+ s"$tableName first.")
+ case SaveMode.Ignore =>
+ // Since the table already exists and the save mode is Ignore,
we will just return.
+ return Seq.empty[Row]
+ case SaveMode.Append =>
+ // Check if the specified data source match the data source of
the existing table.
+ val existingProvider = DataSource.lookupDataSource(provider)
+ // TODO: Check that options from the resolved relation match the
relation that we are
+ // inserting into (i.e. using the same compression).
+
+ // Pass a table identifier with database part, so that
`lookupRelation` won't get temp
+ // views unexpectedly.
+
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
match {
+ case l@LogicalRelation(_: InsertableRelation | _:
HadoopFsRelation, _, _) =>
+ // check if the file formats match
+ l.relation match {
+ case r: HadoopFsRelation if r.fileFormat.getClass !=
existingProvider =>
+ throw new AnalysisException(
+ s"The file format of the existing table $tableName is
" +
+ s"`${ r.fileFormat.getClass.getName }`. It doesn't
match the specified " +
+ s"format `$provider`")
+ case _ =>
+ }
+ if (query.schema.size != l.schema.size) {
+ throw new AnalysisException(
+ s"The column number of the existing schema[${ l.schema
}] " +
+ s"doesn't match the data schema[${ query.schema }]'s")
+ }
+ existingSchema = Some(l.schema)
+ case s: SimpleCatalogRelation if
DDLUtils.isDatasourceTable(s.metadata) =>
+ existingSchema = Some(s.metadata.schema)
+ case c: CatalogRelation if c.catalogTable.provider ==
Some(DDLUtils.HIVE_PROVIDER) =>
+ throw new AnalysisException("Saving data in the Hive serde
table " +
+ s"${
+ c.catalogTable
--- End diff --
okay
---