This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8bc9e  [CARBONDATA-4282] Fix issues with table having complex columns related to long string, SI, local dictionary
4d8bc9e is described below

commit 4d8bc9eccfdf699f28a7f5e757a125e0ade82026
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Sep 6 23:54:07 2021 +0530

    [CARBONDATA-4282] Fix issues with table having complex columns related to long string, SI, local dictionary
    
    Why is this PR needed?
    1. Insert/load fails after an alter add complex column if the table contains long string columns.
    2. Creating an index on an array of complex columns (map/struct) throws a NullPointerException instead of a proper error message.
    3. Altering the table property local dictionary include/exclude with a newly added map column fails.
    
    What changes were proposed in this PR?
    1. The datatypes array and the data row were ordered differently, leading to a ClassCastException. Made changes in carbonTableSchemaCommon.scala to add newly added complex columns after the long string columns and the other dimensions (see the first sketch after this list).
    2. For complex columns, SI creation is allowed only on arrays of primitive types. Check whether the child column is of complex type and throw an exception if so. Changes made in SICreationCommand.scala (see the second sketch below).
    3. In AlterTableUtil.scala, the validation of local dictionary columns covered array and struct types but missed map. Added a check that covers all complex types (see the third sketch below).
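
    A minimal, self-contained Scala sketch of change 1, using a hypothetical
    Col case class rather than CarbonData's actual ColumnSchema API: new
    columns are bucketed so that new long string columns land after the old
    ones, and new complex columns land after all long string columns, at the
    end of the dimensions.

        case class Col(name: String, isDimension: Boolean,
                       isLongString: Boolean, isComplex: Boolean)

        def reorder(tableCols: Seq[Col], newCols: Seq[Col]): Seq[Col] = {
          // split the newly added columns into the buckets the fix introduces
          val (newDims, newMeasures) = newCols.partition(_.isDimension)
          val (newComplex, restDims) = newDims.partition(_.isComplex)
          val (newLongStr, newPlain) = restDims.partition(_.isLongString)
          val oldPlain    = tableCols.filter(c => c.isDimension && !c.isLongString && !c.isComplex)
          val oldLongStr  = tableCols.filter(c => c.isDimension && c.isLongString)
          val oldComplex  = tableCols.filter(c => c.isDimension && c.isComplex)
          val oldMeasures = tableCols.filter(!_.isDimension)
          // keeping the schema and the data rows in the same order avoids
          // the ClassCastException described above
          oldPlain ++ newPlain ++ oldLongStr ++ newLongStr ++
            oldComplex ++ newComplex ++ oldMeasures ++ newMeasures
        }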
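
    A sketch of change 2 under the same simplification (Dim is a hypothetical
    stand-in for CarbonData's dimension class): the old check rejected only
    array children, so array<map<...>> and array<struct<...>> slipped through
    and later failed with a NullPointerException; the fix rejects any complex
    child.

        case class Dim(name: String, isComplex: Boolean, children: Seq[Dim] = Nil)

        def validateSiColumn(dimension: Dim): Unit = {
          // before: children.exists(c => isArrayType(c.dataType)) -- too narrow
          if (dimension.children.nonEmpty && dimension.children.exists(_.isComplex)) {
            throw new RuntimeException(
              "SI creation with nested array complex type is not supported yet")
          }
        }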
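
    A sketch of change 3 (dataType and isComplexType are simplified stand-ins
    for CarbonData's column API): the validation now accepts any complex type
    instead of enumerating STRUCT and ARRAY and missing MAP.

        def isValidLocalDictColumn(dataType: String, isComplexType: Boolean): Boolean =
          dataType.equalsIgnoreCase("STRING") ||
          dataType.equalsIgnoreCase("VARCHAR") ||
          isComplexType // previously only STRUCT and ARRAY were listed, so MAP failed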
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4214
---
 .../TestSIWithComplexArrayType.scala               | 12 +++-
 .../command/carbonTableSchemaCommon.scala          | 13 ++++-
 .../secondaryindex/command/SICreationCommand.scala |  4 +-
 .../org/apache/spark/util/AlterTableUtil.scala     |  5 +-
 .../alterTable/TestAlterTableAddColumns.scala      | 66 +++++++++++++++++++++-
 5 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 5892ee7..813808d 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -425,11 +425,19 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
   }
 
   test("test si creation with array") {
-    sql("create table complextable (id int, name string, country 
array<array<string>>, add array<int>) stored as carbondata")
+    sql("create table complextable (id int, name string, country 
array<array<string>>," +
+        " add array<map<int,int>>, code array<struct<a:string,b:int>>) stored 
as carbondata")
     sql("drop index if exists index_1 on complextable")
+    val errorMessage = "SI creation with nested array complex type is not supported yet"
     assert(intercept[RuntimeException] {
       sql("create index index_1 on table complextable(country) as 
'carbondata'")
-    }.getMessage.contains("SI creation with nested array complex type is not supported yet"))
+    }.getMessage.contains(errorMessage))
+    assert(intercept[RuntimeException] {
+      sql("create index index_1 on table complextable(add) as 'carbondata'")
+    }.getMessage.contains(errorMessage))
+    assert(intercept[RuntimeException] {
+      sql("create index index_1 on table complextable(code) as 'carbondata'")
+    }.getMessage.contains(errorMessage))
   }
 
   test("test complex with null and empty data") {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index a00e58e..2096e93 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -263,6 +263,7 @@ class AlterTableColumnSchemaGenerator(
       newCols: mutable.Buffer[ColumnSchema],
       allColumns: mutable.Buffer[ColumnSchema],
       longStringCols: mutable.Buffer[ColumnSchema],
+      complexCols: mutable.Buffer[ColumnSchema],
       currentSchemaOrdinal: Int): Unit = {
     if (currField.children.get == null || currField.children.isEmpty) {
       return
@@ -276,12 +277,15 @@ class AlterTableColumnSchemaGenerator(
           // put the new long string columns in 'longStringCols'
           // and add them after old long string columns
           longStringCols ++= Seq(childSchema)
+        } else if (childSchema.getDataType.isComplexType ||
+                   CarbonUtil.isComplexColumn(childSchema.getColumnName)) {
+          complexCols ++= Seq(childSchema)
         } else {
           allColumns ++= Seq(childSchema)
         }
         newCols ++= Seq(childSchema)
         addComplexChildCols(childField, childSchema, newCols, allColumns, longStringCols,
-          currentSchemaOrdinal)
+          complexCols, currentSchemaOrdinal)
       })
     }
   }
@@ -292,6 +296,7 @@ class AlterTableColumnSchemaGenerator(
     // previous maximum column schema ordinal + 1 is the current column schema ordinal
     val currentSchemaOrdinal = tableCols.map(col => col.getSchemaOrdinal).max + 1
     val longStringCols = mutable.Buffer[ColumnSchema]()
+    val complexCols = mutable.Buffer[ColumnSchema]()
     // get all original dimension columns
     // but exclude complex type columns and long string columns
     var allColumns = tableCols.filter(x =>
@@ -322,11 +327,13 @@ class AlterTableColumnSchemaGenerator(
         // put the new long string columns in 'longStringCols'
         // and add them after old long string columns
         longStringCols ++= Seq(columnSchema)
+      } else if (columnSchema.getDataType.isComplexType) {
+        complexCols ++= Seq(columnSchema)
       } else {
         allColumns ++= Seq(columnSchema)
       }
       newCols ++= Seq(columnSchema)
-      addComplexChildCols(field, columnSchema, newCols, allColumns, longStringCols,
+      addComplexChildCols(field, columnSchema, newCols, allColumns, longStringCols, complexCols,
         currentSchemaOrdinal)
     })
     // put the old long string columns
@@ -338,6 +345,8 @@ class AlterTableColumnSchemaGenerator(
     allColumns ++= tableCols.filter(x =>
       (x.isDimensionColumn &&
       (x.getDataType.isComplexType() || x.isComplexColumn() || x.getSchemaOrdinal == -1)))
+    // put the new complex columns after long string columns and at the end of dimension columns
+    allColumns ++= complexCols
     // original measure columns
     allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
     // add new measure columns
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index b14e418..2c8dc08 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -497,7 +497,9 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         }
         if (dimension.getNumberOfChild > 0) {
           val complexChildDims = dimension.getListOfChildDimensions.asScala
-          if (complexChildDims.exists(col => DataTypes.isArrayType(col.getDataType))) {
+          // For complex columns, SI creation is allowed only on arrays of primitive types.
+          // Check if the child column is of complex type and throw an exception.
+          if (complexChildDims.exists(col => col.isComplex)) {
             throw new ErrorMessage(
               "SI creation with nested array complex type is not supported 
yet")
           }
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index f7c56ba..2168ba9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -992,10 +992,7 @@ object AlterTableUtil {
                                      .equalsIgnoreCase("STRING") &&
                                    !col.getDataType.toString
                                      .equalsIgnoreCase("VARCHAR") &&
-                                   !col.getDataType.toString
-                                     .equalsIgnoreCase("STRUCT") &&
-                                   !col.getDataType.toString
-                                     .equalsIgnoreCase("ARRAY"))) {
+                                   !col.getDataType.isComplexType)) {
         val errMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE 
column: " + dictCol.trim +
                      " is not a string/complex/varchar datatype column. 
LOCAL_DICTIONARY_INCLUDE" +
                      "/LOCAL_DICTIONARY_EXCLUDE should be no " +
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index 4b412cb..f5bfb32 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -237,9 +237,10 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  test("Test alter add complex type and compaction") {
+  test("Test alter add complex type with long string column and compaction") {
     sql("DROP TABLE IF EXISTS alter_com")
-    sql("create table alter_com (a int, b string, arr1 array<string>) stored 
as carbondata")
+    sql("create table alter_com (a int, b string, arr1 array<string>) stored 
as carbondata" +
+        " tblproperties('long_string_columns'='b')")
     sql("insert into alter_com select 1,'a',array('hi')")
     sql("insert into alter_com select 2,'b',array('hello','world')")
     sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
@@ -297,6 +298,17 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
           s"('$dictionary'='struct1, struct2')")
       val schema = sql("describe alter_struct").collect()
       assert(schema.size == 7)
+    } else if (complexType.equals("MAP")) {
+      sql("DROP TABLE IF EXISTS alter_com")
+      sql(
+        "create table alter_com(roll int, department map<string,string>) 
STORED " +
+        "AS carbondata")
+      sql("insert into alter_com values(1, map('id1','name1'))")
+      sql("ALTER TABLE alter_com ADD COLUMNS(map1 map<string,string>, " +
+          "map2 map<string,string>)")
+      sql(s"alter table alter_com set tblproperties('$dictionary'='map1, 
map2')")
+      val schema = sql("describe alter_com").collect()
+      assert(schema.size == 4)
     }
   }
 
@@ -333,6 +345,56 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_com")
   }
 
+  def insertIntoTableForMapType(): Unit = {
+    sql("insert into alter_com 
values(2,map('id2','name2'),map('key1','val1'),map('key2','val2'))")
+    sql("insert into alter_com values(3,map('id3','name3'),map('key3','val3'), 
map('key4','val4'))")
+    sql("insert into alter_com 
values(4,map('id4','name4'),map('key5','val5'),map('key6','val6'))")
+  }
+
+  def checkResultForMapType(): Unit = {
+    val totalRows = sql("select * from alter_com").collect()
+    val a = sql("select * from alter_com where map1['key1']='val1'").collect
+    val b = sql("select * from alter_com where map2['key4']='val4'").collect
+    val c = sql("select * from alter_com where roll = 1").collect
+    assert(totalRows.size == 4)
+    assert(a.size == 1)
+    assert(b.size == 1)
+    // check default values for the newly added map columns (columns 3 and 4, i.e. indices 2 and 3)
+    assert(c(0)(2) == null && c(0)(3) == null)
+  }
+
+  test("Test alter add for map enabling local dictionary") {
+    createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "MAP")
+    insertIntoTableForMapType()
+    checkResultForMapType()
+    sql(s"ALTER TABLE alter_com ADD COLUMNS(map3 map<int,int>) ")
+    sql(s"ALTER TABLE alter_com ADD COLUMNS(map4 map<int,int>, str 
struct<a:int,b:string>) ")
+    sql(
+      "insert into alter_com values(5,map('df','dfg'),map('df','dfg'), 
map('df','dfg'),map(6,7)," +
+      "map(5,9),named_struct('a',1,'b','abcde'))")
+    sql("alter table alter_com compact 'minor'")
+    assert(sql("select * from alter_com where map3[6]=7").collect().size == 1)
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+    assert(addedColumns.size == 5)
+    sql("DROP TABLE IF EXISTS alter_com")
+  }
+
+  test("Test alter add for map disabling local dictionary") {
+    createTableForComplexTypes("LOCAL_DICTIONARY_EXCLUDE", "MAP")
+    insertIntoTableForMapType()
+    checkResultForMapType()
+    sql(s"ALTER TABLE alter_com ADD COLUMNS(map3 map<int,int>) ")
+    sql(s"ALTER TABLE alter_com ADD COLUMNS(map4 map<int,int>, str 
struct<a:int,b:string>) ")
+    sql(
+      "insert into alter_com values(5,map('df','dfg'),map('df','dfg'), 
map('df','dfg'),map(6,7)," +
+      "map(5,9),named_struct('a',1,'b','abcde'))")
+    sql("alter table alter_com compact 'minor'")
+    assert(sql("select * from alter_com where map3[6]=7").collect().size == 1)
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+    assert(addedColumns.size == 5)
+    sql("DROP TABLE IF EXISTS alter_com")
+  }
+
   def insertIntoTableForStructType(): Unit = {
     sql("insert into alter_struct values(2, named_struct('id1', 
'id2','name1','name2'), " +
       "named_struct('a','id2','b', 'abc2'), 'hello world', 5, 
named_struct('c','id3'," +
