Repository: carbondata Updated Branches: refs/heads/master 54eedfe62 -> 59de7cdbd
[CARBONDATA-1884] Add CTAS support to carbondata Implemented CTAS feature in carbondata This closes #1665 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59de7cdb Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59de7cdb Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59de7cdb Branch: refs/heads/master Commit: 59de7cdbd234d921af1a4f535d240943778a775d Parents: 54eedfe Author: manishgupta88 <[email protected]> Authored: Wed Dec 13 21:40:19 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Tue Dec 19 14:43:57 2017 +0800 ---------------------------------------------------------------------- .../createTable/TestCreateTableAsSelect.scala | 161 +++++++++++++++++++ .../TestDataWithDicExcludeAndInclude.scala | 11 -- .../spark/util/CarbonReflectionUtils.scala | 2 +- .../CarbonCreateTableAsSelectCommand.scala | 113 +++++++++++++ .../spark/sql/hive/CarbonFileMetastore.scala | 44 +++++ .../apache/spark/sql/hive/CarbonMetaStore.scala | 28 +++- .../spark/sql/parser/CarbonSparkSqlParser.scala | 56 +++++-- .../src/main/spark2.1/CarbonSessionState.scala | 26 +-- .../src/main/spark2.2/CarbonSessionState.scala | 26 +-- 9 files changed, 417 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala new file mode 100644 index 0000000..ffe6261 --- /dev/null +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.test.Spark2TestQueryExecutor +import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory + +/** + * test functionality for create table as select command + */ +class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll { + + private def createTablesAndInsertData { + // create carbon table and insert data + sql("CREATE TABLE carbon_ctas_test(key INT, value STRING) STORED by 'carbondata'") + sql("insert into carbon_ctas_test select 100,'spark'") + sql("insert into carbon_ctas_test select 200,'hive'") + + // create parquet table and insert data + sql("CREATE TABLE parquet_ctas_test(key INT, value STRING) STORED as parquet") + sql("insert into 
parquet_ctas_test select 100,'spark'") + sql("insert into parquet_ctas_test select 200,'hive'") + + // create hive table and insert data + sql("CREATE TABLE orc_ctas_test(key INT, value STRING) STORED as ORC") + sql("insert into orc_ctas_test select 100,'spark'") + sql("insert into orc_ctas_test select 200,'hive'") + } + + override def beforeAll { + sql("DROP TABLE IF EXISTS carbon_ctas_test") + sql("DROP TABLE IF EXISTS parquet_ctas_test") + sql("DROP TABLE IF EXISTS orc_ctas_test") + createTablesAndInsertData + } + + test("test create table as select with select from same table name when table exists") { + sql("drop table if exists ctas_same_table_name") + sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'") + intercept[Exception] { + sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name") + } + } + + test("test create table as select with select from same table name when table does not exists") { + sql("drop table if exists ctas_same_table_name") + intercept[Exception] { + sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name") + } + } + + test("test create table as select with select from same table name with if not exists clause") { + sql("drop table if exists ctas_same_table_name") + sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'") + sql("create table if not exists ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name") + checkAnswer(sql("select * from ctas_same_table_name"), Seq.empty[Row]) + } + + test("test create table as select with select from another carbon table") { + sql("DROP TABLE IF EXISTS ctas_select_carbon") + sql("create table ctas_select_carbon stored by 'carbondata' as select * from carbon_ctas_test") + checkAnswer(sql("select * from ctas_select_carbon"), sql("select * from carbon_ctas_test")) + } + + test("test create table as select with select from another parquet table") { + sql("DROP TABLE IF EXISTS 
ctas_select_parquet") + sql("create table ctas_select_parquet stored by 'carbondata' as select * from parquet_ctas_test") + checkAnswer(sql("select * from ctas_select_parquet"), sql("select * from parquet_ctas_test")) + } + + test("test create table as select with select from another hive/orc table") { + sql("DROP TABLE IF EXISTS ctas_select_orc") + sql("create table ctas_select_orc stored by 'carbondata' as select * from orc_ctas_test") + checkAnswer(sql("select * from ctas_select_orc"), sql("select * from orc_ctas_test")) + } + + test("test create table as select with where clause in select from carbon table that returns data") { + sql("DROP TABLE IF EXISTS ctas_select_where_carbon") + sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=100") + checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=100")) + } + + test( + "test create table as select with where clause in select from carbon table that does not return data") { + sql("DROP TABLE IF EXISTS ctas_select_where_carbon") + sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=300") + checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=300")) + } + + test("test create table as select with where clause in select from carbon table and load again") { + sql("DROP TABLE IF EXISTS ctas_select_where_carbon") + sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=100") + sql("insert into ctas_select_where_carbon select 200,'hive'") + checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test")) + } + + test("test create table as select with where clause in select from parquet table") { + sql("DROP TABLE IF EXISTS ctas_select_where_parquet") + sql("create table ctas_select_where_parquet stored by 'carbondata' as 
select * from parquet_ctas_test where key=100") + checkAnswer(sql("select * from ctas_select_where_parquet"), sql("select * from parquet_ctas_test where key=100")) + } + + test("test create table as select with where clause in select from hive/orc table") { + sql("DROP TABLE IF EXISTS ctas_select_where_orc") + sql("create table ctas_select_where_orc stored by 'carbondata' as select * from orc_ctas_test where key=100") + checkAnswer(sql("select * from ctas_select_where_orc"), sql("select * from orc_ctas_test where key=100")) + } + + test("test create table as select with select directly having the data") { + sql("DROP TABLE IF EXISTS ctas_select_direct_data") + sql("create table ctas_select_direct_data stored by 'carbondata' as select 300,'carbondata'") + checkAnswer(sql("select * from ctas_select_direct_data"), Seq(Row(300,"carbondata"))) + } + + test("test create table as select with TBLPROPERTIES") { + sql("DROP TABLE IF EXISTS ctas_tblproperties_test") + sql( + "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" + + "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test") + checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test")) + val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore + .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark) + .asInstanceOf[CarbonRelation].carbonTable + val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetaDataFilepath) + assert(metadataFolderPath.exists()) + val dictFiles: Array[CarbonFile] = metadataFolderPath.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(".dict") || file.getName.contains(".sortindex") + } + }) + assert(dictFiles.length == 3) + } + + override def afterAll { + Seq("carbon_ctas_test", "parquet_ctas_test", "orc_ctas_test", "ctas_same_table_name", + "ctas_select_carbon", 
"ctas_select_parquet", "ctas_select_orc", "ctas_select_direct_data", + 
"ctas_select_where_carbon", "ctas_select_where_parquet", "ctas_select_where_orc", "ctas_tblproperties_test").foreach(table => sql(s"DROP TABLE IF EXISTS $table")) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala index 484c304..c788857 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala @@ -100,17 +100,6 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeA ).message.contains("Operation not allowed: CREATE EXTERNAL TABLE")) } - test("test CTAS should fail") { - assert(intercept[AnalysisException]( - sql( - """ - | CREATE TABLE t1 (id string, value int) - | STORED BY 'carbondata' - | AS SELECT 'ABC', 1 FROM t2 - """.stripMargin) - ).message.contains("Operation not allowed: CREATE TABLE AS SELECT")) - } - override def afterAll { dropTable CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index ba51077..19b967a 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -189,7 +189,7 @@ object CarbonReflectionUtils { createObject( "org.apache.spark.sql.hive.CarbonSqlAstBuilder", conf, - sqlParser)._1.asInstanceOf[AstBuilder] + sqlParser, sparkSession)._1.asInstanceOf[AstBuilder] } else { throw new UnsupportedOperationException("Spark version not supported") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala new file mode 100644 index 0000000..26a8f6f --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.table + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AtomicRunnableCommand +import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.schema.table.TableInfo + +/** + * Create table and insert the query result into it. + * + * @param tableInfo schema and metadata of the carbon table to be created + * @param query the query whose result will be inserted into the new table + * @param ifNotExistsSet if the table already exists: when true, skip both the creation and + * the insert; when false, raise TableAlreadyExistsException + * @param tableLocation store location where the table needs to be created + */ +case class CarbonCreateTableAsSelectCommand( + tableInfo: TableInfo, + query: LogicalPlan, + ifNotExistsSet: Boolean = false, + tableLocation: Option[String] = None) extends AtomicRunnableCommand { + + /** + * set to true by processMetadata only when this command creates the table; + * processData inserts and undoMetadata drops only when it is true + */ + var isTableCreated: Boolean = false + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = tableInfo.getFactTable.getTableName + var databaseOpt: Option[String] = None + if (tableInfo.getDatabaseName != null) { + databaseOpt = Some(tableInfo.getDatabaseName) + } + val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) + LOGGER.audit(s"Request received for CTAS for $dbName.$tableName") + // check if table already exists + if (sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) { + if (!ifNotExistsSet) { + LOGGER.audit( + s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " + + s"Table [$tableName] already exists under database [$dbName]") + throw new TableAlreadyExistsException(dbName, tableName) + } + } else { + // execute command to create carbon table + CarbonCreateTableCommand(tableInfo, ifNotExistsSet, tableLocation).run(sparkSession) + isTableCreated = true + } + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (isTableCreated) { + val tableName = tableInfo.getFactTable.getTableName + var databaseOpt: Option[String] = None + if (tableInfo.getDatabaseName != null) { + databaseOpt = Some(tableInfo.getDatabaseName) + } + val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val carbonDataSourceHadoopRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore + .createCarbonDataSourceHadoopRelation(sparkSession, + TableIdentifier(tableName, Option(dbName))) + // execute command to load data into carbon table + CarbonInsertIntoCommand( + carbonDataSourceHadoopRelation, + query, + overwrite = false, + partition = Map.empty).run(sparkSession) + LOGGER.audit(s"CTAS operation completed successfully for $dbName.$tableName") + } + Seq.empty + } + + override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { + val tableName = tableInfo.getFactTable.getTableName + var databaseOpt: Option[String] = None + if (tableInfo.getDatabaseName != null) { + databaseOpt = Some(tableInfo.getDatabaseName) + } + val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) + // only roll back a table this command created; never drop a pre-existing one 
+ if (isTableCreated) { + CarbonDropTableCommand(ifExistsSet = true, Option(dbName), tableName).run(sparkSession) + } + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index f7a1eed..ba222e2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive +import java.net.URI import java.util.UUID import java.util.concurrent.atomic.AtomicLong @@ -528,4 +529,47 @@ class CarbonFileMetastore extends CarbonMetaStore { val tableMetadataFile = tablePath.getSchemaFilePath CarbonUtil.readSchemaFile(tableMetadataFile) } + + override def createCarbonDataSourceHadoopRelation( + sparkSession: SparkSession, + tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation = { + val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) + relation match { + case SubqueryAlias(_, + LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => + carbonDataSourceHadoopRelation + case LogicalRelation( + carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + carbonDataSourceHadoopRelation + case SubqueryAlias(_, c) + if SPARK_VERSION.startsWith("2.2") && + (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || + c.getClass.getName + .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || + c.getClass.getName.equals( + "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => + val catalogTable = 
CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable] + catalogTable.provider match { + case Some(name) if name.equals("org.apache.spark.sql.CarbonSource") => name + case _ => + throw new NoSuchTableException(CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession), tableIdentifier.table) + } + val tableLocation: String = catalogTable.storage.locationUri match { + // the element type of locationUri differs across supported Spark versions (String vs URI) + case Some(location) => (location: Any) match { + case uri: URI => + FileFactory.getUpdatedFilePath(uri.getPath) + case path => FileFactory.getUpdatedFilePath(path.toString) + } + case None => + CarbonEnv.getTablePath(tableIdentifier.database, tableIdentifier.table)(sparkSession) + } + CarbonDatasourceHadoopRelation(sparkSession, + Array(tableLocation), + catalogTable.storage.properties, + Option(catalogTable.schema)) + case _ => throw new NoSuchTableException(CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession), tableIdentifier.table) + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index 5a8e58b..cc0e6ab 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -16,9 +16,10 @@ */ package org.apache.spark.sql.hive -import org.apache.spark.sql.{RuntimeConfig, SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame, Dataset, RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import 
org.apache.spark.sql.types.StructType import
org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} @@ -144,6 +145,31 @@ trait CarbonMetaStore { def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] + + /** + * Retrieves, or builds from the catalog table, the carbon data source relation of a table + * + * @param sparkSession active spark session + * @param tableIdentifier identifier of the carbon table to resolve + * @return the [[CarbonDatasourceHadoopRelation]] of the table + */ + def createCarbonDataSourceHadoopRelation( + sparkSession: SparkSession, + tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation + + /** + * Analyzes the given (possibly unresolved) logical plan and returns its output schema + * + * @param sparkSession spark session used to analyze the plan + * @param query logical plan, e.g. the SELECT part of a CTAS statement + * @return schema of the analyzed plan + */ + def getSchemaFromUnresolvedRelation( + sparkSession: SparkSession, + query: LogicalPlan): StructType = { + val df: DataFrame = Dataset.ofRows(sparkSession, query) + df.schema + } } /** * Factory for Carbon metastore http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 79095ca..3597208 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -26,14 +26,12 @@ import 
org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor} -import
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import org.apache.spark.sql.types.StructField import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil @@ -78,7 +76,9 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab } } -class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) +class CarbonHelperSqlAstBuilder(conf: SQLConf, + parser: CarbonSpark2SqlParser, + sparkSession: SparkSession) extends SparkSqlAstBuilder(conf) { def getFileStorage(createFileFormat: CreateFileFormatContext): String = { @@ -147,7 +147,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) tablePropertyList : TablePropertyListContext, locationSpecContext: SqlBaseParser.LocationSpecContext, tableComment : Option[String], - ctas: TerminalNode) : LogicalPlan = { + ctas: TerminalNode, + query: QueryContext) : LogicalPlan = { // val parser = new CarbonSpark2SqlParser val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader) @@ -166,9 +167,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) if (external) { operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader) } - if (ctas != null && columns != null) { - operationNotAllowed("CREATE TABLE AS SELECT", tableHeader) - } val cols = Option(columns).toSeq.flatMap(visitColTypeList) val properties = getPropertyKeyValues(tablePropertyList) @@ -210,13 +208,33 @@ 
class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) } } - val fields = parser.getFields(cols ++ partitionByStructFields) val options = new CarbonOption(properties) + // validate streaming property + validateStreamingProperty(options) + val columnFields = parser.getFields(cols ++ partitionByStructFields) + // for create table as select, validate the statement and derive the schema from the query + val selectQuery = Option(query).map(plan) + val fields = selectQuery match { + case Some(q) => + // create table as select does not allow creation of partitioned table + if (partitionFields.nonEmpty) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Carbondata file formats." + operationNotAllowed(errorMessage, partitionColumns) + } + // create table as select does not allow to explicitly specify schema + if (columnFields.nonEmpty) { + operationNotAllowed( + "Schema may not be specified in a Create Table As Select (CTAS) statement", columns) + } + parser + .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore + .getSchemaFromUnresolvedRelation(sparkSession, q)) + case None => + columnFields + } // validate tblProperties val bucketFields = parser.getBucketFields(tableProperties, fields, options) - - validateStreamingProperty(options) - // prepare table model of the collected tokens val tableModel: TableModel = parser.prepareTableModel( ifNotExists, @@ -228,7 +246,19 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) bucketFields, isAlterFlow = false, tableComment) - CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, tablePath) + + selectQuery match { + case Some(q) => + CarbonCreateTableAsSelectCommand( + TableNewProcessor(tableModel), + q, + tableModel.ifNotExistsSet, + tablePath) + case None => + CarbonCreateTableCommand(TableNewProcessor(tableModel), + tableModel.ifNotExistsSet, + tablePath) + } } private def
validateStreamingProperty(carbonOption: CarbonOption): Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index c198613..b55a6aa 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -259,25 +259,27 @@ object CarbonOptimizerUtil { } } -class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends - SparkSqlAstBuilder(conf) { +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) + extends SparkSqlAstBuilder(conf) { - val helper = new CarbonHelperSqlAstBuilder(conf, parser) + val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { - helper.createCarbonTable(ctx.createTableHeader, - ctx.skewSpec, - ctx.bucketSpec, - ctx.partitionColumns, - ctx.columns, - ctx.tablePropertyList, - ctx.locationSpec, - Option(ctx.STRING()).map(string), - ctx.AS) + helper.createCarbonTable( + tableHeader = ctx.createTableHeader, + skewSpecContext = ctx.skewSpec, + bucketSpecContext = ctx.bucketSpec, + partitionColumns = ctx.partitionColumns, + columns = ctx.columns, + tablePropertyList = ctx.tablePropertyList, + locationSpecContext = ctx.locationSpec(), + tableComment = Option(ctx.STRING()).map(string), + ctas = ctx.AS, + query = ctx.query) } else { super.visitCreateTable(ctx) }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index 8acddfa..c951e5e 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -280,25 +280,27 @@ class CarbonOptimizer( } } -class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends - SparkSqlAstBuilder(conf) { +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) + extends SparkSqlAstBuilder(conf) { - val helper = new CarbonHelperSqlAstBuilder(conf, parser) + val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { - helper.createCarbonTable(ctx.createTableHeader, - ctx.skewSpec, - ctx.bucketSpec, - ctx.partitionColumns, - ctx.columns, - ctx.tablePropertyList, - ctx.locationSpec(), - Option(ctx.STRING()).map(string), - ctx.AS) + helper.createCarbonTable( + tableHeader = ctx.createTableHeader, + skewSpecContext = ctx.skewSpec, + bucketSpecContext = ctx.bucketSpec, + partitionColumns = ctx.partitionColumns, + columns = ctx.columns, + tablePropertyList = ctx.tablePropertyList, + locationSpecContext = ctx.locationSpec(), + tableComment = Option(ctx.STRING()).map(string), + ctas = ctx.AS, + query = ctx.query) } else { super.visitCreateHiveTable(ctx) }
