This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8bc9e [CARBONDATA-4282] Fix issues with table having complex columns related to long string, SI, local dictionary
4d8bc9e is described below
commit 4d8bc9eccfdf699f28a7f5e757a125e0ade82026
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Sep 6 23:54:07 2021 +0530
[CARBONDATA-4282] Fix issues with table having complex columns related to long string, SI, local dictionary
Why is this PR needed?
1. Insert/load fails after an ALTER TABLE ADD COLUMNS of a complex column if the table contains long string columns (see the sketch after this list).
2. Creating an index on an array of complex types (map/struct) throws a NullPointerException instead of a proper error message.
3. Setting the LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE table property with a newly added map column fails.
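For example, issue 1 can be reproduced with a sequence like the following (a minimal sketch in the style of the test suites below; the table and column names are illustrative, not taken from the original report):

    sql("create table t (id int, name string, arr1 array<string>) stored as carbondata" +
      " tblproperties('long_string_columns'='name')")
    sql("alter table t add columns(struct1 struct<a:int, b:string>)")
    // failed with a ClassCastException before this fix:
    sql("insert into t select 1, 'a', array('hi'), named_struct('a', 1, 'b', 'x')")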
What changes were proposed in this PR?
1. The datatypes array and the data row were ordered differently, leading to a ClassCastException. Changed carbonTableSchemaCommon.scala to place newly added complex columns after the long string columns and the other dimensions.
2. For complex columns, SI creation is allowed only on arrays of primitive types. SICreationCommand.scala now checks whether a child column is itself of complex type and throws a proper error message.
3. In AlterTableUtil.scala, the local dictionary column validation named array and struct types explicitly but missed map. Replaced the per-type checks with a generic complex-type check (behavior sketched below).
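Taken together, the intended behavior after fixes 2 and 3 can be sketched as follows (a minimal illustration against a hypothetical table t; the index and column names are illustrative):

    // fix 2: SI is allowed only on arrays of primitives; arrays of complex types are rejected
    sql("create index idx1 on table t(arr_str) as 'carbondata'")   // array<string>: succeeds
    intercept[RuntimeException] {
      sql("create index idx2 on table t(arr_map) as 'carbondata'") // array<map<int,int>>: fails
    }
    // fix 3: newly added map columns now pass the local dictionary validation
    sql("alter table t add columns(map1 map<string,string>)")
    sql("alter table t set tblproperties('LOCAL_DICTIONARY_INCLUDE'='map1')")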
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4214
---
.../TestSIWithComplexArrayType.scala | 12 +++-
.../command/carbonTableSchemaCommon.scala | 13 ++++-
.../secondaryindex/command/SICreationCommand.scala | 4 +-
.../org/apache/spark/util/AlterTableUtil.scala | 5 +-
.../alterTable/TestAlterTableAddColumns.scala | 66 +++++++++++++++++++++-
5 files changed, 89 insertions(+), 11 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 5892ee7..813808d 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -425,11 +425,19 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
}
test("test si creation with array") {
- sql("create table complextable (id int, name string, country
array<array<string>>, add array<int>) stored as carbondata")
+ sql("create table complextable (id int, name string, country
array<array<string>>," +
+ " add array<map<int,int>>, code array<struct<a:string,b:int>>) stored
as carbondata")
sql("drop index if exists index_1 on complextable")
+ val errorMessage = "SI creation with nested array complex type is not
supported yet"
assert(intercept[RuntimeException] {
sql("create index index_1 on table complextable(country) as
'carbondata'")
- }.getMessage.contains("SI creation with nested array complex type is not
supported yet"))
+ }.getMessage.contains(errorMessage))
+ assert(intercept[RuntimeException] {
+ sql("create index index_1 on table complextable(add) as 'carbondata'")
+ }.getMessage.contains(errorMessage))
+ assert(intercept[RuntimeException] {
+ sql("create index index_1 on table complextable(code) as 'carbondata'")
+ }.getMessage.contains(errorMessage))
}
test("test complex with null and empty data") {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index a00e58e..2096e93 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -263,6 +263,7 @@ class AlterTableColumnSchemaGenerator(
newCols: mutable.Buffer[ColumnSchema],
allColumns: mutable.Buffer[ColumnSchema],
longStringCols: mutable.Buffer[ColumnSchema],
+ complexCols: mutable.Buffer[ColumnSchema],
currentSchemaOrdinal: Int): Unit = {
if (currField.children.get == null || currField.children.isEmpty) {
return
@@ -276,12 +277,15 @@ class AlterTableColumnSchemaGenerator(
// put the new long string columns in 'longStringCols'
// and add them after old long string columns
longStringCols ++= Seq(childSchema)
+ } else if (childSchema.getDataType.isComplexType ||
+ CarbonUtil.isComplexColumn(childSchema.getColumnName)) {
+ complexCols ++= Seq(childSchema)
} else {
allColumns ++= Seq(childSchema)
}
newCols ++= Seq(childSchema)
addComplexChildCols(childField, childSchema, newCols, allColumns, longStringCols,
- currentSchemaOrdinal)
+ complexCols, currentSchemaOrdinal)
})
}
}
@@ -292,6 +296,7 @@ class AlterTableColumnSchemaGenerator(
// previous maximum column schema ordinal + 1 is the current column schema ordinal
val currentSchemaOrdinal = tableCols.map(col => col.getSchemaOrdinal).max + 1
val longStringCols = mutable.Buffer[ColumnSchema]()
+ val complexCols = mutable.Buffer[ColumnSchema]()
// get all original dimension columns
// but exclude complex type columns and long string columns
var allColumns = tableCols.filter(x =>
@@ -322,11 +327,13 @@ class AlterTableColumnSchemaGenerator(
// put the new long string columns in 'longStringCols'
// and add them after old long string columns
longStringCols ++= Seq(columnSchema)
+ } else if (columnSchema.getDataType.isComplexType) {
+ complexCols ++= Seq(columnSchema)
} else {
allColumns ++= Seq(columnSchema)
}
newCols ++= Seq(columnSchema)
- addComplexChildCols(field, columnSchema, newCols, allColumns, longStringCols,
+ addComplexChildCols(field, columnSchema, newCols, allColumns, longStringCols, complexCols,
currentSchemaOrdinal)
})
// put the old long string columns
@@ -338,6 +345,8 @@ class AlterTableColumnSchemaGenerator(
allColumns ++= tableCols.filter(x =>
(x.isDimensionColumn &&
(x.getDataType.isComplexType() || x.isComplexColumn() || x.getSchemaOrdinal == -1)))
+ // put the new complex columns after long string columns and at the end of dimension columns
+ allColumns ++= complexCols
// original measure columns
allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
// add new measure columns
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index b14e418..2c8dc08 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -497,7 +497,9 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
}
if (dimension.getNumberOfChild > 0) {
val complexChildDims = dimension.getListOfChildDimensions.asScala
- if (complexChildDims.exists(col => DataTypes.isArrayType(col.getDataType))) {
+ // For complex columns, SI creation is allowed only on arrays of primitive types.
+ // Check whether the child column is of complex type and throw an exception.
+ if (complexChildDims.exists(col => col.isComplex)) {
throw new ErrorMessage(
"SI creation with nested array complex type is not supported
yet")
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index f7c56ba..2168ba9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -992,10 +992,7 @@ object AlterTableUtil {
.equalsIgnoreCase("STRING") &&
!col.getDataType.toString
.equalsIgnoreCase("VARCHAR") &&
- !col.getDataType.toString
- .equalsIgnoreCase("STRUCT") &&
- !col.getDataType.toString
- .equalsIgnoreCase("ARRAY"))) {
+ !col.getDataType.isComplexType)) {
val errMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE
column: " + dictCol.trim +
" is not a string/complex/varchar datatype column.
LOCAL_DICTIONARY_INCLUDE" +
"/LOCAL_DICTIONARY_EXCLUDE should be no " +
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index 4b412cb..f5bfb32 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -237,9 +237,10 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
}
- test("Test alter add complex type and compaction") {
+ test("Test alter add complex type with long string column and compaction") {
sql("DROP TABLE IF EXISTS alter_com")
- sql("create table alter_com (a int, b string, arr1 array<string>) stored
as carbondata")
+ sql("create table alter_com (a int, b string, arr1 array<string>) stored
as carbondata" +
+ " tblproperties('long_string_columns'='b')")
sql("insert into alter_com select 1,'a',array('hi')")
sql("insert into alter_com select 2,'b',array('hello','world')")
sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
@@ -297,6 +298,17 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
s"('$dictionary'='struct1, struct2')")
val schema = sql("describe alter_struct").collect()
assert(schema.size == 7)
+ } else if (complexType.equals("MAP")) {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql(
+ "create table alter_com(roll int, department map<string,string>)
STORED " +
+ "AS carbondata")
+ sql("insert into alter_com values(1, map('id1','name1'))")
+ sql("ALTER TABLE alter_com ADD COLUMNS(map1 map<string,string>, " +
+ "map2 map<string,string>)")
+ sql(s"alter table alter_com set tblproperties('$dictionary'='map1,
map2')")
+ val schema = sql("describe alter_com").collect()
+ assert(schema.size == 4)
}
}
@@ -333,6 +345,56 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS alter_com")
}
+ def insertIntoTableForMapType(): Unit = {
+ sql("insert into alter_com
values(2,map('id2','name2'),map('key1','val1'),map('key2','val2'))")
+ sql("insert into alter_com values(3,map('id3','name3'),map('key3','val3'),
map('key4','val4'))")
+ sql("insert into alter_com
values(4,map('id4','name4'),map('key5','val5'),map('key6','val6'))")
+ }
+
+ def checkResultForMapType(): Unit = {
+ val totalRows = sql("select * from alter_com").collect()
+ val a = sql("select * from alter_com where map1['key1']='val1'").collect
+ val b = sql("select * from alter_com where map2['key4']='val4'").collect
+ val c = sql("select * from alter_com where roll = 1").collect
+ assert(totalRows.size == 4)
+ assert(a.size == 1)
+ assert(b.size == 1)
+ // check default values for the newly added map columns (the 3rd and 4th columns)
+ assert(c(0)(2) == null && c(0)(3) == null)
+ }
+
+ test("Test alter add for map enabling local dictionary") {
+ createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "MAP")
+ insertIntoTableForMapType()
+ checkResultForMapType()
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(map3 map<int,int>) ")
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(map4 map<int,int>, str
struct<a:int,b:string>) ")
+ sql(
+ "insert into alter_com values(5,map('df','dfg'),map('df','dfg'),
map('df','dfg'),map(6,7)," +
+ "map(5,9),named_struct('a',1,'b','abcde'))")
+ sql("alter table alter_com compact 'minor'")
+ assert(sql("select * from alter_com where map3[6]=7").collect().size == 1)
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+ assert(addedColumns.size == 5)
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
+ test("Test alter add for map disabling local dictionary") {
+ createTableForComplexTypes("LOCAL_DICTIONARY_EXCLUDE", "MAP")
+ insertIntoTableForMapType()
+ checkResultForMapType()
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(map3 map<int,int>) ")
+ sql(s"ALTER TABLE alter_com ADD COLUMNS(map4 map<int,int>, str
struct<a:int,b:string>) ")
+ sql(
+ "insert into alter_com values(5,map('df','dfg'),map('df','dfg'),
map('df','dfg'),map(6,7)," +
+ "map(5,9),named_struct('a',1,'b','abcde'))")
+ sql("alter table alter_com compact 'minor'")
+ assert(sql("select * from alter_com where map3[6]=7").collect().size == 1)
+ val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+ assert(addedColumns.size == 5)
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
def insertIntoTableForStructType(): Unit = {
sql("insert into alter_struct values(2, named_struct('id1',
'id2','name1','name2'), " +
"named_struct('a','id2','b', 'abc2'), 'hello world', 5,
named_struct('c','id3'," +