This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 02e7723 [CARBONDATA-4210] Handle 3.1 parsing failures related to
alter complex types
02e7723 is described below
commit 02e77234ebb9c1e18e194c1844aaf2d4a2473dcc
Author: akkio-97 <[email protected]>
AuthorDate: Wed Jun 23 13:19:48 2021 +0530
[CARBONDATA-4210] Handle 3.1 parsing failures related to alter complex types
Why is this PR needed?
For Spark 2.3 and 2.4, parsing of alter commands is done by Spark, which is not
the case for 3.1.
What changes were proposed in this PR?
Carbon is now responsible for parsing these commands itself.
Test cases that were previously ignored due to this issue are now enabled.
This closes #4162
---
.../TestSIWithComplexArrayType.scala | 254 +++++++-------
.../spark/sql/catalyst/CarbonParserUtil.scala | 47 +++
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 32 +-
.../sql/parser/CarbonSparkSqlParserUtil.scala | 29 +-
.../alterTable/TestAlterTableAddColumns.scala | 8 +-
.../AlterTableColumnRenameTestCase.scala | 387 ++++++++++-----------
6 files changed, 390 insertions(+), 367 deletions(-)
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 46908b7..3fe1443 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -46,158 +46,142 @@ class TestSIWithComplexArrayType extends QueryTest with
BeforeAndAfterEach {
}
test("Test restructured array<string> and existing string column as index
columns on SI with compaction") {
- // TODO: Support alter chnage column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("drop table if exists complextable")
- sql("create table complextable (id string, country array<string>,
columnName string) stored as carbondata")
- sql("insert into complextable select 1,array('china', 'us'), 'b'")
- sql("insert into complextable select 2,array('pak'), 'v'")
+ sql("drop table if exists complextable")
+ sql("create table complextable (id string, country array<string>,
columnName string) stored as carbondata")
+ sql("insert into complextable select 1,array('china', 'us'), 'b'")
+ sql("insert into complextable select 2,array('pak'), 'v'")
- sql("drop index if exists index_11 on complextable")
- sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
- sql("alter table complextable change newArray arr2 array<string>")
- sql("alter table complextable change columnName name string")
- sql("insert into complextable select 3,array('china'),
'f',array('hello','world')")
- sql("insert into complextable select
4,array('India'),'g',array('iron','man','jarvis')")
+ sql("drop index if exists index_11 on complextable")
+ sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
+ sql("alter table complextable change newArray arr2 array<string>")
+ sql("alter table complextable change columnName name string")
+ sql("insert into complextable select 3,array('china'),
'f',array('hello','world')")
+ sql("insert into complextable select
4,array('India'),'g',array('iron','man','jarvis')")
- checkAnswer(sql("select * from complextable where
array_contains(arr2,'iron')"),
- Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
- mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
- val result1 = sql("select * from complextable where
array_contains(arr2,'iron') and name='g'")
- val result2 = sql("select * from complextable where arr2[0]='iron' and
name='f'")
- sql("create index index_11 on table complextable(arr2, name) as
'carbondata'")
- sql("alter table complextable compact 'minor'")
- val df1 = sql(" select * from complextable where
array_contains(arr2,'iron') and name='g'")
- val df2 = sql(" select * from complextable where arr2[0]='iron' and
name='f'")
- if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- val doNotHitSIDf = sql(" select * from complextable where
array_contains(arr2,'iron') and array_contains(arr2,'man')")
- if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- checkAnswer(result1, df1)
- checkAnswer(result2, df2)
+ checkAnswer(sql("select * from complextable where
array_contains(arr2,'iron')"),
+ Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
+ mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
+ val result1 = sql("select * from complextable where
array_contains(arr2,'iron') and name='g'")
+ val result2 = sql("select * from complextable where arr2[0]='iron' and
name='f'")
+ sql("create index index_11 on table complextable(arr2, name) as
'carbondata'")
+ sql("alter table complextable compact 'minor'")
+ val df1 = sql(" select * from complextable where
array_contains(arr2,'iron') and name='g'")
+ val df2 = sql(" select * from complextable where arr2[0]='iron' and
name='f'")
+ if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ val doNotHitSIDf = sql(" select * from complextable where
array_contains(arr2,'iron') and array_contains(arr2,'man')")
+ if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
}
+ checkAnswer(result1, df1)
+ checkAnswer(result2, df2)
}
test("Test restructured array<string> and string columns as index columns on
SI with compaction") {
- // TODO: Support alter chnage column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("drop table if exists complextable")
- sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
- sql("insert into complextable select 1,array('china', 'us'), 'b'")
- sql("insert into complextable select 2,array('pak'), 'v'")
+ sql("drop table if exists complextable")
+ sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
+ sql("insert into complextable select 1,array('china', 'us'), 'b'")
+ sql("insert into complextable select 2,array('pak'), 'v'")
- sql("drop index if exists index_11 on complextable")
- sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
- sql("alter table complextable change newArray arr2 array<string>")
- sql("ALTER TABLE complextable ADD COLUMNS(address string)")
- sql("alter table complextable change address addr string")
- sql("insert into complextable select 3,array('china'),
'f',array('hello','world'),'china'")
- sql("insert into complextable select
4,array('India'),'g',array('iron','man','jarvis'),'India'")
+ sql("drop index if exists index_11 on complextable")
+ sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
+ sql("alter table complextable change newArray arr2 array<string>")
+ sql("ALTER TABLE complextable ADD COLUMNS(address string)")
+ sql("alter table complextable change address addr string")
+ sql("insert into complextable select 3,array('china'),
'f',array('hello','world'),'china'")
+ sql("insert into complextable select
4,array('India'),'g',array('iron','man','jarvis'),'India'")
- checkAnswer(sql("select * from complextable where
array_contains(arr2,'iron')"),
- Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
- mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
- val result1 = sql("select * from complextable where
array_contains(arr2,'iron') and addr='India'")
- val result2 = sql("select * from complextable where arr2[0]='iron' and
addr='china'")
- sql("create index index_11 on table complextable(arr2, addr) as
'carbondata'")
- sql("alter table complextable compact 'minor'")
- val df1 = sql(" select * from complextable where
array_contains(arr2,'iron') and addr='India'")
- val df2 = sql(" select * from complextable where arr2[0]='iron' and
addr='china'")
- if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- val doNotHitSIDf = sql(" select * from complextable where
array_contains(arr2,'iron') and array_contains(arr2,'man')")
- if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- checkAnswer(result1, df1)
- checkAnswer(result2, df2)
+ checkAnswer(sql("select * from complextable where
array_contains(arr2,'iron')"),
+ Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
+ mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
+ val result1 = sql("select * from complextable where
array_contains(arr2,'iron') and addr='India'")
+ val result2 = sql("select * from complextable where arr2[0]='iron' and
addr='china'")
+ sql("create index index_11 on table complextable(arr2, addr) as
'carbondata'")
+ sql("alter table complextable compact 'minor'")
+ val df1 = sql(" select * from complextable where
array_contains(arr2,'iron') and addr='India'")
+ val df2 = sql(" select * from complextable where arr2[0]='iron' and
addr='china'")
+ if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ val doNotHitSIDf = sql(" select * from complextable where
array_contains(arr2,'iron') and array_contains(arr2,'man')")
+ if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
}
+ checkAnswer(result1, df1)
+ checkAnswer(result2, df2)
}
test("test array<string> on secondary index with compaction") {
- // TODO: Support alter chnage column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("create table complextable (id string, columnCountry array<string>,
name string) stored as carbondata")
- sql("insert into complextable select 1,array('china', 'us'), 'b'")
- sql("insert into complextable select 2,array('pak'), 'v'")
- sql("insert into complextable select 3,array('china'), 'f'")
- sql("insert into complextable select 4,array('india'),'g'")
- sql("alter table complextable change columnCountry country
array<string>")
- val result1 = sql(" select * from complextable where
array_contains(country,'china')")
- val result2 = sql(" select * from complextable where country[0]='china'")
- sql("drop index if exists index_1 on complextable")
- sql("create index index_1 on table complextable(country) as
'carbondata'")
- sql("alter table complextable compact 'minor'")
- val df1 = sql(" select * from complextable where
array_contains(country,'china')")
- val df2 = sql(" select * from complextable where country[0]='china'")
- if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- val doNotHitSIDf = sql(" select * from complextable where
array_contains(country,'china') and array_contains(country,'us')")
- if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- checkAnswer(result1, df1)
- checkAnswer(result2, df2)
+ sql("create table complextable (id string, columnCountry array<string>,
name string) stored as carbondata")
+ sql("insert into complextable select 1,array('china', 'us'), 'b'")
+ sql("insert into complextable select 2,array('pak'), 'v'")
+ sql("insert into complextable select 3,array('china'), 'f'")
+ sql("insert into complextable select 4,array('india'),'g'")
+ sql("alter table complextable change columnCountry country array<string>")
+ val result1 = sql(" select * from complextable where
array_contains(country,'china')")
+ val result2 = sql(" select * from complextable where country[0]='china'")
+ sql("drop index if exists index_1 on complextable")
+ sql("create index index_1 on table complextable(country) as 'carbondata'")
+ sql("alter table complextable compact 'minor'")
+ val df1 = sql(" select * from complextable where
array_contains(country,'china')")
+ val df2 = sql(" select * from complextable where country[0]='china'")
+ if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ val doNotHitSIDf = sql(" select * from complextable where
array_contains(country,'china') and array_contains(country,'us')")
+ if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
}
+ checkAnswer(result1, df1)
+ checkAnswer(result2, df2)
}
test("test array<string> and string as index columns on secondary index with
compaction") {
- // TODO: Support alter chnage column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("create table complextable (id string, columnCountry array<string>,
name string) stored as carbondata")
- sql("insert into complextable select 1, array('china', 'us'), 'b'")
- sql("insert into complextable select 2, array('pak'), 'v'")
- sql("insert into complextable select 3, array('china'), 'f'")
- sql("insert into complextable select 4, array('india'),'g'")
- sql("alter table complextable change columnCountry country
array<string>")
- val result = sql(" select * from complextable where
array_contains(country,'china') and name='f'")
- sql("drop index if exists index_1 on complextable")
- sql("create index index_1 on table complextable(country, name) as
'carbondata'")
- sql("alter table complextable compact 'minor'")
- val df = sql(" select * from complextable where
array_contains(country,'china') and name='f'")
- if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
- assert(false)
- } else {
- assert(true)
- }
- checkAnswer(result, df)
+ sql("create table complextable (id string, columnCountry array<string>,
name string) stored as carbondata")
+ sql("insert into complextable select 1, array('china', 'us'), 'b'")
+ sql("insert into complextable select 2, array('pak'), 'v'")
+ sql("insert into complextable select 3, array('china'), 'f'")
+ sql("insert into complextable select 4, array('india'),'g'")
+ sql("alter table complextable change columnCountry country array<string>")
+ val result = sql(" select * from complextable where
array_contains(country,'china') and name='f'")
+ sql("drop index if exists index_1 on complextable")
+ sql("create index index_1 on table complextable(country, name) as
'carbondata'")
+ sql("alter table complextable compact 'minor'")
+ val df = sql(" select * from complextable where
array_contains(country,'china') and name='f'")
+ if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
}
+ checkAnswer(result, df)
}
test("test load data with array<string> on secondary index") {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index aed63fe..cee16d7 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -1113,6 +1113,53 @@ object CarbonParserUtil {
/**
* This method will parse the given data type and validate against the
allowed data types
*
+ * @param complexField datatype structure(only complex) given by the user in
DDL
+ * @param values values defined when the decimal datatype is given in DDL
+ * @return DataTypeInfo object with datatype, precision and scale
+ */
+ def parseDataType(
+ columnName: String,
+ complexField: Field,
+ values: Option[List[(Int, Int)]]): DataTypeInfo = {
+ val dataTypeName =
DataTypeConverterUtil.convertToCarbonType(complexField).getName
+ val dataTypeInfo = CarbonParserUtil.parseDataType(columnName,
dataTypeName.toLowerCase, values)
+ complexField.dataType match {
+ case Some(CarbonCommonConstants.ARRAY) =>
+ val childField = complexField.children.get(0)
+ val childType = childField.dataType
+ val childName = columnName + CarbonCommonConstants.POINT +
childField.name
+ val childValues = childType match {
+ case d: DecimalType => Some(List((d.precision, d.scale)))
+ case _ => None
+ }
+ val childDatatypeInfo = parseDataType(childName, childField,
childValues)
+ dataTypeInfo.setChildren(List(childDatatypeInfo))
+ case Some(CarbonCommonConstants.STRUCT) =>
+ var childTypeInfoList: List[DataTypeInfo] = null
+ for (childField <- complexField.children.get) {
+ val childType = childField.dataType
+ val childName = columnName + CarbonCommonConstants.POINT +
childField.name.get
+ val childValues = childType match {
+ case d: DecimalType => Some(List((d.precision, d.scale)))
+ case _ => None
+ }
+ val childDatatypeInfo = parseDataType(childName, childField,
childValues)
+ if (childTypeInfoList == null) {
+ childTypeInfoList = List(childDatatypeInfo)
+ } else {
+ childTypeInfoList = childTypeInfoList :+ childDatatypeInfo
+ }
+ }
+ dataTypeInfo.setChildren(childTypeInfoList)
+ case _ =>
+ }
+ // TODO have to handle for map types [CARBONDATA-4199]
+ dataTypeInfo
+ }
+
+ /**
+ * This method will parse the given data type and validate against the
allowed data types
+ *
* @param dataType datatype string given by the user in DDL
* @param values values defined when the decimal datatype is given in DDL
* @return DataTypeInfo object with datatype, precision and scale
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index b3eb214..3a9a2ad 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.parser
+import java.util.regex.Pattern
+
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -629,14 +631,32 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
commandOptions)
}
-
protected lazy val alterTableColumnRenameAndModifyDataType:
Parser[LogicalPlan] =
- ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~
ident ~
- opt("(" ~> rep1sep(valueOptions, ",") <~ ")") ~ opt(COMMENT ~>
restInput) <~ opt(";") ^^ {
- case dbName ~ table ~ columnName ~ columnNameCopy ~ dataType ~ values ~
- comment if CarbonPlanHelper.isCarbonTable(TableIdentifier(table,
dbName)) =>
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~
+ opt(primitiveTypes) ~ opt(nestedType) ~ opt(COMMENT ~> restInput) <~
opt(";") ^^ {
+ case dbName ~ table ~ columnName ~ newColumnName ~ dataType ~
complexField ~
+ comment if CarbonPlanHelper.isCarbonTable(TableIdentifier(table,
dbName)) &&
+ (complexField.isDefined ^ dataType.isDefined) =>
+ var primitiveType = dataType
+ var newComment: Option[String] = comment
+ var decimalValues = None: Option[List[(Int, Int)]]
+ // if datatype is decimal then extract precision and scale
+ if (!dataType.equals(None) &&
dataType.get.contains(CarbonCommonConstants.DECIMAL)) {
+ val matcher = Pattern.compile("[0-9]+").matcher(dataType.get)
+ val list = collection.mutable.ListBuffer.empty[Int]
+ while ( { matcher.find }) {
+ list += matcher.group.toInt
+ }
+ decimalValues = Some(List((list(0), list(1))))
+ primitiveType = Some(CarbonCommonConstants.DECIMAL)
+ }
+ newComment = if (comment.isDefined) {
+ Some(StringUtils.substringBetween(comment.get, "'", "'"))
+ } else { None }
+
CarbonSparkSqlParserUtil.alterTableColumnRenameAndModifyDataType(
- dbName, table, columnName, columnNameCopy, dataType, values, comment)
+ dbName, table, columnName, newColumnName, primitiveType,
decimalValues, newComment,
+ complexField)
}
protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 609a476..5467276 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -447,22 +447,25 @@ object CarbonSparkSqlParserUtil {
table: String,
columnName: String,
columnNameCopy: String,
- dataType: String,
+ dataType: Option[String],
values: Option[List[(Int, Int)]],
- comment: Option[String]
+ comment: Option[String],
+ complexChild: Option[Field]
): CarbonAlterTableColRenameDataTypeChangeCommand = {
val isColumnRename = !columnName.equalsIgnoreCase(columnNameCopy)
- val alterTableColRenameAndDataTypeChangeModel =
- AlterTableDataTypeChangeModel(
- CarbonParserUtil.parseDataType(columnName, dataType.toLowerCase,
- values),
- CarbonParserUtil.convertDbNameToLowerCase(dbName),
- table.toLowerCase,
- columnName.toLowerCase,
- columnNameCopy.toLowerCase,
- isColumnRename,
- comment)
-
CarbonAlterTableColRenameDataTypeChangeCommand(alterTableColRenameAndDataTypeChangeModel)
+ val dataTypeInfo = if (!dataType.equals(None)) {
+ CarbonParserUtil.parseDataType(columnName, dataType.get.toLowerCase,
values)
+ } else {
+ CarbonParserUtil.parseDataType(columnNameCopy, complexChild.get, values)
+ }
+
CarbonAlterTableColRenameDataTypeChangeCommand(AlterTableDataTypeChangeModel(
+ dataTypeInfo,
+ CarbonParserUtil.convertDbNameToLowerCase(dbName),
+ table.toLowerCase,
+ columnName.toLowerCase,
+ columnNameCopy.toLowerCase,
+ isColumnRename,
+ comment))
}
def alterTableAddColumns(
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index ae3f3cb..f248d72 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -337,9 +337,7 @@ class TestAlterTableAddColumns extends QueryTest with
BeforeAndAfterAll {
assert(c(0)(2) == null)
}
- // TODO: Fix it when complex support for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- ignore("Test alter add for structs enabling local dictionary") {
+ test("Test alter add for structs enabling local dictionary") {
createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "STRUCT")
// For the previous segments the default value for newly added struct
column is null
insertIntoTableForStructType
@@ -349,9 +347,7 @@ class TestAlterTableAddColumns extends QueryTest with
BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS alter_struct")
}
- // TODO: Fix the below test case when complex support for SPARK 3.1.1 is
added
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- ignore("Test alter add for structs, disabling local dictionary") {
+ test("Test alter add for structs, disabling local dictionary") {
createTableForComplexTypes("LOCAL_DICTIONARY_EXCLUDE", "STRUCT")
// For the previous segments the default value for newly added struct
column is null
insertIntoTableForStructType
diff --git
a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index 4103424..ecabd6c 100644
---
a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -53,119 +53,100 @@ class AlterTableColumnRenameTestCase extends QueryTest
with BeforeAndAfterAll {
}
test("Rename more than one column at a time in one operation") {
- // TODO: Support alter change column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("drop table if exists test_rename")
- sql("CREATE TABLE test_rename (str struct<a:struct<b:int, d:int>," +
- " c:int>) STORED AS carbondata")
- sql("insert into test_rename
values(named_struct('a',named_struct('b',12,'d',12), 'c', 12))")
- sql("alter table test_rename change str str22 struct<a11:struct<b2:int,
d:int>, c:int>")
- sql("insert into test_rename values(named_struct('a11'," +
- "named_struct('b2',24,'d',24), 'c', 24))")
-
- val rows = sql("select str22.a11.b2 from test_rename").collect()
- assert(rows(0).equals(Row(12)) && rows(1).equals(Row(24)))
- // check if old column names are still present
- val ex1 = intercept[AnalysisException] {
- sql("select str from test_rename").show(false)
- }
- assert(ex1.getMessage.contains("cannot resolve '`str`'"))
-
- val ex2 = intercept[AnalysisException] {
- sql("select str.a from test_rename").show(false)
- }
- assert(ex2.getMessage.contains("cannot resolve '`str.a`'"))
+ sql("drop table if exists test_rename")
+ sql("CREATE TABLE test_rename (str struct<a:struct<b:int, d:int>, c:int>)
STORED AS carbondata")
+ sql("insert into test_rename
values(named_struct('a',named_struct('b',12,'d',12), 'c', 12))")
+ sql("alter table test_rename change str str22 struct<a11:struct<b2:int,
d:int>, c:int>")
+ sql("insert into test_rename
values(named_struct('a11',named_struct('b2',24,'d',24), 'c', 24))")
+
+ val rows = sql("select str22.a11.b2 from test_rename").collect()
+ assert(rows(0).equals(Row(12)) && rows(1).equals(Row(24)))
+ // check if old column names are still present
+ val ex1 = intercept[AnalysisException] {
+ sql("select str from test_rename").show(false)
+ }
+ assert(ex1.getMessage.contains("cannot resolve '`str`'"))
- // check un-altered columns
- val rows1 = sql("select str22.c from test_rename").collect()
- val rows2 = sql("select str22.a11.d from test_rename").collect()
- assert(rows1.sameElements(Array(Row(12), Row(24))))
- assert(rows2.sameElements(Array(Row(12), Row(24))))
+ val ex2 = intercept[AnalysisException] {
+ sql("select str.a from test_rename").show(false)
}
+ assert(ex2.getMessage.contains("cannot resolve '`str.a`'"))
+
+ // check un-altered columns
+ val rows1 = sql("select str22.c from test_rename").collect()
+ val rows2 = sql("select str22.a11.d from test_rename").collect()
+ assert(rows1.sameElements(Array(Row(12), Row(24))))
+ assert(rows2.sameElements(Array(Row(12), Row(24))))
}
test("rename complex columns with invalid
structure/duplicate-names/Map-type") {
- // TODO: Support alter change column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("drop table if exists test_rename")
- sql(
- "CREATE TABLE test_rename (str struct<a:int,b:long>, str2
struct<a:int,b:long>, map1 " +
- "map<string, string>, str3 struct<a:int, b:map<string, string>>)
STORED AS carbondata")
-
- val ex1 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str str
struct<a:array<int>,b:long>")
- }
- assert(ex1.getMessage
- .contains(
- "column rename operation failed: Altering datatypes of any child
column is" +
- " not supported"))
+ sql("drop table if exists test_rename")
+ sql(
+ "CREATE TABLE test_rename (str struct<a:int,b:long>, str2
struct<a:int,b:long>, map1 " +
+ "map<string, string>, str3 struct<a:int, b:map<string, string>>) STORED
AS carbondata")
- val ex2 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str str
struct<a:int,b:long,c:int>")
- }
- assert(ex2.getMessage
- .contains(
- "column rename operation failed: Number of children of old and new
complex columns are " +
- "not the same"))
+ val ex1 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change str str struct<a:array<int>,b:long>")
+ }
+ assert(ex1.getMessage
+ .contains(
+ "column rename operation failed: Altering datatypes of any child
column is not supported"))
- val ex3 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str str int")
- }
- assert(ex3.getMessage
- .contains(
- "column rename operation failed: Old and new complex columns are not
compatible " +
- "in structure"))
+ val ex2 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change str str struct<a:int,b:long,c:int>")
+ }
+ assert(ex2.getMessage
+ .contains(
+ "column rename operation failed: Number of children of old and new
complex columns are " +
+ "not the same"))
- val ex4 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str str struct<a:int,a:long>")
- }
- assert(ex4.getMessage
- .contains(
- "Column Rename Operation failed. New column name str.a already
exists in table " +
- "test_rename"))
+ val ex3 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change str str int")
+ }
+ assert(ex3.getMessage
+ .contains(
+ "column rename operation failed: Old and new complex columns are not
compatible " +
+ "in structure"))
- val ex5 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str str2 struct<a:int,b:long>")
- }
- assert(ex5.getMessage
- .contains(
- "Column Rename Operation failed. New column name str2 already exists
in" +
- " table test_rename"))
+ val ex4 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change str str struct<a:int,a:long>")
+ }
+ assert(ex4.getMessage
+ .contains(
+ "Column Rename Operation failed. New column name str.a already exists
in table " +
+ "test_rename"))
- val ex6 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change map1 map2 map<string,
struct<a:int>>")
- }
- assert(ex6.getMessage
- .contains("rename operation failed: Alter rename is unsupported for
Map datatype column"))
+ val ex5 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change str str2 struct<a:int,b:long>")
+ }
+ assert(ex5.getMessage
+ .contains(
+ "Column Rename Operation failed. New column name str2 already exists
in table test_rename"))
- val ex7 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str3 str33 struct<a:int,
bc:map<string, string>>")
- }
- assert(ex7.getMessage
- .contains(
- "rename operation failed: Cannot alter complex structure that
includes map type column"))
+ val ex6 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change map1 map2 map<string,
struct<a:int>>")
+ }
+ assert(ex6.getMessage
+ .contains("rename operation failed: Alter rename is unsupported for Map
datatype column"))
- val ex8 = intercept[ProcessMetaDataException] {
- sql("alter table test_rename change str2 str22 struct<>")
- }
- assert(ex8.getMessage
- .contains(
- "rename operation failed: Either the old or the new dimension is
null"))
-
- // ensure all failed rename operations have been reverted to original
state
- val describe = sql("desc table test_rename")
- assert(describe.collect().size == 4)
- assertResult(1)(describe.filter(
- "col_name='str' and data_type = 'struct<a:int,b:bigint>'").count())
- assertResult(1)(describe.filter(
- "col_name='str2' and data_type = 'struct<a:int,b:bigint>'").count())
- assertResult(1)(describe.filter(
- "col_name='map1' and data_type = 'map<string,string>'").count())
- assertResult(1)(describe.filter(
- "col_name='str3' and data_type =
'struct<a:int,b:map<string,string>>'").count())
+ val ex7 = intercept[ProcessMetaDataException] {
+ sql("alter table test_rename change str3 str33 struct<a:int,
bc:map<string, string>>")
}
+ assert(ex7.getMessage
+ .contains(
+ "rename operation failed: Cannot alter complex structure that includes
map type column"))
+
+ // ensure all failed rename operations have been reverted to original state
+ val describe = sql("desc table test_rename")
+ assert(describe.collect().size == 4)
+ assertResult(1)(describe.filter(
+ "col_name='str' and data_type = 'struct<a:int,b:bigint>'").count())
+ assertResult(1)(describe.filter(
+ "col_name='str2' and data_type = 'struct<a:int,b:bigint>'").count())
+ assertResult(1)(describe.filter(
+ "col_name='map1' and data_type = 'map<string,string>'").count())
+ assertResult(1)(describe.filter(
+ "col_name='str3' and data_type =
'struct<a:int,b:map<string,string>>'").count())
}
def checkAnswerUtil1(df1: DataFrame, df2: DataFrame, df3: DataFrame) {
@@ -181,75 +162,71 @@ class AlterTableColumnRenameTestCase extends QueryTest
with BeforeAndAfterAll {
}
test("test alter rename struct of (primitive/struct/array)") {
- // TODO: Support alter change column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("drop table if exists test_rename")
- sql("CREATE TABLE test_rename (str1 struct<a:int>, str2
struct<a:struct<b:int>>, str3 " +
+ sql("drop table if exists test_rename")
+ sql("CREATE TABLE test_rename (str1 struct<a:int>, str2
struct<a:struct<b:int>>, str3 " +
"struct<a:struct<b:struct<c:int>>>, intfield int) STORED AS
carbondata")
- sql("insert into test_rename values(named_struct('a', 2), " +
+ sql("insert into test_rename values(named_struct('a', 2), " +
"named_struct('a', named_struct('b', 2)), named_struct('a',
named_struct('b', " +
"named_struct('c', 2))), 1)")
- // Operation 1: rename parent column from str2 to str22 and read old rows
- sql("alter table test_rename change str2 str22 struct<a:struct<b:int>>")
- var df1 = sql("select str22 from test_rename")
- var df2 = sql("select str22.a from test_rename")
- var df3 = sql("select str22.a.b from test_rename")
- assert(df1.collect().size == 1 && df2.collect().size == 1 &&
df3.collect().size == 1)
- checkAnswerUtil1(df1, df2, df3)
-
- // Operation 2: rename child column from a to a11
- sql("alter table test_rename change str22 str22
struct<a11:struct<b:int>>")
- df1 = sql("select str22 from test_rename")
- df2 = sql("select str22.a11 from test_rename")
- df3 = sql("select str22.a11.b from test_rename")
- assert(df1.collect().size == 1 && df2.collect().size == 1 &&
df3.collect().size == 1)
- checkAnswerUtil1(df1, df2, df3)
-
- // Operation 3: rename parent column from str22 to str33
- sql("alter table test_rename change str22 str33
struct<a11:struct<b:int>>")
- df1 = sql("select str33 from test_rename")
- df2 = sql("select str33.a11 from test_rename")
- df3 = sql("select str33.a11.b from test_rename")
- assert(df1.collect().size == 1 && df2.collect().size == 1 &&
df3.collect().size == 1)
- checkAnswerUtil1(df1, df2, df3)
-
- // insert new rows
- sql("insert into test_rename values(named_struct('a', 3), " +
+ // Operation 1: rename parent column from str2 to str22 and read old rows
+ sql("alter table test_rename change str2 str22 struct<a:struct<b:int>>")
+ var df1 = sql("select str22 from test_rename")
+ var df2 = sql("select str22.a from test_rename")
+ var df3 = sql("select str22.a.b from test_rename")
+ assert(df1.collect().size == 1 && df2.collect().size == 1 &&
df3.collect().size == 1)
+ checkAnswerUtil1(df1, df2, df3)
+
+ // Operation 2: rename child column from a to a11
+ sql("alter table test_rename change str22 str22 struct<a11:struct<b:int>>")
+ df1 = sql("select str22 from test_rename")
+ df2 = sql("select str22.a11 from test_rename")
+ df3 = sql("select str22.a11.b from test_rename")
+ assert(df1.collect().size == 1 && df2.collect().size == 1 &&
df3.collect().size == 1)
+ checkAnswerUtil1(df1, df2, df3)
+
+ // Operation 3: rename parent column from str22 to str33
+ sql("alter table test_rename change str22 str33 struct<a11:struct<b:int>>")
+ df1 = sql("select str33 from test_rename")
+ df2 = sql("select str33.a11 from test_rename")
+ df3 = sql("select str33.a11.b from test_rename")
+ assert(df1.collect().size == 1 && df2.collect().size == 1 &&
df3.collect().size == 1)
+ checkAnswerUtil1(df1, df2, df3)
+
+ // insert new rows
+ sql("insert into test_rename values(named_struct('a', 3), " +
"named_struct('a', named_struct('b', 3)), named_struct('a',
named_struct('b', " +
"named_struct('c', 3))), 2)")
- df1 = sql("select str33 from test_rename")
- df2 = sql("select str33.a11 from test_rename")
- df3 = sql("select str33.a11.b from test_rename")
- assert(df1.collect().size == 2 && df2.collect().size == 2 &&
df3.collect().size == 2)
- checkAnswerUtil2(df1, df2, df3)
-
- // Operation 4: rename child column from a11 to a22 & b to b11
- sql("alter table test_rename change str33 str33
struct<a22:struct<b11:int>>")
- df1 = sql("select str33 from test_rename")
- df2 = sql("select str33.a22 from test_rename")
- df3 = sql("select str33.a22.b11 from test_rename")
- assert(df1.collect().size == 2 && df2.collect().size == 2 &&
df3.collect().size == 2)
- checkAnswerUtil2(df1, df2, df3)
-
- // Operation 5: rename primitive column from intField to intField2
- sql("alter table test_rename change intField intField2 int")
-
- val describe = sql("desc table test_rename")
- assert(describe.collect().size == 4)
- assertResult(1)(describe.filter(
- "col_name='str1' and data_type = 'struct<a:int>'").count())
- assertResult(1)(describe.filter(
- "col_name='str33' and data_type =
'struct<a22:struct<b11:int>>'").count())
- assertResult(1)(describe.filter(
- "col_name='str3' and data_type =
'struct<a:struct<b:struct<c:int>>>'").count())
-
- // validate schema evolution entries for 4 above alter operations
- val (addedColumns, removedColumns, noOfEvolutions) =
returnValuesAfterSchemaEvolution(
- "test_rename")
- validateSchemaEvolution(addedColumns, removedColumns, noOfEvolutions)
- }
+ df1 = sql("select str33 from test_rename")
+ df2 = sql("select str33.a11 from test_rename")
+ df3 = sql("select str33.a11.b from test_rename")
+ assert(df1.collect().size == 2 && df2.collect().size == 2 &&
df3.collect().size == 2)
+ checkAnswerUtil2(df1, df2, df3)
+
+ // Operation 4: rename child column from a11 to a22 & b to b11
+ sql("alter table test_rename change str33 str33
struct<a22:struct<b11:int>>")
+ df1 = sql("select str33 from test_rename")
+ df2 = sql("select str33.a22 from test_rename")
+ df3 = sql("select str33.a22.b11 from test_rename")
+ assert(df1.collect().size == 2 && df2.collect().size == 2 &&
df3.collect().size == 2)
+ checkAnswerUtil2(df1, df2, df3)
+
+ // Operation 5: rename primitive column from intField to intField2
+ sql("alter table test_rename change intField intField2 int")
+
+ val describe = sql("desc table test_rename")
+ assert(describe.collect().size == 4)
+ assertResult(1)(describe.filter(
+ "col_name='str1' and data_type = 'struct<a:int>'").count())
+ assertResult(1)(describe.filter(
+ "col_name='str33' and data_type =
'struct<a22:struct<b11:int>>'").count())
+ assertResult(1)(describe.filter(
+ "col_name='str3' and data_type =
'struct<a:struct<b:struct<c:int>>>'").count())
+
+ // validate schema evolution entries for 4 above alter operations
+ val (addedColumns, removedColumns, noOfEvolutions) =
returnValuesAfterSchemaEvolution(
+ "test_rename")
+ validateSchemaEvolution(addedColumns, removedColumns, noOfEvolutions)
}
def returnValuesAfterSchemaEvolution(tableName: String): (Seq[ColumnSchema],
Seq[ColumnSchema],
@@ -291,50 +268,46 @@ class AlterTableColumnRenameTestCase extends QueryTest
with BeforeAndAfterAll {
}
test("test alter rename array of (primitive/array/struct)") {
- // TODO: Support alter change column for complex type for SPARK 3.1.1
- // REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
- if (!sqlContext.sparkContext.version.startsWith("3.1")) {
- sql("drop table if exists test_rename")
- sql(
- "CREATE TABLE test_rename (arr1 array<int>, arr2 array<array<int>>,
arr3 array<string>, " +
- "arr4 array<struct<a:int>>) STORED AS carbondata")
- sql(
- "insert into test_rename values (array(1,2,3),
array(array(1,2),array(3,4))," +
- " array('hello','world'), array(named_struct('a',45)))")
-
- sql("alter table test_rename change arr1 arr11 array<int>")
- val df1 = sql("select arr11 from test_rename")
- assert(df1.collect.size == 1)
- checkAnswer(df1, Seq(Row(make(Array(1, 2, 3)))))
-
- sql("alter table test_rename change arr2 arr22 array<array<int>>")
- val df2 = sql("select arr22 from test_rename")
- assert(df2.collect.size == 1)
- checkAnswer(df2, Seq(Row(make(Array(make(Array(1, 2)), make(Array(3,
4)))))))
-
- sql("alter table test_rename change arr3 arr33 array<string>")
- val df3 = sql("select arr33 from test_rename")
- assert(df3.collect.size == 1)
- checkAnswer(sql("select arr33 from test_rename"),
Seq(Row(make(Array("hello", "world")))))
-
- sql("alter table test_rename change arr4 arr44 array<struct<a:int>>")
- sql("alter table test_rename change arr44 arr44 array<struct<a11:int>>")
-
- val df4 = sql("select arr44.a11 from test_rename")
- assert(df4.collect.size == 1)
- checkAnswer(df4, Seq(Row(make(Array(45)))))
-
- // test for new inserted row
- sql(
- "insert into test_rename values (array(11,22,33),
array(array(11,22),array(33,44)), array" +
- "('hello11', 'world11'), array(named_struct('a',4555)))")
- val rows = sql("select arr11, arr22, arr33, arr44.a11 from
test_rename").collect
- assert(rows.size == 2)
- val secondRow = rows(1)
- assert(secondRow(0).equals(make(Array(11, 22, 33))) &&
- secondRow(1).equals(make(Array(make(Array(11, 22)), make(Array(33,
44))))) &&
- secondRow(2).equals(make(Array("hello11", "world11"))))
- }
+ sql("drop table if exists test_rename")
+ sql(
+ "CREATE TABLE test_rename (arr1 array<int>, arr2 array<array<int>>, arr3
array<string>, " +
+ "arr4 array<struct<a:int>>) STORED AS carbondata")
+ sql(
+ "insert into test_rename values (array(1,2,3),
array(array(1,2),array(3,4)), array('hello'," +
+ "'world'), array(named_struct('a',45)))")
+
+ sql("alter table test_rename change arr1 arr11 array<int>")
+ val df1 = sql("select arr11 from test_rename")
+ assert(df1.collect.size == 1)
+ checkAnswer(df1, Seq(Row(make(Array(1, 2, 3)))))
+
+ sql("alter table test_rename change arr2 arr22 array<array<int>>")
+ val df2 = sql("select arr22 from test_rename")
+ assert(df2.collect.size == 1)
+ checkAnswer(df2, Seq(Row(make(Array(make(Array(1, 2)), make(Array(3,
4)))))))
+
+ sql("alter table test_rename change arr3 arr33 array<string>")
+ val df3 = sql("select arr33 from test_rename")
+ assert(df3.collect.size == 1)
+ checkAnswer(sql("select arr33 from test_rename"),
Seq(Row(make(Array("hello", "world")))))
+
+ sql("alter table test_rename change arr4 arr44 array<struct<a:int>>")
+ sql("alter table test_rename change arr44 arr44 array<struct<a11:int>>")
+
+ val df4 = sql("select arr44.a11 from test_rename")
+ assert(df4.collect.size == 1)
+ checkAnswer(df4, Seq(Row(make(Array(45)))))
+
+ // test for new inserted row
+ sql(
+ "insert into test_rename values (array(11,22,33),
array(array(11,22),array(33,44)), array" +
+ "('hello11', 'world11'), array(named_struct('a',4555)))")
+ val rows = sql("select arr11, arr22, arr33, arr44.a11 from
test_rename").collect
+ assert(rows.size == 2)
+ val secondRow = rows(1)
+ assert(secondRow(0).equals(make(Array(11, 22, 33))) &&
+ secondRow(1).equals(make(Array(make(Array(11, 22)), make(Array(33,
44))))) &&
+ secondRow(2).equals(make(Array("hello11", "world11"))))
}
test("validate alter change datatype for complex children columns") {