ShreelekhyaG commented on a change in pull request #4142:
URL: https://github.com/apache/carbondata/pull/4142#discussion_r644500344
##########
File path:
core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
##########
@@ -140,6 +141,9 @@
if (queryDimension.getDimension().getDataType() == DataTypes.DATE) {
dimensionInfo.setDictionaryColumnAdded(true);
newDictionaryColumnCount++;
+ } else if
(queryDimension.getDimension().getDataType().isComplexType()) {
+ dimensionInfo.setComplexColumnAdded(true);
+ newComplexDictColumnCount++;
Review comment:
Done
##########
File path:
core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -111,36 +123,72 @@ private void
fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
}
/**
- * This method will fill the no dictionary byte array with newly added no
dictionary columns
+ * This method will fill the no dictionary and complex byte array with newly
added columns
*
* @param rows
* @return
*/
- private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]>
rows) {
+ private void
fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
for (Object[] row : rows) {
ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+ byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
ProjectionDimension[] actualQueryDimensions =
executionInfo.getActualQueryDimensions();
byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
new byte[noDictKeyArray.length +
dimensionInfo.getNewNoDictionaryColumnCount()][];
+ byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+ new byte[complexTypesKeyArray.length +
dimensionInfo.getNewComplexColumnCount()][];
int existingColumnValueIndex = 0;
int newKeyArrayIndex = 0;
+ int existingComplexColumnValueIndex = 0;
+ int newComplexKeyArrayIndex = 0;
for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
if (actualQueryDimensions[i].getDimension().getDataType() !=
DataTypes.DATE
&&
!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+ DataType currDataType =
actualQueryDimensions[i].getDimension().getDataType();
// if dimension exists then add the byte array value else add the
default value
if (dimensionInfo.getDimensionExists()[i]) {
- noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
- noDictKeyArray[existingColumnValueIndex++];
+ if (currDataType.isComplexType()) {
+
complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+ complexTypesKeyArray[existingComplexColumnValueIndex++];
+ } else {
+ noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+ noDictKeyArray[existingColumnValueIndex++];
+ }
} else {
byte[] newColumnDefaultValue = null;
Object defaultValue = dimensionInfo.getDefaultValues()[i];
if (null != defaultValue) {
newColumnDefaultValue = (byte[]) defaultValue;
- } else if (actualQueryDimensions[i].getDimension().getDataType()
== DataTypes.STRING) {
+ } else if (currDataType == DataTypes.STRING) {
newColumnDefaultValue =
DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+ } else if (currDataType.isComplexType()) {
+ // Iterate over child dimensions and add its default value.
+ List<CarbonDimension> children =
+
actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteStream);
+ try {
+ if (DataTypes.isArrayType(currDataType)) {
+ dataOutputStream.writeInt(1);
+ } else if (DataTypes.isStructType(currDataType)) {
+ dataOutputStream.writeShort(children.size());
+ }
+ for (int j = 0; j < children.size(); j++) {
+ // update default null values based on datatype
+ updateNullValue(dataOutputStream,
children.get(j).getDataType());
+ }
+ dataOutputStream.close();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ e.printStackTrace();
Review comment:
ok
##########
File path:
core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -111,36 +123,72 @@ private void
fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
}
/**
- * This method will fill the no dictionary byte array with newly added no
dictionary columns
+ * This method will fill the no dictionary and complex byte array with newly
added columns
*
* @param rows
* @return
*/
- private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]>
rows) {
+ private void
fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
for (Object[] row : rows) {
ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+ byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
ProjectionDimension[] actualQueryDimensions =
executionInfo.getActualQueryDimensions();
byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
new byte[noDictKeyArray.length +
dimensionInfo.getNewNoDictionaryColumnCount()][];
+ byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+ new byte[complexTypesKeyArray.length +
dimensionInfo.getNewComplexColumnCount()][];
int existingColumnValueIndex = 0;
int newKeyArrayIndex = 0;
+ int existingComplexColumnValueIndex = 0;
+ int newComplexKeyArrayIndex = 0;
for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
if (actualQueryDimensions[i].getDimension().getDataType() !=
DataTypes.DATE
&&
!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+ DataType currDataType =
actualQueryDimensions[i].getDimension().getDataType();
// if dimension exists then add the byte array value else add the
default value
if (dimensionInfo.getDimensionExists()[i]) {
- noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
- noDictKeyArray[existingColumnValueIndex++];
+ if (currDataType.isComplexType()) {
+
complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+ complexTypesKeyArray[existingComplexColumnValueIndex++];
+ } else {
+ noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+ noDictKeyArray[existingColumnValueIndex++];
+ }
} else {
byte[] newColumnDefaultValue = null;
Object defaultValue = dimensionInfo.getDefaultValues()[i];
if (null != defaultValue) {
newColumnDefaultValue = (byte[]) defaultValue;
- } else if (actualQueryDimensions[i].getDimension().getDataType()
== DataTypes.STRING) {
+ } else if (currDataType == DataTypes.STRING) {
newColumnDefaultValue =
DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+ } else if (currDataType.isComplexType()) {
+ // Iterate over child dimensions and add its default value.
+ List<CarbonDimension> children =
+
actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteStream);
+ try {
+ if (DataTypes.isArrayType(currDataType)) {
+ dataOutputStream.writeInt(1);
+ } else if (DataTypes.isStructType(currDataType)) {
+ dataOutputStream.writeShort(children.size());
+ }
+ for (int j = 0; j < children.size(); j++) {
+ // update default null values based on datatype
+ updateNullValue(dataOutputStream,
children.get(j).getDataType());
+ }
+ dataOutputStream.close();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
+ newColumnDefaultValue = byteStream.toByteArray();
Review comment:
done
##########
File path:
core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -149,6 +197,27 @@ private void
fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows)
}
}
byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
+
byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
+ }
+ }
+
+ private void updateNullValue(DataOutputStream dataOutputStream, DataType
dimensionDataType)
+ throws IOException {
+ if (dimensionDataType == DataTypes.STRING
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
##########
@@ -214,6 +214,19 @@ class TestAlterTableAddColumns extends QueryTest with
BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS alter_com")
}
+ test("Test alter add complex type and compaction") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("create table alter_com (a int, b string, arr1 array<string>) stored
as carbondata")
+ sql("insert into alter_com select 1,'a',array('hi')")
+ sql("insert into alter_com select 2,'b',array('hello','world')")
+ sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
+ sql("insert into alter_com select
3,'c',array('hi'),named_struct('s1',4,'s2','d')")
+ sql("insert into alter_com select
4,'d',array('hi'),named_struct('s1',4,'s2','d')")
+ sql("alter table alter_com compact 'minor'")
+ checkAnswer(sql("""Select count(*) from alter_com"""), Seq(Row(4)))
Review comment:
done
##########
File path:
integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
##########
@@ -293,26 +300,32 @@ private void processResult(List<CarbonIterator<RowBatch>>
detailQueryResultItera
// dictionary
preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
} else {
- if (isComplexColumn) {
- // get the flattened data of complex column
- byte[] complexKeyByIndex =
wrapper.getComplexKeyByIndex(complexIndex);
- ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
- GenericQueryType genericQueryType =
-
complexDimensionInfoMap.get(complexColumnParentBlockIndexes[complexIndex++]);
- int complexDataLength = byteArrayInput.getShort(2);
- // In case, if array is empty
- if (complexDataLength == 0) {
- complexDataLength = complexDataLength + 1;
- }
- // get flattened array data
- Object[] complexFlattenedData = new Object[complexDataLength];
- Object[] data =
genericQueryType.getObjectArrayDataBasedOnDataType(byteArrayInput);
- for (int index = 0; index < complexDataLength; index++) {
- complexFlattenedData[index] =
- getData(data, index, dims.getColumnSchema().getDataType());
+ if (isComplexColumn || isComplexColumnAdded) {
+ if (complexColumnParentBlockIndexes.length > 0) {
+ // get the flattened data of complex column
+ byte[] complexKeyByIndex =
wrapper.getComplexKeyByIndex(complexIndex);
+ ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
+ GenericQueryType genericQueryType =
+
complexDimensionInfoMap.get(complexColumnParentBlockIndexes[complexIndex++]);
+ int complexDataLength = byteArrayInput.getShort(2);
+ // In case, if array is empty
+ if (complexDataLength == 0) {
+ complexDataLength = complexDataLength + 1;
+ }
+ // get flattened array data
+ Object[] complexFlattenedData = new Object[complexDataLength];
+ Object[] data =
genericQueryType.getObjectArrayDataBasedOnDataType(byteArrayInput);
+ for (int index = 0; index < complexDataLength; index++) {
+ complexFlattenedData[index] =
+ getData(data, index, dims.getColumnSchema().getDataType());
+ }
+ // store the dimension column index and the complex column
flattened data to a map
+ complexDataMap.put(i, complexFlattenedData);
+ } else {
+ // when complex column is added after restructuring and
+ // column not present in parent block, let the SI row value be
empty.
Review comment:
ok, edited comment.
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
##########
@@ -214,6 +214,19 @@ class TestAlterTableAddColumns extends QueryTest with
BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS alter_com")
}
+ test("Test alter add complex type and compaction") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("create table alter_com (a int, b string, arr1 array<string>) stored
as carbondata")
+ sql("insert into alter_com select 1,'a',array('hi')")
+ sql("insert into alter_com select 2,'b',array('hello','world')")
+ sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
+ sql("insert into alter_com select
3,'c',array('hi'),named_struct('s1',4,'s2','d')")
+ sql("insert into alter_com select
4,'d',array('hi'),named_struct('s1',4,'s2','d')")
+ sql("alter table alter_com compact 'minor'")
+ checkAnswer(sql("""Select count(*) from alter_com"""), Seq(Row(4)))
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
Review comment:
The following test case already exists with such a scenario: `Test alter add
array column before creating SI`. It was not failing in CI before because the added
complex column details were present in `noDictionaryKeys` (they are supposed to be
in `complexKeys`) and the row was prepared from that empty byte array. Now that I
have made changes to set `complexKeys` properly after restructuring, the test case
was failing because it could not identify the column as a complex type and tried to
prepare the row from the empty `noDictionaryKeys` array. The changes now made for
SI in `prepareRowObjectForSorting` fix the issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]