This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d01d9f5 [CARBONDATA-4163] Support adding of single-level complex
columns(array/struct)
d01d9f5 is described below
commit d01d9f5c63af42b35819ead326c771eb674f4ba5
Author: akkio-97 <[email protected]>
AuthorDate: Wed Mar 31 20:41:26 2021 +0530
[CARBONDATA-4163] Support adding of single-level complex
columns(array/struct)
Why is this PR needed?
This PR enables adding of single-level complex columns (only array and
struct)
to a carbon table. Command -
ALTER TABLE <table_name> ADD COLUMNS(arr1 ARRAY (double) )
ALTER TABLE <table_name> ADD COLUMNS(struct1 STRUCT<a:int, b:string>)
The default value for the column in case of old rows will be null.
What changes were proposed in this PR?
1. Create instances of ColumnSchema for each of the children. By doing this,
each child column will have its own ordinal. The new columns are first
identified and stored in a flat structure. For example, for arr1
array(int)
--> 2 column schemas are created - arr1 and arr1.val. First being the
parent
and second being its child. Each of which will have its own ordinals.
2. Later, while updating the schema evolution entry, we only account for the
newly added parent columns while discarding the child columns (as they are no
longer required; otherwise we would have the child as a separate column in
the schema).
3. Using the schema evolution entry, the final schema is updated. Since
ColumnSchemas are stored as a flat structure, we later convert them to a
nested structure of type Dimensions.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4115
---
.../core/scan/executor/util/QueryUtil.java | 5 +
docs/ddl-of-carbondata.md | 7 +-
.../TestSIWithComplexArrayType.scala | 37 +++
.../command/carbonTableSchemaCommon.scala | 78 +++++-
.../schema/CarbonAlterTableAddColumnCommand.scala | 2 +
.../sql/parser/CarbonSparkSqlParserUtil.scala | 6 -
.../alterTable/TestAlterTableAddColumns.scala | 309 ++++++++++++++++++++-
.../restructure/AlterTableValidationTestCase.scala | 10 -
8 files changed, 422 insertions(+), 32 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index de28738..0f557ab 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -280,6 +280,11 @@ public class QueryUtil {
private static void fillParentDetails(Map<Integer, Integer>
dimensionToBlockIndexMap,
CarbonDimension dimension, Map<Integer, GenericQueryType>
complexTypeMap) {
+ // If the dimension is added via alter add command then it doesn't exist
in any
+ // of the blocks as it is not yet updated. Hence return
+ if (!dimensionToBlockIndexMap.containsKey(dimension.getOrdinal())) {
+ return;
+ }
int parentBlockIndex =
dimensionToBlockIndexMap.get(dimension.getOrdinal());
GenericQueryType parentQueryType;
if (DataTypes.isArrayType(dimension.getDataType())) {
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3bb6c69..4c9e957 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -770,8 +770,11 @@ CarbonData DDL statements are documented here,which
includes:
```
ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING)
TBLPROPERTIES('DEFAULT.VALUE.a1'='10')
```
- **NOTE:** Add Complex datatype columns is not supported.
-
+ **NOTE:** Adding of only single-level Complex datatype columns(only
array and struct) is supported.
+ Example -
+ ```
+ ALTER TABLE <table-name> ADD COLUMNS(arrField array<array<int>>,
structField struct<id1:string,name1:string>)
+ ```
Users can specify which columns to include and exclude for local dictionary
generation after adding new columns. These will be appended with the already
existing local dictionary include and exclude columns of main table
respectively.
```
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index f1156be..3af2a3d 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -43,6 +43,43 @@ class TestSIWithComplexArrayType extends QueryTest with
BeforeAndAfterEach {
sql("drop table if exists complextable5")
}
+ test("Test alter add array column before creating SI") {
+ sql("drop table if exists complextable")
+ sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
+ sql("insert into complextable select 1,array('china', 'us'), 'b'")
+ sql("insert into complextable select 2,array('pak'), 'v'")
+
+ sql("drop index if exists index_11 on complextable")
+ sql(
+ "ALTER TABLE complextable ADD COLUMNS(arr2 array<string>)")
+ sql("insert into complextable select 3,array('china'),
'f',array('hello','world')")
+ sql("insert into complextable select
4,array('india'),'g',array('iron','man','jarvis')")
+
+ val result1 = sql("select * from complextable where
array_contains(arr2,'iron')")
+ val result2 = sql("select * from complextable where arr2[0]='iron'")
+ sql("create index index_11 on table complextable(arr2) as 'carbondata'")
+ val df1 = sql(" select * from complextable where
array_contains(arr2,'iron')")
+ val df2 = sql(" select * from complextable where arr2[0]='iron'")
+ if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ val doNotHitSIDf = sql(" select * from complextable where
array_contains(arr2,'iron') and array_contains(arr2,'man')")
+ if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result1, df1)
+ checkAnswer(result2, df2)
+ }
+
test("test array<string> on secondary index") {
sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 22b1989..f09f893 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.util.CarbonException
+import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.index.Segment
@@ -235,6 +236,26 @@ class AlterTableColumnSchemaGenerator(
dataType.toString.equals("STRUCT")
}
+ private def createChildSchema(childField: Field, currentSchemaOrdinal: Int):
ColumnSchema = {
+ // TODO: should support adding multi-level complex columns: CARBONDATA-4164
+ if (!childField.children.contains(null)) {
+ throw new UnsupportedOperationException(
+ "Alter add columns with nested complex types is not allowed")
+ }
+
+ TableNewProcessor.createColumnSchema(
+ childField,
+ new java.util.ArrayList[Encoding](),
+ isDimensionCol = true,
+ childField.precision,
+ childField.scale,
+ childField.schemaOrdinal + currentSchemaOrdinal,
+ alterTableModel.highCardinalityDims,
+ alterTableModel.databaseName.getOrElse(dbName),
+ isSortColumn(childField.name.getOrElse(childField.column)),
+ isVarcharColumn(childField.name.getOrElse(childField.column)))
+ }
+
def process: Seq[ColumnSchema] = {
val tableSchema = tableInfo.getFactTable
val tableCols = tableSchema.getListOfColumns.asScala
@@ -244,8 +265,8 @@ class AlterTableColumnSchemaGenerator(
// get all original dimension columns
// but exclude complex type columns and long string columns
var allColumns = tableCols.filter(x =>
- (x.isDimensionColumn && !x.getDataType.isComplexType()
- && x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR)))
+ (x.isDimensionColumn && !x.getDataType.isComplexType() &&
!x.isComplexColumn()
+ && x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR)))
var newCols = Seq[ColumnSchema]()
val invertedIndexCols: Array[String] = alterTableModel
.tableProperties
@@ -255,6 +276,10 @@ class AlterTableColumnSchemaGenerator(
// add new dimension columns
alterTableModel.dimCols.foreach(field => {
+ if (field.dataType.get.toLowerCase().equals(CarbonCommonConstants.MAP)) {
+ throw new MalformedCarbonCommandException(
+ s"Add column is unsupported for map datatype column: ${ field.column
}")
+ }
val encoders = new java.util.ArrayList[Encoding]()
val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
field,
@@ -275,6 +300,34 @@ class AlterTableColumnSchemaGenerator(
allColumns ++= Seq(columnSchema)
}
newCols ++= Seq(columnSchema)
+ if (DataTypes.isArrayType(columnSchema.getDataType)) {
+ columnSchema.setNumberOfChild(field.children.size)
+ val childField = field.children.get(0)
+ val childSchema: ColumnSchema = createChildSchema(childField,
currentSchemaOrdinal)
+ if (childSchema.getDataType == DataTypes.VARCHAR) {
+ // put the new long string columns in 'longStringCols'
+ // and add them after old long string columns
+ longStringCols ++= Seq(childSchema)
+ } else {
+ allColumns ++= Seq(childSchema)
+ }
+ newCols ++= Seq(childSchema)
+ } else if (DataTypes.isStructType(columnSchema.getDataType)) {
+ val noOfChildren = field.children.get.size
+ columnSchema.setNumberOfChild(noOfChildren)
+ for (i <- 0 to noOfChildren - 1) {
+ val childField = field.children.get(i)
+ val childSchema: ColumnSchema = createChildSchema(childField,
currentSchemaOrdinal)
+ if (childSchema.getDataType == DataTypes.VARCHAR) {
+ // put the new long string columns in 'longStringCols'
+ // and add them after old long string columns
+ longStringCols ++= Seq(childSchema)
+ } else {
+ allColumns ++= Seq(childSchema)
+ }
+ newCols ++= Seq(childSchema)
+ }
+ }
})
// put the old long string columns
allColumns ++= tableCols.filter(x =>
@@ -283,7 +336,8 @@ class AlterTableColumnSchemaGenerator(
allColumns ++= longStringCols
// put complex type columns at the end of dimension columns
allColumns ++= tableCols.filter(x =>
- (x.isDimensionColumn && (x.getDataType.isComplexType() ||
x.getSchemaOrdinal == -1)))
+ (x.isDimensionColumn &&
+ (x.getDataType.isComplexType() || x.isComplexColumn() ||
x.getSchemaOrdinal == -1)))
// original measure columns
allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
// add new measure columns
@@ -319,10 +373,6 @@ class AlterTableColumnSchemaGenerator(
sys.error(s"Duplicate column found with name: $name")
})
- if (newCols.exists(_.getDataType.isComplexType)) {
- sys.error(s"Complex column cannot be added")
- }
-
val columnValidator = CarbonSparkFactory.getCarbonColumnValidator
columnValidator.validateColumns(allColumns)
@@ -473,6 +523,11 @@ class AlterTableColumnSchemaGenerator(
for (elem <- alterTableModel.tableProperties) {
if (elem._1.toLowerCase.startsWith(defaultValueString)) {
if
(col.getColumnName.equalsIgnoreCase(elem._1.substring(defaultValueString.length)))
{
+ if (DataTypes.isArrayType(col.getDataType) ||
DataTypes.isStructType(col.getDataType) ||
+ DataTypes.isMapType(col.getDataType)) {
+ throw new UnsupportedOperationException(
+ "Cannot add a default value in case of complex columns.")
+ }
rawData = elem._2
val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2,
col)
if (null != data) {
@@ -534,10 +589,11 @@ object TableNewProcessor {
if (highCardinalityDims.contains(colName)) {
encoders.remove(Encoding.DICTIONARY)
}
- if (dataType == DataTypes.DATE) {
- encoders.add(Encoding.DICTIONARY)
- encoders.add(Encoding.DIRECT_DICTIONARY)
- } else if (dataType == DataTypes.TIMESTAMP &&
!highCardinalityDims.contains(colName)) {
+ // complex children does not require encoding
+ if (!colName.contains(CarbonCommonConstants.POINT) && (dataType ==
DataTypes.DATE ||
+ dataType ==
DataTypes.TIMESTAMP &&
+
!highCardinalityDims.contains(colName)
+ )) {
encoders.add(Encoding.DICTIONARY)
encoders.add(Encoding.DIRECT_DICTIONARY)
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 2e86d14..6aea625 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -85,6 +85,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
timeStamp = System.currentTimeMillis
val schemaEvolutionEntry = new
org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(timeStamp)
+ // filter out complex children columns
+ newCols = newCols.filter(x => !x.isComplexColumn)
schemaEvolutionEntry.setAdded(newCols.toList.asJava)
val thriftTable = schemaConverter
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 4f96942..35ef69c 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -683,12 +683,6 @@ object CarbonSparkSqlParserUtil {
fields: List[Field],
tblProp: Option[List[(String, String)]]
): CarbonAlterTableAddColumnCommand = {
- fields.foreach { f =>
- if (CarbonParserUtil.isComplexType(f.dataType.get)) {
- throw new MalformedCarbonCommandException(
- s"Add column is unsupported for complex datatype column: ${ f.column
}")
- }
- }
val tableProps = if (tblProp.isDefined) {
tblProp.get.groupBy(_._1.toLowerCase).foreach(f =>
if (f._2.size > 1) {
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index 8a5990e..b4504e1 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -17,13 +17,18 @@
package org.apache.carbondata.spark.testsuite.alterTable
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters
import scala.collection.mutable
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonProperties
class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
@@ -136,14 +141,280 @@ class TestAlterTableAddColumns extends QueryTest with
BeforeAndAfterAll {
sql(s"""DROP TABLE IF EXISTS ${ tableName }""")
}
+ def addedColumnsInSchemaEvolutionEntry(tableName: String): Seq[ColumnSchema]
= {
+ val carbonTable = CarbonEnv.getCarbonTable(None,
tableName)(sqlContext.sparkSession)
+ val schemaEvolutionList = carbonTable.getTableInfo
+ .getFactTable
+ .getSchemaEvolution()
+ .getSchemaEvolutionEntryList()
+ var addedColumns = Seq[ColumnSchema]()
+ for (i <- 0 until schemaEvolutionList.size()) {
+ addedColumns ++=
+ JavaConverters
+
.asScalaIteratorConverter(schemaEvolutionList.get(i).getAdded.iterator())
+ .asScala
+ .toSeq
+ }
+ addedColumns
+ }
+
+ test("Test adding of array of all primitive datatypes") {
+ import scala.collection.mutable.WrappedArray.make
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("CREATE TABLE alter_com(intfield int) STORED AS carbondata")
+ sql(
+ "ALTER TABLE alter_com ADD COLUMNS(arr1 array<short>, arr2 array<int>,
arr3 " +
+ "array<long>, arr4 array<double>, arr5 array<decimal(8,2)>, arr6
array<string>, arr7 " +
+ "array<char(5)>, arr8 array<varchar(50)>, arr9 array<boolean>, arr10
array<date>, arr11 " +
+ "array<timestamp> )")
+ val columns = sql("desc table alter_com").collect()
+ assert(columns.size.equals(12))
+ sql(
+ "insert into alter_com
values(1,array(1,5),array(1,2),array(1,2,3),array(1.2d,2.3d),array" +
+
"(4.5,6.7),array('hello','world'),array('a','bcd'),array('abcd','efg'),array(true,false),"
+
+ "array('2017-02-01','2018-09-11'),array('2017-02-01
00:01:00','2018-02-01 02:21:00') )")
+ checkAnswer(sql(
+ "select * from alter_com"),
+ Seq(Row(1,
+ make(Array(1, 5)),
+ make(Array(1, 2)),
+ make(Array(1, 2, 3)),
+ make(Array(1.2, 2.3)),
+ make(Array(java.math.BigDecimal.valueOf(4.5).setScale(2),
+ java.math.BigDecimal.valueOf(6.7).setScale(2))),
+ make(Array("hello", "world")),
+ make(Array("a", "bcd")),
+ make(Array("abcd", "efg")),
+ make(Array(true, false)),
+ make(Array(Date.valueOf("2017-02-01"),
+ Date.valueOf("2018-09-11"))),
+ make(Array(Timestamp.valueOf("2017-02-01 00:01:00"),
+ Timestamp.valueOf("2018-02-01 02:21:00")))
+ )))
+
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+ assert(addedColumns.size == 11)
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
+ test("Test adding of struct of all primitive datatypes") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("CREATE TABLE alter_com(intField INT) STORED AS " +
+ "carbondata")
+ sql("ALTER TABLE alter_com ADD COLUMNS(structField
struct<a:short,b:int,c:long,d:double, " +
+
"e:decimal(8,2),f:string,g:char(5),h:varchar(50),i:boolean,j:date,k:timestamp>)")
+ sql("insert into alter_com values(1,
named_struct('a',1,'b',2,'c',3,'d',1.23,'e',2.34,'f'," +
+ "'hello','g','abc','h','def','i',true,'j','2017-02-01','k','2018-02-01
02:00:00.0') ) ")
+ checkAnswer(sql("select structField from alter_com"),
+ Seq(Row(Row(1, 2, 3, 1.23,
java.math.BigDecimal.valueOf(2.34).setScale(2), "hello", "abc",
+ "def", true, Date.valueOf("2017-02-01"), Timestamp.valueOf("2018-02-01
02:00:00.0")))))
+
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+ assert(addedColumns.size == 1)
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
+ def insertIntoTableForArrayType(): Unit = {
+ sql("insert into alter_com
values(4,array(2),array(1,2,3,4),array('abc','def'))")
+ sql("insert into alter_com values(5,array(1,2),array(1),
array('Hulk','Thor'))")
+ sql(
+ "insert into alter_com
values(6,array(1,2,3),array(1234,8,33333),array('Iron','Man'," +
+ "'Jarvis'))")
+ }
+
+ def checkRestulForArrayType(): Unit = {
+ val totalRows = sql("select * from alter_com").collect()
+ val a = sql("select * from alter_com where array_contains(arr2,2)").collect
+ val b = sql("select * from alter_com where
array_contains(arr3,'Thor')").collect
+ val c = sql("select * from alter_com where intField = 1").collect
+ assert(totalRows.size == 6)
+ assert(a.size == 1)
+ assert(b.size == 1)
+ // check default value for newly added array column that is index - 3 and 4
+ assert(c(0)(2) == null && c(0)(3) == null)
+ }
+
+ def createTableForComplexTypes(dictionary: String, complexType: String):
Unit = {
+ if (complexType.equals("ARRAY")) {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("CREATE TABLE alter_com(intField INT, arr1 array<int>) STORED AS
carbondata")
+ sql("insert into alter_com values(1,array(1) )")
+ sql("insert into alter_com values(2,array(9,0) )")
+ sql("insert into alter_com values(3,array(11,12,13) )")
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(arr2 array<int>, arr3
array<string>) TBLPROPERTIES" +
+ s"('$dictionary'='arr3')")
+ val schema = sql("describe alter_com").collect()
+ assert(schema.size == 4)
+ } else if (complexType.equals("STRUCT")) {
+ sql("DROP TABLE IF EXISTS alter_struct")
+ sql(
+ "create table alter_struct(roll int, department
struct<id1:string,name1:string>) STORED " +
+ "AS carbondata")
+ sql("insert into alter_struct values(1, named_struct('id1',
'id1','name1','name1'))")
+ sql("ALTER TABLE alter_struct ADD COLUMNS(struct1
struct<a:string,b:string>, temp string," +
+ " intField int, struct2 struct<c:string,d:string,e:int>, arr
array<int>) TBLPROPERTIES " +
+ "('LOCAL_DICTIONARY_INCLUDE'='struct1, struct2')")
+ val schema = sql("describe alter_struct").collect()
+ assert(schema.size == 7)
+ }
+ }
+
+ test("Test alter add for arrays enabling local dictionary") {
+ import scala.collection.mutable.WrappedArray.make
+ createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "ARRAY")
+ // For the previous segments the default value for newly added array
column is null
+ insertIntoTableForArrayType
+ checkRestulForArrayType
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(arr4 array<int>) ")
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(arr5 array<int>, str
struct<a:int,b:string>) ")
+ sql(
+ "insert into alter_com
values(2,array(9,0),array(1,2,3),array('hello','world'),array(6,7)," +
+ "array(8,9), named_struct('a',1,'b','abcde') )")
+ val rows = sql("select * from alter_com").collect()
+ assert(rows(6)(0) == 2)
+ assert(rows(6)(1) == make(Array(9, 0)))
+ assert(rows(6)(2) == make(Array(1, 2, 3)))
+ assert(rows(6)(3) == make(Array("hello", "world")))
+ assert(rows(6)(4) == make(Array(6, 7)))
+ assert(rows(6)(5) == make(Array(8, 9)))
+ assert(rows(6)(6) == Row(1, "abcde"))
+ assert(rows.size == 7)
+
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+ assert(addedColumns.size == 5)
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
+ test("Test alter add for arrays disabling local dictionary") {
+ createTableForComplexTypes("LOCAL_DICTIONARY_EXCLUDE", "ARRAY")
+ // For the previous segments the default value for newly added array
column is null
+ insertIntoTableForArrayType
+ checkRestulForArrayType
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
+ def insertIntoTableForStructType(): Unit = {
+ sql("insert into alter_struct values(2, named_struct('id1',
'id2','name1','name2'), " +
+ "named_struct('a','id2','b', 'abc2'), 'hello world', 5,
named_struct('c','id3'," +
+ "'d', 'abc3','e', 22), array(1,2,3) )")
+ sql("insert into alter_struct values(3, named_struct('id1',
'id3','name1','name3'), " +
+ "named_struct('a','id2.1','b', 'abc2.1'), 'India', 5,
named_struct('c','id3.1'," +
+ "'d', 'abc3.1','e', 25), array(4,5) )")
+ }
+
+ def checkResultForStructType(): Unit = {
+ val totalRows = sql("select * from alter_struct").collect()
+ val a = sql("select * from alter_struct where struct1.a = 'id2'
").collect()
+ val b = sql(
+ "select * from alter_struct where struct1.a = 'id2' or struct2.c =
'id3.1' or " +
+ "array_contains(arr,5) ")
+ .collect()
+ val c = sql("select * from alter_struct where roll = 1").collect()
+
+ assert(totalRows.size == 3)
+ assert(a.size == 1)
+ assert(b.size == 2)
+ // check default value for newly added struct column
+ assert(c(0)(2) == null)
+ }
+
+ test("Test alter add for structs enabling local dictionary") {
+ createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "STRUCT")
+ // For the previous segments the default value for newly added struct
column is null
+ insertIntoTableForStructType
+ checkResultForStructType
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_struct")
+ assert(addedColumns.size == 5)
+ sql("DROP TABLE IF EXISTS alter_struct")
+ }
+
+ test("Test alter add for structs, disabling local dictionary") {
+ createTableForComplexTypes("LOCAL_DICTIONARY_EXCLUDE", "STRUCT")
+ // For the previous segments the default value for newly added struct
column is null
+ insertIntoTableForStructType
+ checkResultForStructType
+ sql("DROP TABLE IF EXISTS alter_struct")
+ }
+
+ test("Validate alter add multi-level complex column") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("CREATE TABLE alter_com(intField INT, arr array<int>) " +
+ "STORED AS carbondata ")
+ var exception = intercept[Exception] {
+ sql("ALTER TABLE alter_com ADD COLUMNS(arr1 array<array<int>>) ")
+ }
+ val exceptionMessage =
+ "operation failed for default.alter_com: Alter table add operation
failed: Alter add " +
+ "columns with nested complex types is not allowed"
+ assert(exception.getMessage.contains(exceptionMessage))
+
+ exception = intercept[Exception] {
+ sql("ALTER TABLE alter_com ADD COLUMNS(struct1 struct<arr: array<int>>)
")
+ }
+ assert(exception.getMessage.contains(exceptionMessage))
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
+ test("Validate default values of complex columns added by alter command") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("CREATE TABLE alter_com(doubleField double, arr1 array<long> ) STORED
AS carbondata")
+ sql("insert into alter_com values(1.1,array(77))")
+ sql("ALTER TABLE alter_com ADD COLUMNS(arr2 array<int>)")
+ val exception = intercept[Exception] {
+ sql(
+ "ALTER TABLE alter_com ADD COLUMNS(arr3 array<int>)
TBLPROPERTIES('DEFAULT.VALUE.arr3'= " +
+ "'array(77)')")
+ }
+ val exceptionMessage =
+ "operation failed for default.alter_com: Alter table add operation
failed: Cannot add a " +
+ "default value in case of complex columns."
+ assert(exception.getMessage.contains(exceptionMessage))
+ }
+
+ test("Validate adding of map types through alter command") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql(
+ "CREATE TABLE alter_com(doubleField double, arr1 array<long>, m map<int,
string> ) STORED " +
+ "AS carbondata")
+ sql("insert into alter_com values(1.1,array(77),map(1,'abc'))")
+ val exception = intercept[Exception] {
+ sql("ALTER TABLE alter_com ADD COLUMNS(mapField map<int, string>)")
+ }
+ val exceptionMessage =
+ "operation failed for default.alter_com: Alter table add operation
failed: Add column is " +
+ "unsupported for map datatype column: mapfield"
+ assert(exception.getMessage.contains(exceptionMessage))
+ }
+
+ test("alter table add complex columns with comment") {
+ sql("""create table test_add_column_with_comment(
+ | col1 string comment 'col1 comment',
+ | col2 string)
+ | stored as carbondata""".stripMargin)
+ sql("""alter table test_add_column_with_comment add columns(
+ | col3 array<int> comment "col3 comment",
+ | col4 struct<a:int,b:string> comment "col4 comment",
+ | col5 array<string> comment "",
+ | col6 array<string> )""".stripMargin)
+ val describe = sql("describe test_add_column_with_comment")
+ var count = describe.filter("col_name='col3' and comment = 'col3
comment'").count()
+ assertResult(1)(count)
+ count = describe.filter("col_name='col4' and comment = 'col4
comment'").count()
+ assertResult(1)(count)
+ count = describe.filter("col_name='col5' and comment = ''").count()
+ assertResult(1)(count)
+ count = describe.filter("col_name='col6' and comment is null").count()
+ assertResult(1)(count)
+ sql("DROP TABLE IF EXISTS test_add_column_with_comment")
+ }
+
test("alter table add columns with comment") {
sql("""create table test_add_column_with_comment(
| col1 string comment 'col1 comment',
| col2 int,
| col3 string)
| stored as carbondata""".stripMargin)
- sql(
- """alter table test_add_column_with_comment add columns(
+ sql("""alter table test_add_column_with_comment add columns(
| col4 string comment "col4 comment",
| col5 int,
| col6 string comment "")""".stripMargin)
@@ -156,6 +427,38 @@ class TestAlterTableAddColumns extends QueryTest with
BeforeAndAfterAll {
assertResult(1)(count)
}
+ test("Validate datatypes and column names of added complex columns") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql(
+ """create table alter_com(
+ | col1 array<string>)
+ | stored as carbondata""".stripMargin)
+ sql(
+ """alter table alter_com add columns(
+ | col2 array<string>,
+ | col3 struct<a:int,b:long>,
+ | col4 string,
+ | col5 array<long>,
+ | col6 int,
+ | col7 struct<a:string> )""".stripMargin)
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+ assert(addedColumns.size == 6)
+ for (i <- 0 until addedColumns.size) {
+ val column = addedColumns(i)
+ assert((column.getColumnName.equals("col2") &&
DataTypes.isArrayType(column.getDataType) &&
+ column.getNumberOfChild.equals(1)) ||
+ (column.getColumnName.equals("col3") &&
DataTypes.isStructType(column.getDataType) &&
+ column.getNumberOfChild.equals(2)) ||
+ (column.getColumnName.equals("col5") &&
DataTypes.isArrayType(column.getDataType) &&
+ column.getNumberOfChild.equals(1)) ||
+ (column.getColumnName.equals("col7") &&
DataTypes.isStructType(column.getDataType) &&
+ column.getNumberOfChild.equals(1)) ||
+ column.getColumnName.equals("col4") ||
column.getColumnName.equals("col6")
+ )
+ }
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
test("[CARBONDATA-3596] Fix exception when execute load data command or " +
"select sql on a table which includes complex columns after execute
'add column' command") {
CarbonProperties.getInstance()
diff --git
a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 23052a2..1f60f9b 100644
---
a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -248,16 +248,6 @@ class AlterTableValidationTestCase extends QueryTest with
BeforeAndAfterAll {
}
}
- test("test adding complex datatype column") {
- try {
- sql("alter table restructure add columns(arr array<string>)")
- assert(false, "Exception should be thrown for complex column add")
- } catch {
- case e: Exception =>
- println(e.getMessage)
- }
- }
-
test("test drop and add same column with different datatype and default
value") {
sql("alter table restructure drop columns(empname)")
sql(