This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new fee8b18 [CARBONDATA-4193] Fix compaction failure after alter add complex column.
fee8b18 is described below
commit fee8b1822ecf238e9639fa51531e420181c1e065
Author: ShreelekhyaG <[email protected]>
AuthorDate: Wed May 26 19:14:40 2021 +0530
[CARBONDATA-4193] Fix compaction failure after alter add complex column.
Why is this PR needed?
1. Compaction after an alter add of a complex column fails with an
ArrayIndexOutOfBoundsException. While converting and adding the row after the
merge step in WriteStepRowUtil.fromMergerRow, the complexKeys array is accessed
because a complex dimension is present in the latest schema, but blocks written
before the alter carry no values in that array, so the access throws the
exception (sketched below).
2. Creating an SI with global sort on a newly added complex column throws a
TreeNodeException (Caused by: java.lang.RuntimeException: Couldn't find
positionId#172 in [arr2#153])
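A minimal sketch of the failure mode in point 1 (illustrative only; the
variable names below are assumptions, not the actual fromMergerRow code):
// Blocks written before the ALTER ADD carry no complex keys, so the
// complex-type key array read from such a block is empty.
byte[][] complexKeys = new byte[0][];
// fromMergerRow sees a complex dimension in the latest schema and indexes
// into the array, which throws ArrayIndexOutOfBoundsException.
byte[] firstComplexKey = complexKeys[0];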
What changes were proposed in this PR?
1. While restructuring the row, fill complexKeys with default values (null
values for the children) according to the latest schema; the encoding is
sketched after this list. In SecondaryIndexQueryResultProcessor, use the
column property isParentColumnComplex to identify a complex type; if the
complex index column is not present in the parent table block, assign an
empty byte array as the SI row value.
2. For SI with global sort, in case of a complex type projection the
TableProperties object in CarbonEnv is not the same as the one in the
CarbonTable object, so requiredColumns is not updated with positionId. Update
the table properties from CarbonEnv itself.
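A sketch of the default-value encoding used for a newly added complex column,
mirroring the RestructureBasedRawResultCollector change in the diff below
(currDataType and children come from the projection dimension; the
surrounding setup is abbreviated):
try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    DataOutputStream dataOutputStream = new DataOutputStream(byteStream)) {
  if (DataTypes.isArrayType(currDataType)) {
    // a restructured array default holds a single null element
    dataOutputStream.writeInt(1);
  } else if (DataTypes.isStructType(currDataType)) {
    // a restructured struct default holds one null per child field
    dataOutputStream.writeShort(children.size());
  }
  for (CarbonDimension child : children) {
    // writes a length prefix plus the datatype-specific null bytes
    // (member-default bytes for string/varchar, empty bytes otherwise)
    CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, child.getDataType());
  }
  byte[] newColumnDefaultValue = byteStream.toByteArray();
}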
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4142
---
.../impl/RestructureBasedRawResultCollector.java | 63 +++++++++++++--
.../core/scan/executor/infos/DimensionInfo.java | 27 +++++++
.../core/scan/executor/util/RestructureUtil.java | 5 ++
.../apache/carbondata/core/util/CarbonUtil.java | 19 +++++
.../TestSIWithComplexArrayType.scala | 94 ++++++++++++++++++++--
.../query/SecondaryIndexQueryResultProcessor.java | 20 ++---
.../secondaryindex/command/SICreationCommand.scala | 8 +-
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 11 +--
.../spark/src/test/resources/secindex/array2.csv | 4 +
.../alterTable/TestAlterTableAddColumns.scala | 18 +++++
.../processing/datatypes/PrimitiveDataType.java | 17 +---
11 files changed, 239 insertions(+), 47 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index b2f5c07..80fb3c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -17,12 +17,18 @@
package org.apache.carbondata.core.scan.collector.impl;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -31,13 +37,19 @@ import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.log4j.Logger;
+
/**
* It is not a collector it is just a scanned result holder.
*/
public class RestructureBasedRawResultCollector extends RawBasedResultCollector {
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(RestructureBasedRawResultCollector.class.getName());
+
public RestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
}
@@ -57,8 +69,9 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
if (dimensionInfo.isDictionaryColumnAdded()) {
fillDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
}
- if (dimensionInfo.isNoDictionaryColumnAdded()) {
- fillNoDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
+ if (dimensionInfo.isNoDictionaryColumnAdded() || dimensionInfo.isComplexColumnAdded()) {
+ fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(listBasedResult);
}
QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
.get(QueryStatisticsConstants.RESULT_PREP_TIME);
@@ -111,36 +124,71 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
}
/**
- * This method will fill the no dictionary byte array with newly added no dictionary columns
+ * This method will fill the no dictionary and complex byte array with newly added columns
*
* @param rows
* @return
*/
- private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+ private void fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
for (Object[] row : rows) {
ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+ byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
new byte[noDictKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
+ byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+ new byte[complexTypesKeyArray.length + dimensionInfo.getNewComplexColumnCount()][];
int existingColumnValueIndex = 0;
int newKeyArrayIndex = 0;
+ int existingComplexColumnValueIndex = 0;
+ int newComplexKeyArrayIndex = 0;
for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
if (actualQueryDimensions[i].getDimension().getDataType() != DataTypes.DATE
&& !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+ DataType currDataType = actualQueryDimensions[i].getDimension().getDataType();
// if dimension exists then add the byte array value else add the default value
if (dimensionInfo.getDimensionExists()[i]) {
- noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
- noDictKeyArray[existingColumnValueIndex++];
+ if (currDataType.isComplexType()) {
+ complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+ complexTypesKeyArray[existingComplexColumnValueIndex++];
+ } else {
+ noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+ noDictKeyArray[existingColumnValueIndex++];
+ }
} else {
byte[] newColumnDefaultValue = null;
Object defaultValue = dimensionInfo.getDefaultValues()[i];
if (null != defaultValue) {
newColumnDefaultValue = (byte[]) defaultValue;
- } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
+ } else if (currDataType == DataTypes.STRING) {
newColumnDefaultValue =
DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+ } else if (currDataType.isComplexType()) {
+ // Iterate over child dimensions and add its default value.
+ List<CarbonDimension> children =
+ actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+ try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteStream)) {
+ if (DataTypes.isArrayType(currDataType)) {
+ dataOutputStream.writeInt(1);
+ } else if (DataTypes.isStructType(currDataType)) {
+ dataOutputStream.writeShort(children.size());
+ }
+ for (int j = 0; j < children.size(); j++) {
+ // update default null values based on datatype
+ CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream,
+ children.get(j).getDataType());
+ }
+ newColumnDefaultValue = byteStream.toByteArray();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+ newColumnDefaultValue;
+ continue;
} else {
newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
}
@@ -149,6 +197,7 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
}
}
byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
+ byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
}
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
index b41de82..0a9820f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DimensionInfo.java
@@ -56,6 +56,17 @@ public class DimensionInfo {
* count of no dictionary columns not existing in the current block
*/
private int newNoDictionaryColumnCount;
+
+ /**
+ * count of complex columns not existing in the current block
+ */
+ private int newComplexColumnCount;
+
+ /**
+ * flag to check whether there exists a complex column in the query which
+ * does not exist in the current block
+ */
+ private boolean isComplexColumnAdded;
/**
* maintains the block datatype
*/
@@ -115,4 +126,20 @@ public class DimensionInfo {
public void setNewNoDictionaryColumnCount(int newNoDictionaryColumnCount) {
this.newNoDictionaryColumnCount = newNoDictionaryColumnCount;
}
+
+ public boolean isComplexColumnAdded() {
+ return isComplexColumnAdded;
+ }
+
+ public void setComplexColumnAdded(boolean complexColumnAdded) {
+ isComplexColumnAdded = complexColumnAdded;
+ }
+
+ public int getNewComplexColumnCount() {
+ return newComplexColumnCount;
+ }
+
+ public void setNewComplexColumnCount(int newComplexColumnCount) {
+ this.newComplexColumnCount = newComplexColumnCount;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 2f2cdfe..7fabe17 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -77,6 +77,7 @@ public class RestructureUtil {
dimensionInfo.dataType = new DataType[queryDimensions.length + measureCount];
int newDictionaryColumnCount = 0;
int newNoDictionaryColumnCount = 0;
+ int newNoDictionaryComplexColumnCount = 0;
// selecting only those dimension which is present in the query
int dimIndex = 0;
for (ProjectionDimension queryDimension : queryDimensions) {
@@ -140,6 +141,9 @@ public class RestructureUtil {
if (queryDimension.getDimension().getDataType() == DataTypes.DATE) {
dimensionInfo.setDictionaryColumnAdded(true);
newDictionaryColumnCount++;
+ } else if (queryDimension.getDimension().getDataType().isComplexType()) {
+ dimensionInfo.setComplexColumnAdded(true);
+ newNoDictionaryComplexColumnCount++;
} else {
dimensionInfo.setNoDictionaryColumnAdded(true);
newNoDictionaryColumnCount++;
@@ -150,6 +154,7 @@ public class RestructureUtil {
}
dimensionInfo.setNewDictionaryColumnCount(newDictionaryColumnCount);
dimensionInfo.setNewNoDictionaryColumnCount(newNoDictionaryColumnCount);
+ dimensionInfo.setNewComplexColumnCount(newNoDictionaryComplexColumnCount);
blockExecutionInfo.setDimensionInfo(dimensionInfo);
return presentDimension;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 7b762ba..1b81950 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3481,4 +3481,23 @@ public final class CarbonUtil {
});
}
}
+
+ public static void updateNullValueBasedOnDatatype(DataOutputStream dataOutputStream,
+ DataType dataType) throws IOException {
+ if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+ if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+ dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+ } else {
+ dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+ }
+ dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+ } else {
+ if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+ dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+ } else {
+ dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+ }
+ dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
+ }
+ }
}
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 9e499e7..ef8bf3d 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -45,7 +45,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
sql("drop table if exists complextable5")
}
- test("Test alter add array column before creating SI") {
+ test("Test restructured array<string> and existing string column as index
columns on SI with compaction") {
sql("drop table if exists complextable")
sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
@@ -60,11 +60,12 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
- val result1 = sql("select * from complextable where array_contains(arr2,'iron')")
- val result2 = sql("select * from complextable where arr2[0]='iron'")
- sql("create index index_11 on table complextable(arr2) as 'carbondata'")
- val df1 = sql(" select * from complextable where array_contains(arr2,'iron')")
- val df2 = sql(" select * from complextable where arr2[0]='iron'")
+ val result1 = sql("select * from complextable where array_contains(arr2,'iron') and name='g'")
+ val result2 = sql("select * from complextable where arr2[0]='iron' and name='f'")
+ sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
+ sql("alter table complextable compact 'minor'")
+ val df1 = sql(" select * from complextable where array_contains(arr2,'iron') and name='g'")
+ val df2 = sql(" select * from complextable where arr2[0]='iron' and name='f'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
@@ -85,7 +86,49 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
checkAnswer(result2, df2)
}
- test("test array<string> on secondary index") {
+ test("Test restructured array<string> and string columns as index columns on
SI with compaction") {
+ sql("drop table if exists complextable")
+ sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
+ sql("insert into complextable select 1,array('china', 'us'), 'b'")
+ sql("insert into complextable select 2,array('pak'), 'v'")
+
+ sql("drop index if exists index_11 on complextable")
+ sql(
+ "ALTER TABLE complextable ADD COLUMNS(arr2 array<string>)")
+ sql("ALTER TABLE complextable ADD COLUMNS(addr string)")
+ sql("insert into complextable select 3,array('china'),
'f',array('hello','world'),'china'")
+ sql("insert into complextable select
4,array('India'),'g',array('iron','man','jarvis'),'India'")
+
+ checkAnswer(sql("select * from complextable where
array_contains(arr2,'iron')"),
+ Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
+ mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
+ val result1 = sql("select * from complextable where array_contains(arr2,'iron') and addr='India'")
+ val result2 = sql("select * from complextable where arr2[0]='iron' and addr='china'")
+ sql("create index index_11 on table complextable(arr2, addr) as 'carbondata'")
+ sql("alter table complextable compact 'minor'")
+ val df1 = sql(" select * from complextable where array_contains(arr2,'iron') and addr='India'")
+ val df2 = sql(" select * from complextable where arr2[0]='iron' and addr='china'")
+ if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ val doNotHitSIDf = sql(" select * from complextable where array_contains(arr2,'iron') and array_contains(arr2,'man')")
+ if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result1, df1)
+ checkAnswer(result2, df2)
+ }
+
+ test("test array<string> on secondary index with compaction") {
sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")
@@ -95,6 +138,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
val result2 = sql(" select * from complextable where country[0]='china'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country) as 'carbondata'")
+ sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(country,'china')")
val df2 = sql(" select * from complextable where country[0]='china'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
@@ -117,7 +161,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
checkAnswer(result2, df2)
}
- test("test array<string> and string as index columns on secondary index") {
+ test("test array<string> and string as index columns on secondary index with
compaction") {
sql("create table complextable (id string, country array<string>, name
string) stored as carbondata")
sql("insert into complextable select 1, array('china', 'us'), 'b'")
sql("insert into complextable select 2, array('pak'), 'v'")
@@ -126,6 +170,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
val result = sql(" select * from complextable where array_contains(country,'china') and name='f'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country, name) as
'carbondata'")
+ sql("alter table complextable compact 'minor'")
val df = sql(" select * from complextable where array_contains(country,'china') and name='f'")
if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
assert(false)
@@ -177,6 +222,39 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
}
+ test("test SI global sort with si segment merge enabled for newly added
complex column") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+ sql("create table complextable2 (id int, name string, country
array<string>) stored as " +
+ "carbondata
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable2 options" +
+ s"('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ sql("ALTER TABLE complextable2 ADD COLUMNS(arr2 array<string>)")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array2.csv' into table complextable2 options" +
+ s"('delimiter'=','," +
+ "'quotechar'='\"','fileheader'='id,name,country,arr2','complex_delimiter_level_1'='$'," +
+ "'global_sort_partitions'='10')")
+ val result = sql(" select * from complextable2 where array_contains(arr2,'iron')")
+ sql("drop index if exists index_2 on complextable")
+ sql("create index index_2 on table complextable2(arr2) as 'carbondata'
properties" +
+ "('sort_scope'='global_sort') ")
+ checkAnswer(sql("select count(*) from complextable2 where array_contains(arr2,'iron')"),
+ sql("select count(*) from complextable2 where ni(array_contains(arr2,'iron'))"))
+ val df = sql(" select * from complextable2 where array_contains(arr2,'iron')")
+ if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result, df)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ }
+
test("test SI global sort with si segment merge enabled for primitive data
types") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 41a5c43..d8ff50f 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -277,15 +277,11 @@ public class SecondaryIndexQueryResultProcessor {
CarbonDimension dims = dimensions.get(i);
boolean isComplexColumn = false;
// As complex column of MainTable is stored as its primitive type in SI,
- // we need to check if dimension is complex dimension or not based on dimension
- // name. Check if name exists in complexDimensionInfoMap of main table result
- if (!complexDimensionInfoMap.isEmpty() && complexColumnParentBlockIndexes.length > 0) {
- for (GenericQueryType queryType : complexDimensionInfoMap.values()) {
- if (queryType.getName().equalsIgnoreCase(dims.getColName())) {
- isComplexColumn = true;
- break;
- }
- }
+ // we need to check if dimension is complex dimension or not based on isParentColumnComplex
+ // property.
+ if (dims.getColumnProperties() != null && Boolean
+ .parseBoolean(dims.getColumnProperties().get("isParentColumnComplex"))) {
+ isComplexColumn = true;
}
// fill all the no dictionary and dictionary data to the prepared row first, fill the complex
// flatten data to prepared row at last
@@ -293,7 +289,11 @@ public class SecondaryIndexQueryResultProcessor {
// dictionary
preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
} else {
- if (isComplexColumn) {
+ if (isComplexColumn && complexColumnParentBlockIndexes.length == 0) {
+ // After restructure, a complex column may not be present in the parent block.
+ // In such case, set the SI implicit row value to empty byte array.
+ preparedRow[i] = new byte[0];
+ } else if (isComplexColumn) {
// get the flattened data of complex column
byte[] complexKeyByIndex = wrapper.getComplexKeyByIndex(complexIndex);
ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index b77677a..223bff1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -680,18 +680,24 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
dataType: DataType = null): ColumnSchema = {
val columnSchema = new ColumnSchema()
val encodingList = parentColumnSchema.getEncodingList
+ var colPropMap = parentColumnSchema.getColumnProperties
// if data type is arrayType, then store the column as its CHILD data type in SI
if (DataTypes.isArrayType(parentColumnSchema.getDataType)) {
columnSchema.setDataType(dataType)
+ if (colPropMap == null) {
+ colPropMap = new java.util.HashMap[String, String]()
+ }
+ colPropMap.put("isParentColumnComplex", "true")
+ columnSchema.setColumnProperties(colPropMap)
if (dataType == DataTypes.DATE) {
encodingList.add(Encoding.DIRECT_DICTIONARY)
encodingList.add(Encoding.DICTIONARY)
}
} else {
columnSchema.setDataType(parentColumnSchema.getDataType)
+ columnSchema.setColumnProperties(colPropMap)
}
columnSchema.setColumnName(parentColumnSchema.getColumnName)
- columnSchema.setColumnProperties(parentColumnSchema.getColumnProperties)
columnSchema.setEncodingList(encodingList)
columnSchema.setColumnUniqueId(parentColumnSchema.getColumnUniqueId)
columnSchema.setColumnReferenceId(parentColumnSchema.getColumnReferenceId)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index bc96996..eafbb3a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -195,7 +195,7 @@ object SecondaryIndexCreator {
mainTable,
projections.mkString(","),
Array(eachSegment),
- isPositionReferenceRequired = true)
+ isPositionReferenceRequired = true, !explodeColumn.isEmpty)
// flatten the complex SI
if (explodeColumn.nonEmpty) {
val columns = dataFrame.schema.map { x =>
@@ -564,7 +564,8 @@ object SecondaryIndexCreator {
carbonTable: CarbonTable,
projections: String,
segments: Array[String],
- isPositionReferenceRequired: Boolean = false): DataFrame = {
+ isPositionReferenceRequired: Boolean = false,
+ isComplexTypeProjection: Boolean = false): DataFrame = {
try {
CarbonThreadUtil.threadSet(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
carbonTable.getDatabaseName +
@@ -584,9 +585,9 @@ object SecondaryIndexCreator {
case p: Project =>
Project(p.projectList :+ positionId, p.child)
}
- val tableProperties = if (carbonTable.isHivePartitionTable) {
- // in case of partition table, TableProperties object in carbonEnv is not same as
- // in carbonTable object, so update from carbon env itself.
+ val tableProperties = if (carbonTable.isHivePartitionTable || isComplexTypeProjection) {
+ // in case of partition table and complex type projection, TableProperties object in
+ // carbonEnv is not same as in carbonTable object, so update from carbon env itself.
CarbonEnv.getCarbonTable(Some(carbonTable.getDatabaseName),
carbonTable.getTableName)(
sparkSession).getTableInfo
.getFactTable
diff --git a/integration/spark/src/test/resources/secindex/array2.csv b/integration/spark/src/test/resources/secindex/array2.csv
new file mode 100644
index 0000000..e453846
--- /dev/null
+++ b/integration/spark/src/test/resources/secindex/array2.csv
@@ -0,0 +1,4 @@
+1,abc,china$india$us,hello$world
+2,xyz,sri$can,iron$man$jarvis
+3,mno,rus$china,ex$ex2
+4,lok,hk$bang,ex$ex3
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index b4504e1..48d9bed 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -214,6 +214,24 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS alter_com")
}
+ test("Test alter add complex type and compaction") {
+ sql("DROP TABLE IF EXISTS alter_com")
+ sql("create table alter_com (a int, b string, arr1 array<string>) stored
as carbondata")
+ sql("insert into alter_com select 1,'a',array('hi')")
+ sql("insert into alter_com select 2,'b',array('hello','world')")
+ sql("ALTER TABLE alter_com ADD COLUMNS(struct1 STRUCT<a:int, b:string>)")
+ sql("insert into alter_com select 3,'c',array('hi'),null")
+ sql("insert into alter_com select
4,'d',array('hi'),named_struct('s1',4,'s2','d')")
+ sql("alter table alter_com compact 'minor'")
+ checkAnswer(sql("""Select count(*) from alter_com"""), Seq(Row(4)))
+ checkAnswer(sql("Select * from alter_com"),
+ Seq(Row(1, "a", mutable.WrappedArray.make(Array("hi")), null),
+ Row(2, "b", mutable.WrappedArray.make(Array("hello", "world")), null),
+ Row(3, "c", mutable.WrappedArray.make(Array("hi")), null),
+ Row(4, "d", mutable.WrappedArray.make(Array("hi")), Row(4, "d"))))
+ sql("DROP TABLE IF EXISTS alter_com")
+ }
+
def insertIntoTableForArrayType(): Unit = {
sql("insert into alter_com
values(4,array(2),array(1,2,3,4),array('abc','def'))")
sql("insert into alter_com values(5,array(1,2),array(1),
array('Hulk','Thor'))")
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 1a09a5d..b0a4263 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -414,22 +414,7 @@ public class PrimitiveDataType implements GenericDataType<Object> {
private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
throws IOException {
- if (this.carbonDimension.getDataType() == DataTypes.STRING
- || this.carbonDimension.getDataType() == DataTypes.VARCHAR) {
- if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
- dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
- } else {
- dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
- }
- dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
- } else {
- if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
- dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
- } else {
- dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
- }
- dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
- }
+ CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, this.carbonDimension.getDataType());
String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
if (null == message) {
message = CarbonDataProcessorUtil