This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.6 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 21bbc4a5306ff850fcf14b488e4cb0452415213c Author: Indhumathi27 <[email protected]> AuthorDate: Mon Sep 16 16:25:03 2019 +0530 [CARBONDATA-3520] CTAS should fail if select query contains duplicate columns Problem: If the SELECT query contains duplicate columns, CTAS created a table with only one column, which is wrong. Solution: Throw an error if the SELECT query contains duplicate columns. This closes #3388 --- .../createTable/TestCreateTableAsSelect.scala | 37 ++++++++++++++++++++++ .../sql/parser/CarbonSparkSqlParserUtil.scala | 23 +++++++++++--- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala index 3896061..8e4d8fa 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala @@ -407,6 +407,43 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 24.5))) } + test("test duplicate columns with select query") { + sql("DROP TABLE IF EXISTS target_table") + sql("DROP TABLE IF EXISTS source_table") + // create carbon table and insert data + sql( + """ + | CREATE TABLE source_table( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'carbondata' + | """.stripMargin) + sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27") + val e = intercept[AnalysisException] { + sql( + """ + | CREATE TABLE target_table + | STORED BY 'carbondata' + | AS + | SELECT t1.city, t2.city + | FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen' + """.stripMargin) + } + 
e.getMessage().toString.contains("Duplicated column names found in table definition of " + + "`target_table`: [\"city\"]") + sql( + """ + | CREATE TABLE target_table + | STORED BY 'carbondata' + | AS + | SELECT t1.city as a, t2.city as b + | FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen' + """.stripMargin) + checkAnswer(sql("select * from target_table"), Seq(Row("shenzhen", "shenzhen"))) + } + override def afterAll { sql("DROP TABLE IF EXISTS carbon_ctas_test") sql("DROP TABLE IF EXISTS parquet_ctas_test") diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala index 5c008f2..4d85e88 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala @@ -119,6 +119,8 @@ object CarbonSparkSqlParserUtil { case _ => // ignore this case } + val columnNames = fields.map(_.name.get) + checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames) if (partitionFields.nonEmpty && options.isStreaming) { operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns) } @@ -355,16 +357,29 @@ object CarbonSparkSqlParserUtil { // Ensuring whether no duplicate name is used in table definition val colNames: Seq[String] = cols.map(_.name) + checkIfDuplicateColumnExists(columns, tableIdentifier, colNames) + colNames + } + + private def checkIfDuplicateColumnExists(columns: ColTypeListContext, + tableIdentifier: TableIdentifier, + colNames: Seq[String]): Unit = { if (colNames.length != colNames.distinct.length) { val duplicateColumns = colNames.groupBy(identity).collect { case (x, ys) if ys.length > 1 => "\"" + x + "\"" } - operationNotAllowed(s"Duplicated column names found in table definition of " + - s"$tableIdentifier: ${ 
duplicateColumns.mkString("[", ",", "]") }", - columns) + val errorMessage = s"Duplicated column names found in table definition of " + + s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }" + // In case of create table as select, ColTypeListContext will be null. Check if + // duplicateColumns present in column names list, If yes, throw exception + if (null != columns) { + operationNotAllowed(errorMessage, columns) + } else { + throw new UnsupportedOperationException(errorMessage) + } } - colNames } + /** * The method return's the storage type * @param createFileFormat
