Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356072
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.MetadataCommand
+import
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param query the query whose result will be insert into the new relation
+ * @param tableInfo the Table Describe, which may contains serde, storage
handler etc.
+ * @param ifNotExistsSet allow continue working if it's already exists,
otherwise
+ * raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(query: LogicalPlan,
+ tableInfo: TableInfo,
+ ifNotExistsSet: Boolean = false,
+ tableLocation: Option[String] = None) extends MetadataCommand {
+
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = tableInfo.getFactTable.getTableName
+ var databaseOpt: Option[String] = None
+ if (tableInfo.getDatabaseName != null) {
+ databaseOpt = Some(tableInfo.getDatabaseName)
+ }
+ val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+ LOGGER.audit(s"Request received for CTAS for $dbName.$tableName")
+ lazy val carbonDataSourceHadoopRelation = {
+ // execute command to create carbon table
+ CarbonCreateTableCommand(tableInfo, ifNotExistsSet,
tableLocation).run(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .getCarbonDataSourceHadoopRelation(sparkSession,
TableIdentifier(tableName, Option(dbName)))
+ }
+ // check if table already exists
+ if (sparkSession.sessionState.catalog.listTables(dbName)
+ .exists(_.table.equalsIgnoreCase(tableName))) {
+ if (!ifNotExistsSet) {
+ LOGGER.audit(
+ s"Table creation with Database name [$dbName] and Table name
[$tableName] failed. " +
+ s"Table [$tableName] already exists under database [$dbName]")
+ throw new TableAlreadyExistsException(dbName, tableName)
+ }
+ } else {
+ try {
+ // execute command to load data into carbon table
+ CarbonInsertIntoCommand(carbonDataSourceHadoopRelation,
--- End diff --
move parameter to next line
---