ShreelekhyaG commented on a change in pull request #4142:
URL: https://github.com/apache/carbondata/pull/4142#discussion_r644500344



##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
##########
@@ -140,6 +141,9 @@
           if (queryDimension.getDimension().getDataType() == DataTypes.DATE) {
             dimensionInfo.setDictionaryColumnAdded(true);
             newDictionaryColumnCount++;
+          } else if 
(queryDimension.getDimension().getDataType().isComplexType()) {
+            dimensionInfo.setComplexColumnAdded(true);
+            newComplexDictColumnCount++;

Review comment:
       Done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -111,36 +123,72 @@ private void 
fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
   }
 
   /**
-   * This method will fill the no dictionary byte array with newly added no 
dictionary columns
+   * This method will fill the no dictionary and complex byte array with newly 
added columns
    *
    * @param rows
    * @return
    */
-  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> 
rows) {
+  private void 
fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
     for (Object[] row : rows) {
       ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
       byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
       ProjectionDimension[] actualQueryDimensions = 
executionInfo.getActualQueryDimensions();
       byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
           new byte[noDictKeyArray.length + 
dimensionInfo.getNewNoDictionaryColumnCount()][];
+      byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+          new byte[complexTypesKeyArray.length + 
dimensionInfo.getNewComplexColumnCount()][];
       int existingColumnValueIndex = 0;
       int newKeyArrayIndex = 0;
+      int existingComplexColumnValueIndex = 0;
+      int newComplexKeyArrayIndex = 0;
       for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
         if (actualQueryDimensions[i].getDimension().getDataType() != 
DataTypes.DATE
             && 
!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          DataType currDataType = 
actualQueryDimensions[i].getDimension().getDataType();
           // if dimension exists then add the byte array value else add the 
default value
           if (dimensionInfo.getDimensionExists()[i]) {
-            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-                noDictKeyArray[existingColumnValueIndex++];
+            if (currDataType.isComplexType()) {
+              
complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+                  complexTypesKeyArray[existingComplexColumnValueIndex++];
+            } else {
+              noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                  noDictKeyArray[existingColumnValueIndex++];
+            }
           } else {
             byte[] newColumnDefaultValue = null;
             Object defaultValue = dimensionInfo.getDefaultValues()[i];
             if (null != defaultValue) {
               newColumnDefaultValue = (byte[]) defaultValue;
-            } else if (actualQueryDimensions[i].getDimension().getDataType() 
== DataTypes.STRING) {
+            } else if (currDataType == DataTypes.STRING) {
               newColumnDefaultValue =
                   
DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                       CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else if (currDataType.isComplexType()) {
+              // Iterate over child dimensions and add its default value.
+              List<CarbonDimension> children =
+                  
actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+              ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+              DataOutputStream dataOutputStream = new 
DataOutputStream(byteStream);
+              try {
+                if (DataTypes.isArrayType(currDataType)) {
+                  dataOutputStream.writeInt(1);
+                } else if (DataTypes.isStructType(currDataType)) {
+                  dataOutputStream.writeShort(children.size());
+                }
+                for (int j = 0; j < children.size(); j++) {
+                  // update default null values based on datatype
+                  updateNullValue(dataOutputStream, 
children.get(j).getDataType());
+                }
+                dataOutputStream.close();
+              } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+                e.printStackTrace();

Review comment:
       ok

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -111,36 +123,72 @@ private void 
fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
   }
 
   /**
-   * This method will fill the no dictionary byte array with newly added no 
dictionary columns
+   * This method will fill the no dictionary and complex byte array with newly 
added columns
    *
    * @param rows
    * @return
    */
-  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> 
rows) {
+  private void 
fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
     for (Object[] row : rows) {
       ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
       byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
       ProjectionDimension[] actualQueryDimensions = 
executionInfo.getActualQueryDimensions();
       byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
           new byte[noDictKeyArray.length + 
dimensionInfo.getNewNoDictionaryColumnCount()][];
+      byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+          new byte[complexTypesKeyArray.length + 
dimensionInfo.getNewComplexColumnCount()][];
       int existingColumnValueIndex = 0;
       int newKeyArrayIndex = 0;
+      int existingComplexColumnValueIndex = 0;
+      int newComplexKeyArrayIndex = 0;
       for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
         if (actualQueryDimensions[i].getDimension().getDataType() != 
DataTypes.DATE
             && 
!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          DataType currDataType = 
actualQueryDimensions[i].getDimension().getDataType();
           // if dimension exists then add the byte array value else add the 
default value
           if (dimensionInfo.getDimensionExists()[i]) {
-            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-                noDictKeyArray[existingColumnValueIndex++];
+            if (currDataType.isComplexType()) {
+              
complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+                  complexTypesKeyArray[existingComplexColumnValueIndex++];
+            } else {
+              noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                  noDictKeyArray[existingColumnValueIndex++];
+            }
           } else {
             byte[] newColumnDefaultValue = null;
             Object defaultValue = dimensionInfo.getDefaultValues()[i];
             if (null != defaultValue) {
               newColumnDefaultValue = (byte[]) defaultValue;
-            } else if (actualQueryDimensions[i].getDimension().getDataType() 
== DataTypes.STRING) {
+            } else if (currDataType == DataTypes.STRING) {
               newColumnDefaultValue =
                   
DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                       CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else if (currDataType.isComplexType()) {
+              // Iterate over child dimensions and add its default value.
+              List<CarbonDimension> children =
+                  
actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+              ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+              DataOutputStream dataOutputStream = new 
DataOutputStream(byteStream);
+              try {
+                if (DataTypes.isArrayType(currDataType)) {
+                  dataOutputStream.writeInt(1);
+                } else if (DataTypes.isStructType(currDataType)) {
+                  dataOutputStream.writeShort(children.size());
+                }
+                for (int j = 0; j < children.size(); j++) {
+                  // update default null values based on datatype
+                  updateNullValue(dataOutputStream, 
children.get(j).getDataType());
+                }
+                dataOutputStream.close();
+              } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+                e.printStackTrace();
+              }
+              newColumnDefaultValue = byteStream.toByteArray();

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -149,6 +197,27 @@ private void 
fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows)
         }
       }
       
byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
+      
byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
+    }
+  }
+
+  private void updateNullValue(DataOutputStream dataOutputStream, DataType 
dimensionDataType)
+      throws IOException {
+    if (dimensionDataType == DataTypes.STRING

Review comment:
       done

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
##########
@@ -214,6 +214,19 @@ class TestAlterTableAddColumns extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_com")
   }
 
+  test("Test alter add complex type and compaction") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("create table alter_com (a int, b string, arr1 array<string>) stored 
as carbondata")
+    sql("insert into alter_com select 1,'a',array('hi')")
+    sql("insert into alter_com select 2,'b',array('hello','world')")
+    sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
+    sql("insert into alter_com select 
3,'c',array('hi'),named_struct('s1',4,'s2','d')")
+    sql("insert into alter_com select 
4,'d',array('hi'),named_struct('s1',4,'s2','d')")
+    sql("alter table alter_com compact 'minor'")
+    checkAnswer(sql("""Select count(*) from alter_com"""), Seq(Row(4)))

Review comment:
       done

##########
File path: 
integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
##########
@@ -293,26 +300,32 @@ private void processResult(List<CarbonIterator<RowBatch>> 
detailQueryResultItera
         // dictionary
         preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
       } else {
-        if (isComplexColumn) {
-          // get the flattened data of complex column
-          byte[] complexKeyByIndex = 
wrapper.getComplexKeyByIndex(complexIndex);
-          ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
-          GenericQueryType genericQueryType =
-              
complexDimensionInfoMap.get(complexColumnParentBlockIndexes[complexIndex++]);
-          int complexDataLength = byteArrayInput.getShort(2);
-          // In case, if array is empty
-          if (complexDataLength == 0) {
-            complexDataLength = complexDataLength + 1;
-          }
-          // get flattened array data
-          Object[] complexFlattenedData = new Object[complexDataLength];
-          Object[] data = 
genericQueryType.getObjectArrayDataBasedOnDataType(byteArrayInput);
-          for (int index = 0; index < complexDataLength; index++) {
-            complexFlattenedData[index] =
-                getData(data, index, dims.getColumnSchema().getDataType());
+        if (isComplexColumn || isComplexColumnAdded) {
+          if (complexColumnParentBlockIndexes.length > 0) {
+            // get the flattened data of complex column
+            byte[] complexKeyByIndex = 
wrapper.getComplexKeyByIndex(complexIndex);
+            ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
+            GenericQueryType genericQueryType =
+                
complexDimensionInfoMap.get(complexColumnParentBlockIndexes[complexIndex++]);
+            int complexDataLength = byteArrayInput.getShort(2);
+            // In case, if array is empty
+            if (complexDataLength == 0) {
+              complexDataLength = complexDataLength + 1;
+            }
+            // get flattened array data
+            Object[] complexFlattenedData = new Object[complexDataLength];
+            Object[] data = 
genericQueryType.getObjectArrayDataBasedOnDataType(byteArrayInput);
+            for (int index = 0; index < complexDataLength; index++) {
+              complexFlattenedData[index] =
+                  getData(data, index, dims.getColumnSchema().getDataType());
+            }
+            // store the dimesnion column index and the complex column 
flattened data to a map
+            complexDataMap.put(i, complexFlattenedData);
+          } else {
+            // when complex column is added after restructuring and
+            // column not present in parent block, let the SI row value be 
empty.

Review comment:
       ok, edited comment.

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
##########
@@ -214,6 +214,19 @@ class TestAlterTableAddColumns extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_com")
   }
 
+  test("Test alter add complex type and compaction") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("create table alter_com (a int, b string, arr1 array<string>) stored 
as carbondata")
+    sql("insert into alter_com select 1,'a',array('hi')")
+    sql("insert into alter_com select 2,'b',array('hello','world')")
+    sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
+    sql("insert into alter_com select 
3,'c',array('hi'),named_struct('s1',4,'s2','d')")
+    sql("insert into alter_com select 
4,'d',array('hi'),named_struct('s1',4,'s2','d')")
+    sql("alter table alter_com compact 'minor'")
+    checkAnswer(sql("""Select count(*) from alter_com"""), Seq(Row(4)))
+    sql("DROP TABLE IF EXISTS alter_com")
+  }
+

Review comment:
       The following test case already covers this scenario: `Test alter add 
array column before creating SI`. It was not failing in CI before because the 
added complex column's details were present in `noDictionaryKeys` (they are 
supposed to be in `complexKeys`), and the row was prepared from that empty byte 
array. Now that I have made changes to set `complexKeys` properly after 
restructuring, the test case fails because the column is not identified as a 
complex type and the row is prepared from the `noDictionaryKeys` array, which 
is empty. The changes for SI now made in `prepareRowObjectForSorting` fix the 
issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to