This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new fee8b18  [CARBONDATA-4193] Fix compaction failure after alter add 
complex column.
fee8b18 is described below

commit fee8b1822ecf238e9639fa51531e420181c1e065
Author: ShreelekhyaG <[email protected]>
AuthorDate: Wed May 26 19:14:40 2021 +0530

    [CARBONDATA-4193] Fix compaction failure after alter add complex column.
    
    Why is this PR needed?
    1. When we perform compaction after alter add a complex column, the query 
fails with
       ArrayIndexOutOfBounds exception. While converting and adding row after 
merge step
       in WriteStepRowUtil.fromMergerRow, As complex dimension is present, the 
complexKeys
       array is accessed but doesnt have any values in array and throws 
exception.
    2. Creating SI with globalsort on newly added complex column throws 
TreenodeException
       (Caused by: java.lang.RuntimeException: Couldn't find positionId#172 in 
[arr2#153])
    
    What changes were proposed in this PR?
    1. While restructuring row, added changes to fill complexKeys with default 
values(null
       values to children) according to the latest schema.
       In SI queryresultprocessor, used the column property 
isParentColumnComplex to identify
       any complex type. If complex index column not present in the parent 
table block,
       assigned the SI row value to empty bytes.
    2. For SI with globalsort, In case of complex type projection, 
TableProperties object in
       carbonEnv is not same as in carbonTable object and hence requiredColumns 
is not
       updated with positionId. So updating tableproperties from carbon env 
itself.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4142
---
 .../impl/RestructureBasedRawResultCollector.java   | 63 +++++++++++++--
 .../core/scan/executor/infos/DimensionInfo.java    | 27 +++++++
 .../core/scan/executor/util/RestructureUtil.java   |  5 ++
 .../apache/carbondata/core/util/CarbonUtil.java    | 19 +++++
 .../TestSIWithComplexArrayType.scala               | 94 ++++++++++++++++++++--
 .../query/SecondaryIndexQueryResultProcessor.java  | 20 ++---
 .../secondaryindex/command/SICreationCommand.scala |  8 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala | 11 +--
 .../spark/src/test/resources/secindex/array2.csv   |  4 +
 .../alterTable/TestAlterTableAddColumns.scala      | 18 +++++
 .../processing/datatypes/PrimitiveDataType.java    | 17 +---
 11 files changed, 239 insertions(+), 47 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index b2f5c07..80fb3c3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -17,12 +17,18 @@
 
 package org.apache.carbondata.core.scan.collector.impl;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -31,13 +37,19 @@ import 
org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
+import org.apache.log4j.Logger;
+
 /**
  * It is not a collector it is just a scanned result holder.
  */
 public class RestructureBasedRawResultCollector extends 
RawBasedResultCollector {
 
+  private static final Logger LOGGER =
+      
LogServiceFactory.getLogService(RestructureBasedRawResultCollector.class.getName());
+
   public RestructureBasedRawResultCollector(BlockExecutionInfo 
blockExecutionInfos) {
     super(blockExecutionInfos);
   }
@@ -57,8 +69,9 @@ public class RestructureBasedRawResultCollector extends 
RawBasedResultCollector
     if (dimensionInfo.isDictionaryColumnAdded()) {
       fillDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
     }
-    if (dimensionInfo.isNoDictionaryColumnAdded()) {
-      fillNoDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
+    if (dimensionInfo.isNoDictionaryColumnAdded() || dimensionInfo
+        .isComplexColumnAdded()) {
+      fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(listBasedResult);
     }
     QueryStatistic resultPrepTime = 
queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.RESULT_PREP_TIME);
@@ -111,36 +124,71 @@ public class RestructureBasedRawResultCollector extends 
RawBasedResultCollector
   }
 
   /**
-   * This method will fill the no dictionary byte array with newly added no 
dictionary columns
+   * This method will fill the no dictionary and complex byte array with newly 
added columns
    *
    * @param rows
    * @return
    */
-  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> 
rows) {
+  private void 
fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
     for (Object[] row : rows) {
       ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
       byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
       ProjectionDimension[] actualQueryDimensions = 
executionInfo.getActualQueryDimensions();
       byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
           new byte[noDictKeyArray.length + 
dimensionInfo.getNewNoDictionaryColumnCount()][];
+      byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+          new byte[complexTypesKeyArray.length + 
dimensionInfo.getNewComplexColumnCount()][];
       int existingColumnValueIndex = 0;
       int newKeyArrayIndex = 0;
+      int existingComplexColumnValueIndex = 0;
+      int newComplexKeyArrayIndex = 0;
       for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
         if (actualQueryDimensions[i].getDimension().getDataType() != 
DataTypes.DATE
             && 
!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          DataType currDataType = 
actualQueryDimensions[i].getDimension().getDataType();
           // if dimension exists then add the byte array value else add the 
default value
           if (dimensionInfo.getDimensionExists()[i]) {
-            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-                noDictKeyArray[existingColumnValueIndex++];
+            if (currDataType.isComplexType()) {
+              
complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+                  complexTypesKeyArray[existingComplexColumnValueIndex++];
+            } else {
+              noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                  noDictKeyArray[existingColumnValueIndex++];
+            }
           } else {
             byte[] newColumnDefaultValue = null;
             Object defaultValue = dimensionInfo.getDefaultValues()[i];
             if (null != defaultValue) {
               newColumnDefaultValue = (byte[]) defaultValue;
-            } else if (actualQueryDimensions[i].getDimension().getDataType() 
== DataTypes.STRING) {
+            } else if (currDataType == DataTypes.STRING) {
               newColumnDefaultValue =
                   
DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                       CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else if (currDataType.isComplexType()) {
+              // Iterate over child dimensions and add its default value.
+              List<CarbonDimension> children =
+                  
actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+              try (ByteArrayOutputStream byteStream = new 
ByteArrayOutputStream();
+                  DataOutputStream dataOutputStream = new 
DataOutputStream(byteStream)) {
+                if (DataTypes.isArrayType(currDataType)) {
+                  dataOutputStream.writeInt(1);
+                } else if (DataTypes.isStructType(currDataType)) {
+                  dataOutputStream.writeShort(children.size());
+                }
+                for (int j = 0; j < children.size(); j++) {
+                  // update default null values based on datatype
+                  CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream,
+                      children.get(j).getDataType());
+                }
+                newColumnDefaultValue = byteStream.toByteArray();
+              } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+                throw new RuntimeException(e);
+              }
+              
complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+                  newColumnDefaultValue;
+              continue;
             } else {
               newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
             }
@@ -149,6 +197,7 @@ public class RestructureBasedRawResultCollector extends 
RawBasedResultCollector
         }
       }
       
byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
+      
byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
     }
   }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
index b41de82..0a9820f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
@@ -56,6 +56,17 @@ public class DimensionInfo {
    * count of no dictionary columns not existing in the current block
    */
   private int newNoDictionaryColumnCount;
+
+  /**
+   * count of complex columns not existing in the current block
+   */
+  private int newComplexColumnCount;
+
+  /**
+   * flag to check whether there exist a complex column in the query which
+   * does not exist in the current block
+   */
+  private boolean isComplexColumnAdded;
   /**
   * maintains the block datatype
   */
@@ -115,4 +126,20 @@ public class DimensionInfo {
   public void setNewNoDictionaryColumnCount(int newNoDictionaryColumnCount) {
     this.newNoDictionaryColumnCount = newNoDictionaryColumnCount;
   }
+
+  public boolean isComplexColumnAdded() {
+    return isComplexColumnAdded;
+  }
+
+  public void setComplexColumnAdded(boolean complexColumnAdded) {
+    isComplexColumnAdded = complexColumnAdded;
+  }
+
+  public int getNewComplexColumnCount() {
+    return newComplexColumnCount;
+  }
+
+  public void setNewComplexColumnCount(int newComplexColumnCount) {
+    this.newComplexColumnCount = newComplexColumnCount;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 2f2cdfe..7fabe17 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -77,6 +77,7 @@ public class RestructureUtil {
     dimensionInfo.dataType = new DataType[queryDimensions.length + 
measureCount];
     int newDictionaryColumnCount = 0;
     int newNoDictionaryColumnCount = 0;
+    int newNoDictionaryComplexColumnCount = 0;
     // selecting only those dimension which is present in the query
     int dimIndex = 0;
     for (ProjectionDimension queryDimension : queryDimensions) {
@@ -140,6 +141,9 @@ public class RestructureUtil {
           if (queryDimension.getDimension().getDataType() == DataTypes.DATE) {
             dimensionInfo.setDictionaryColumnAdded(true);
             newDictionaryColumnCount++;
+          } else if 
(queryDimension.getDimension().getDataType().isComplexType()) {
+            dimensionInfo.setComplexColumnAdded(true);
+            newNoDictionaryComplexColumnCount++;
           } else {
             dimensionInfo.setNoDictionaryColumnAdded(true);
             newNoDictionaryColumnCount++;
@@ -150,6 +154,7 @@ public class RestructureUtil {
     }
     dimensionInfo.setNewDictionaryColumnCount(newDictionaryColumnCount);
     dimensionInfo.setNewNoDictionaryColumnCount(newNoDictionaryColumnCount);
+    dimensionInfo.setNewComplexColumnCount(newNoDictionaryComplexColumnCount);
     blockExecutionInfo.setDimensionInfo(dimensionInfo);
     return presentDimension;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 7b762ba..1b81950 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3481,4 +3481,23 @@ public final class CarbonUtil {
       });
     }
   }
+
+  public static void updateNullValueBasedOnDatatype(DataOutputStream 
dataOutputStream,
+      DataType dataType) throws IOException {
+    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        
dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      } else {
+        
dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      }
+      dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+    } else {
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        
dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      } else {
+        
dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      }
+      dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
+    }
+  }
 }
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 9e499e7..ef8bf3d 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -45,7 +45,7 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     sql("drop table if exists complextable5")
   }
 
-  test("Test alter add array column before creating SI") {
+  test("Test restructured array<string> and existing string column as index 
columns on SI with compaction") {
     sql("drop table if exists complextable")
     sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
     sql("insert into complextable select 1,array('china', 'us'), 'b'")
@@ -60,11 +60,12 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     checkAnswer(sql("select * from complextable where 
array_contains(arr2,'iron')"),
       Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
         mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
-    val result1 = sql("select * from complextable where 
array_contains(arr2,'iron')")
-    val result2 = sql("select * from complextable where arr2[0]='iron'")
-    sql("create index index_11 on table complextable(arr2) as 'carbondata'")
-    val df1 = sql(" select * from complextable where 
array_contains(arr2,'iron')")
-    val df2 = sql(" select * from complextable where arr2[0]='iron'")
+    val result1 = sql("select * from complextable where 
array_contains(arr2,'iron') and name='g'")
+    val result2 = sql("select * from complextable where arr2[0]='iron' and 
name='f'")
+    sql("create index index_11 on table complextable(arr2, name) as 
'carbondata'")
+    sql("alter table complextable compact 'minor'")
+    val df1 = sql(" select * from complextable where 
array_contains(arr2,'iron') and name='g'")
+    val df2 = sql(" select * from complextable where arr2[0]='iron' and 
name='f'")
     if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
       assert(false)
     } else {
@@ -85,7 +86,49 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     checkAnswer(result2, df2)
   }
 
-  test("test array<string> on secondary index") {
+  test("Test restructured array<string> and string columns as index columns on 
SI with compaction") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
+    sql("insert into complextable select 1,array('china', 'us'), 'b'")
+    sql("insert into complextable select 2,array('pak'), 'v'")
+
+    sql("drop index if exists index_11 on complextable")
+    sql(
+      "ALTER TABLE complextable ADD COLUMNS(arr2 array<string>)")
+    sql("ALTER TABLE complextable ADD COLUMNS(addr string)")
+    sql("insert into complextable select 3,array('china'), 
'f',array('hello','world'),'china'")
+    sql("insert into complextable select 
4,array('India'),'g',array('iron','man','jarvis'),'India'")
+
+    checkAnswer(sql("select * from complextable where 
array_contains(arr2,'iron')"),
+      Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
+        mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
+    val result1 = sql("select * from complextable where 
array_contains(arr2,'iron') and addr='India'")
+    val result2 = sql("select * from complextable where arr2[0]='iron' and 
addr='china'")
+    sql("create index index_11 on table complextable(arr2, addr) as 
'carbondata'")
+    sql("alter table complextable compact 'minor'")
+    val df1 = sql(" select * from complextable where 
array_contains(arr2,'iron') and addr='India'")
+    val df2 = sql(" select * from complextable where arr2[0]='iron' and 
addr='china'")
+    if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    val doNotHitSIDf = sql(" select * from complextable where 
array_contains(arr2,'iron') and array_contains(arr2,'man')")
+    if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result1, df1)
+    checkAnswer(result2, df2)
+  }
+
+  test("test array<string> on secondary index with compaction") {
     sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
     sql("insert into complextable select 1,array('china', 'us'), 'b'")
     sql("insert into complextable select 2,array('pak'), 'v'")
@@ -95,6 +138,7 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     val result2 = sql(" select * from complextable where country[0]='china'")
     sql("drop index if exists index_1 on complextable")
     sql("create index index_1 on table complextable(country) as 'carbondata'")
+    sql("alter table complextable compact 'minor'")
     val df1 = sql(" select * from complextable where 
array_contains(country,'china')")
     val df2 = sql(" select * from complextable where country[0]='china'")
     if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
@@ -117,7 +161,7 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     checkAnswer(result2, df2)
   }
 
-  test("test array<string> and string as index columns on secondary index") {
+  test("test array<string> and string as index columns on secondary index with 
compaction") {
     sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
     sql("insert into complextable select 1, array('china', 'us'), 'b'")
     sql("insert into complextable select 2, array('pak'), 'v'")
@@ -126,6 +170,7 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     val result = sql(" select * from complextable where 
array_contains(country,'china') and name='f'")
     sql("drop index if exists index_1 on complextable")
     sql("create index index_1 on table complextable(country, name) as 
'carbondata'")
+    sql("alter table complextable compact 'minor'")
     val df = sql(" select * from complextable where 
array_contains(country,'china') and name='f'")
     if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
       assert(false)
@@ -177,6 +222,39 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
       .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
   }
 
+  test("test SI global sort with si segment merge enabled for newly added 
complex column") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+    sql("create table complextable2 (id int, name string, country 
array<string>) stored as " +
+        "carbondata 
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable2 options" +
+      s"('delimiter'=','," +
+      
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+      "'global_sort_partitions'='10')")
+    sql("ALTER TABLE complextable2 ADD COLUMNS(arr2 array<string>)")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array2.csv' into table 
complextable2 options" +
+      s"('delimiter'=','," +
+      
"'quotechar'='\"','fileheader'='id,name,country,arr2','complex_delimiter_level_1'='$',"
 +
+      "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable2 where 
array_contains(arr2,'iron')")
+    sql("drop index if exists index_2 on complextable")
+    sql("create index index_2 on table complextable2(arr2) as 'carbondata' 
properties" +
+        "('sort_scope'='global_sort') ")
+    checkAnswer(sql("select count(*) from complextable2 where 
array_contains(arr2,'iron')"),
+      sql("select count(*) from complextable2 where 
ni(array_contains(arr2,'iron'))"))
+    val df = sql(" select * from complextable2 where 
array_contains(arr2,'iron')")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+  }
+
   test("test SI global sort with si segment merge enabled for primitive data 
types") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 41a5c43..d8ff50f 100644
--- 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -277,15 +277,11 @@ public class SecondaryIndexQueryResultProcessor {
       CarbonDimension dims = dimensions.get(i);
       boolean isComplexColumn = false;
       // As complex column of MainTable is stored as its primitive type in SI,
-      // we need to check if dimension is complex dimension or not based on 
dimension
-      // name. Check if name exists in complexDimensionInfoMap of main table 
result
-      if (!complexDimensionInfoMap.isEmpty() && 
complexColumnParentBlockIndexes.length > 0) {
-        for (GenericQueryType queryType : complexDimensionInfoMap.values()) {
-          if (queryType.getName().equalsIgnoreCase(dims.getColName())) {
-            isComplexColumn = true;
-            break;
-          }
-        }
+      // we need to check if dimension is complex dimension or not based on 
isParentColumnComplex
+      // property.
+      if (dims.getColumnProperties() != null && Boolean
+          
.parseBoolean(dims.getColumnProperties().get("isParentColumnComplex"))) {
+        isComplexColumn = true;
       }
       // fill all the no dictionary and dictionary data to the prepared row 
first, fill the complex
       // flatten data to prepared row at last
@@ -293,7 +289,11 @@ public class SecondaryIndexQueryResultProcessor {
         // dictionary
         preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
       } else {
-        if (isComplexColumn) {
+        if (isComplexColumn && complexColumnParentBlockIndexes.length == 0) {
+          // After restructure some complex column will not be present in 
parent block.
+          // In such case, set the SI implicit row value to empty byte array.
+          preparedRow[i] = new byte[0];
+        } else if (isComplexColumn) {
           // get the flattened data of complex column
           byte[] complexKeyByIndex = 
wrapper.getComplexKeyByIndex(complexIndex);
           ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index b77677a..223bff1 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -680,18 +680,24 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       dataType: DataType = null): ColumnSchema = {
     val columnSchema = new ColumnSchema()
     val encodingList = parentColumnSchema.getEncodingList
+    var colPropMap = parentColumnSchema.getColumnProperties
     // if data type is arrayType, then store the column as its CHILD data type 
in SI
     if (DataTypes.isArrayType(parentColumnSchema.getDataType)) {
       columnSchema.setDataType(dataType)
+      if (colPropMap == null) {
+        colPropMap = new java.util.HashMap[String, String]()
+      }
+      colPropMap.put("isParentColumnComplex", "true")
+      columnSchema.setColumnProperties(colPropMap)
       if (dataType == DataTypes.DATE) {
         encodingList.add(Encoding.DIRECT_DICTIONARY)
         encodingList.add(Encoding.DICTIONARY)
       }
     } else {
       columnSchema.setDataType(parentColumnSchema.getDataType)
+      columnSchema.setColumnProperties(colPropMap)
     }
     columnSchema.setColumnName(parentColumnSchema.getColumnName)
-    columnSchema.setColumnProperties(parentColumnSchema.getColumnProperties)
     columnSchema.setEncodingList(encodingList)
     columnSchema.setColumnUniqueId(parentColumnSchema.getColumnUniqueId)
     columnSchema.setColumnReferenceId(parentColumnSchema.getColumnReferenceId)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index bc96996..eafbb3a 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -195,7 +195,7 @@ object SecondaryIndexCreator {
                   mainTable,
                   projections.mkString(","),
                   Array(eachSegment),
-                  isPositionReferenceRequired = true)
+                  isPositionReferenceRequired = true, !explodeColumn.isEmpty)
                 // flatten the complex SI
                 if (explodeColumn.nonEmpty) {
                   val columns = dataFrame.schema.map { x =>
@@ -564,7 +564,8 @@ object SecondaryIndexCreator {
       carbonTable: CarbonTable,
       projections: String,
       segments: Array[String],
-      isPositionReferenceRequired: Boolean = false): DataFrame = {
+      isPositionReferenceRequired: Boolean = false,
+      isComplexTypeProjection: Boolean = false): DataFrame = {
     try {
       CarbonThreadUtil.threadSet(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS + 
carbonTable.getDatabaseName +
@@ -584,9 +585,9 @@ object SecondaryIndexCreator {
         case p: Project =>
           Project(p.projectList :+ positionId, p.child)
       }
-      val tableProperties = if (carbonTable.isHivePartitionTable) {
-        // in case of partition table, TableProperties object in carbonEnv is 
not same as
-        // in carbonTable object, so update from carbon env itself.
+      val tableProperties = if (carbonTable.isHivePartitionTable || 
isComplexTypeProjection) {
+        // in case of partition table and complex type projection, 
TableProperties object in
+        // carbonEnv is not same as in carbonTable object, so update from 
carbon env itself.
         CarbonEnv.getCarbonTable(Some(carbonTable.getDatabaseName), 
carbonTable.getTableName)(
           sparkSession).getTableInfo
           .getFactTable
diff --git a/integration/spark/src/test/resources/secindex/array2.csv 
b/integration/spark/src/test/resources/secindex/array2.csv
new file mode 100644
index 0000000..e453846
--- /dev/null
+++ b/integration/spark/src/test/resources/secindex/array2.csv
@@ -0,0 +1,4 @@
+1,abc,china$india$us,hello$world
+2,xyz,sri$can,iron$man$jarvis
+3,mno,rus$china,ex$ex2
+4,lok,hk$bang,ex$ex3
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index b4504e1..48d9bed 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -214,6 +214,24 @@ class TestAlterTableAddColumns extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_com")
   }
 
+  test("Test alter add complex type and compaction") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("create table alter_com (a int, b string, arr1 array<string>) stored 
as carbondata")
+    sql("insert into alter_com select 1,'a',array('hi')")
+    sql("insert into alter_com select 2,'b',array('hello','world')")
+    sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
+    sql("insert into alter_com select 3,'c',array('hi'),null")
+    sql("insert into alter_com select 
4,'d',array('hi'),named_struct('s1',4,'s2','d')")
+    sql("alter table alter_com compact 'minor'")
+    checkAnswer(sql("""Select count(*) from alter_com"""), Seq(Row(4)))
+    checkAnswer(sql("Select * from alter_com"),
+      Seq(Row(1, "a", mutable.WrappedArray.make(Array("hi")), null),
+        Row(2, "b", mutable.WrappedArray.make(Array("hello", "world")), null),
+        Row(3, "c", mutable.WrappedArray.make(Array("hi")), null),
+        Row(4, "d", mutable.WrappedArray.make(Array("hi")), Row(4, "d"))))
+    sql("DROP TABLE IF EXISTS alter_com")
+  }
+
   def insertIntoTableForArrayType(): Unit = {
     sql("insert into alter_com 
values(4,array(2),array(1,2,3,4),array('abc','def'))")
     sql("insert into alter_com values(5,array(1,2),array(1), 
array('Hulk','Thor'))")
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 1a09a5d..b0a4263 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -414,22 +414,7 @@ public class PrimitiveDataType implements 
GenericDataType<Object> {
 
   private void updateNullValue(DataOutputStream dataOutputStream, 
BadRecordLogHolder logHolder)
       throws IOException {
-    if (this.carbonDimension.getDataType() == DataTypes.STRING
-        || this.carbonDimension.getDataType() == DataTypes.VARCHAR) {
-      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
-        
dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
-      } else {
-        
dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
-      }
-      dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
-    } else {
-      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
-        
dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
-      } else {
-        
dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
-      }
-      dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
-    }
+    CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, 
this.carbonDimension.getDataType());
     String message = 
logHolder.getColumnMessageMap().get(carbonDimension.getColName());
     if (null == message) {
       message = CarbonDataProcessorUtil

Reply via email to