This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new f52aa20
[CARBONDATA-4164][CARBONDATA-4198][CARBONDATA-4199][CARBONDATA-4234] Support
alter add map, multilevel complex columns and rename/change datatype.
f52aa20 is described below
commit f52aa20a82b3e9766d34511ac4296b50c5a3ea9e
Author: ShreelekhyaG <[email protected]>
AuthorDate: Wed Jul 14 22:01:14 2021 +0530
[CARBONDATA-4164][CARBONDATA-4198][CARBONDATA-4199][CARBONDATA-4234]
Support alter add map, multilevel complex columns and rename/change datatype.
Why is this PR needed?
Support alter add for map and multi-level complex columns, and change
datatype for complex types.
What changes were proposed in this PR?
1. Support adding of single-level and multi-level map columns
2. Support adding of multi-level complex columns (array/struct)
3. Support renaming of map columns, including nested levels
4. Support altering the datatype at nested levels (array/map/struct); see the
examples below
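For example, the following DDL forms are now accepted (taken from the
updated docs/ddl-of-carbondata.md in this patch; table and column names
are placeholders):

```
ALTER TABLE tbl ADD COLUMNS(arrField array<array<int>>, mapField map<string,array<string>>)
ALTER TABLE tbl CHANGE oldMapCol newMapCol map<int, int>
ALTER TABLE tbl CHANGE structField structField struct<id:long>
```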
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4180
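As a minimal end-to-end sketch, distilled from the new tests in this patch
(table name is a placeholder), rows loaded before an ALTER ... CHANGE are
read back with the widened datatype:

```
CREATE TABLE t (arr array<int>) STORED AS carbondata;
INSERT INTO t SELECT array(1, 2, 3);         -- values stored as 4-byte ints
ALTER TABLE t CHANGE arr arr2 array<long>;   -- rename and widen int -> long
INSERT INTO t SELECT array(26557544541, 3);  -- values stored as 8-byte longs
SELECT arr2 FROM t;                          -- old rows are returned as longs
```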
---
.../impl/RestructureBasedRawResultCollector.java | 35 +++--
.../core/scan/complextypes/PrimitiveQueryType.java | 7 +-
.../core/scan/executor/util/RestructureUtil.java | 49 ++++++-
.../apache/carbondata/core/util/DataTypeUtil.java | 9 +-
docs/ddl-of-carbondata.md | 37 +++++-
.../TestSIWithComplexArrayType.scala | 73 ++++++++++-
.../query/SecondaryIndexQueryResultProcessor.java | 15 ++-
.../spark/sql/catalyst/CarbonParserUtil.scala | 63 +++++----
.../command/carbonTableSchemaCommon.scala | 71 +++++-----
...nAlterTableColRenameDataTypeChangeCommand.scala | 65 +++++----
.../org/apache/spark/util/AlterTableUtil.scala | 44 ++++---
.../alterTable/TestAlterTableAddColumns.scala | 95 +++++++++-----
.../AlterTableColumnRenameTestCase.scala | 146 +++++++++++++++++----
.../vectorreader/ChangeDataTypeTestCases.scala | 52 ++++++++
14 files changed, 567 insertions(+), 194 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 80fb3c3..af0ddb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -166,21 +166,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
               DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                   CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
         } else if (currDataType.isComplexType()) {
-          // Iterate over child dimensions and add its default value.
-          List<CarbonDimension> children =
-              actualQueryDimensions[i].getDimension().getListOfChildDimensions();
           try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
               DataOutputStream dataOutputStream = new DataOutputStream(byteStream)) {
-            if (DataTypes.isArrayType(currDataType)) {
-              dataOutputStream.writeInt(1);
-            } else if (DataTypes.isStructType(currDataType)) {
-              dataOutputStream.writeShort(children.size());
-            }
-            for (int j = 0; j < children.size(); j++) {
-              // update default null values based on datatype
-              CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream,
-                  children.get(j).getDataType());
-            }
+            // Iterate over child dimensions and add its default value.
+            addDefaultValueForComplexTypes(actualQueryDimensions[i].getDimension(),
+                currDataType, dataOutputStream);
             newColumnDefaultValue = byteStream.toByteArray();
           } catch (IOException e) {
             LOGGER.error(e.getMessage(), e);
@@ -200,4 +190,23 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
     byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
   }
+
+  private void addDefaultValueForComplexTypes(CarbonDimension dimension, DataType currDataType,
+      DataOutputStream dataOutputStream) throws IOException {
+    List<CarbonDimension> children = dimension.getListOfChildDimensions();
+    if (DataTypes.isArrayType(currDataType) || DataTypes.isMapType(currDataType)) {
+      dataOutputStream.writeInt(1);
+    } else if (DataTypes.isStructType(currDataType)) {
+      dataOutputStream.writeShort(children.size());
+    }
+    for (int j = 0; j < children.size(); j++) {
+      if (children.get(j).isComplex()) {
+        addDefaultValueForComplexTypes(children.get(j), children.get(j).getDataType(),
+            dataOutputStream);
+      } else {
+        // update default null values based on datatype
+        CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, children.get(j).getDataType());
+      }
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 2328c75..674611d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -76,7 +76,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
   @Override
   public void setParentName(String parentName) {
     this.parentName = parentName;
-
   }

   @Override
@@ -135,11 +134,15 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
       size = dataBuffer.array().length;
     } else if (child.getDataType() == DataTypes.TIMESTAMP) {
       size = DataTypes.LONG.getSizeInBytes();
+    } else if (dataBuffer.remaining() == DataTypes.INT.getSizeInBytes() && child.getDataType()
+        .equals(DataTypes.LONG)) {
+      // When datatype has been altered,
+      // get the actual data loaded size and then convert to long type.
+      size = DataTypes.INT.getSizeInBytes();
     } else {
       size = child.getDataType().getSizeInBytes();
     }
     actualData = getDataObject(dataBuffer, size);
-
     return actualData;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 0710920..e74b6ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -119,15 +119,18 @@ public class RestructureUtil {
       for (CarbonDimension tableDimension : tableComplexDimension) {
         if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
            tableDimension)) {
-          ProjectionDimension currentBlockDimension = null;
+          ProjectionDimension currentBlockDimension;
           // If projection dimension is child of struct field and contains Parent Ordinal
           if (null != queryDimension.getDimension().getComplexParentDimension()) {
             currentBlockDimension = new ProjectionDimension(queryDimension.getDimension());
           } else {
             currentBlockDimension = new ProjectionDimension(tableDimension);
+            // for complex dimension update datatype, set scale and precision by traversing
+            // the child dimensions. Spark requires the resultant row with latest datatype,
+            // update the dimension here to collect the resultant rows with updated datatype.
+            fillNewDatatypesForComplexChildren(currentBlockDimension.getDimension(),
+                queryDimension.getDimension());
           }
-          // TODO: for complex dimension set scale and precision by traversing
-          // the child dimensions
           currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
           presentDimension.add(currentBlockDimension);
           isDimensionExists[dimIndex] = true;
@@ -165,6 +168,46 @@ public class RestructureUtil {
     return presentDimension;
   }

+  public static CarbonDimension getCarbonDimension(String columnId,
+      List<CarbonDimension> dimensions) {
+    CarbonDimension carbonDimension = null;
+    if (dimensions == null) {
+      return carbonDimension;
+    }
+    for (CarbonDimension dim : dimensions) {
+      if (dim.isComplex()) {
+        carbonDimension = getCarbonDimension(columnId, dim.getListOfChildDimensions());
+      } else if (dim.getColumnId().equalsIgnoreCase(columnId)) {
+        carbonDimension = dim;
+        break;
+      }
+    }
+    return carbonDimension;
+  }
+
+  public static void fillNewDatatypesForComplexChildren(CarbonDimension currentDimension,
+      CarbonDimension tableDimension) {
+    if (tableDimension.getListOfChildDimensions() == null) {
+      return;
+    }
+    for (CarbonDimension childDimension : tableDimension.getListOfChildDimensions()) {
+      if (childDimension.getDataType().isComplexType()) {
+        fillNewDatatypesForComplexChildren(currentDimension, childDimension);
+      } else {
+        // only in case of primitive types, datatype is allowed to be altered.
+        // Find and update the current block dimension datatype.
+        CarbonDimension currentChildDimension = getCarbonDimension(childDimension.getColumnId(),
+            currentDimension.getListOfChildDimensions());
+        if (currentChildDimension != null) {
+          ColumnSchema currentSchema = currentChildDimension.getColumnSchema();
+          currentSchema.setDataType(childDimension.getDataType());
+          currentSchema.setPrecision(childDimension.getColumnSchema().getPrecision());
+          currentSchema.setScale(childDimension.getColumnSchema().getScale());
+        }
+      }
+    }
+  }
+
   public static void fillExistingTableColumnIDMap(CarbonDimension tableColumn) {
     existingTableColumnIDMap.put(tableColumn.getColumnId(), tableColumn.getColName());
     List<CarbonDimension> children = tableColumn.getListOfChildDimensions();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c2d380d..17d5601 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -740,7 +740,14 @@ public final class DataTypeUtil {
       if (isEmptyByteArray(dataInBytes)) {
         return null;
       }
-      return ByteUtil.toXorLong(dataInBytes, 0, dataInBytes.length);
+      DataType blockDatatype;
+      if (dataInBytes.length == DataTypes.INT.getSizeInBytes()) {
+        blockDatatype = DataTypes.INT;
+      } else {
+        blockDatatype = DataTypes.LONG;
+      }
+      return getDataBasedOnRestructuredDataType(dataInBytes, blockDatatype, 0,
+          dataInBytes.length);
     } else if (actualDataType == DataTypes.TIMESTAMP) {
       if (isEmptyByteArray(dataInBytes)) {
         return null;
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index dd09b91..b482ae5 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -775,10 +775,9 @@ CarbonData DDL statements are documented here,which includes:
      ```
      ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('DEFAULT.VALUE.a1'='10')
      ```
-     **NOTE:** Adding of only single-level Complex datatype columns(only array and struct) is supported.
-     Example -
+
      ```
-     ALTER TABLE <table-name> ADD COLUMNS(arrField array<int>, structField struct<id1:string,name1:string>)
+     ALTER TABLE <table-name> ADD COLUMNS(arrField array<array<int>>, structField struct<id1:string,name1:string>, mapField map<string,array<string>>)
      ```

      Users can specify which columns to include and exclude for local dictionary generation after adding new columns. These will be appended with the already existing local dictionary include and exclude columns of main table respectively.
@@ -866,7 +865,36 @@ Users can specify which columns to include and exclude for local dictionary gene
      ALTER TABLE test_db.carbon CHANGE oldArray newArray array<int>
      ```

-     **NOTE:** Once the column is renamed, user has to take care about replacing the fileheader with the new name or changing the column header in csv file.
+     Example 7: Change column name in column: oldMapCol map\<int, int> from oldMapCol to newMapCol.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldMapCol newMapCol map<int, int>
+     ```
+
+     Example 8: Change child column type in column: structField struct\<id:int> from int to long.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE structField structField struct<id:long>
+     ```
+
+     Example 9: Change column name and type in column: oldArray array\<int> from oldArray to newArray and int to long.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldArray newArray array<long>
+     ```
+     Example 10: Change column name and type in column: oldMapCol map\<int, decimal(5,2)> from oldMapCol to newMapCol and decimal(5,2) to decimal(6,2).
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldMapCol newMapCol map<int, decimal(6,2)>
+     ```
+
+     Example 11: Change column name and type at nested level of column: structFiled struct\<a:int,b:map\<int,int>> from b to b2 and int to long.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE structFiled structFiled struct<a:int,b2:map<int,long>>
+     ```
+
+     **NOTE:** Once the column is renamed, user has to take care about replacing the file header with the new name or changing the column header in csv file.

   - #### MERGE INDEX
@@ -885,7 +913,6 @@ Users can specify which columns to include and exclude for local dictionary gene
      **NOTE:**
      * Merge index is supported on streaming table from carbondata 2.0.1 version.
      But streaming segments (ROW_V1) cannot create merge index.
-     * Rename column name is not supported for MAP type.

   - #### SET and UNSET
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 8f6aaad..5892ee7 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex

-import scala.collection.mutable
+import scala.collection.mutable.WrappedArray.make

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
@@ -59,8 +59,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis')")
     checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
-      Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
-        mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
+      Seq(Row("4", make(Array("India")), "g", make(Array("iron", "man", "jarvis")))))
     val result1 = sql("select * from complextable where array_contains(arr2,'iron') and name='g'")
     val result2 = sql("select * from complextable where arr2[0]='iron' and name='f'")
     sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
@@ -87,6 +86,71 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     checkAnswer(result2, df2)
   }

+  test("Test restructured array<int> and existing string column as index columns on SI with compaction") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (id string, name string, country array<string>) stored as carbondata")
+    sql("insert into complextable select 3,'f',array('china')")
+    sql("drop index if exists index_11 on complextable")
+    sql("ALTER TABLE complextable ADD COLUMNS(arr2 array<int>)")
+    sql("insert into complextable select 4,'g',array('India'),array(1)")
+    // change datatype
+    sql("alter table complextable change arr2 arr2 array<long>")
+    sql("insert into complextable select 3,'f',array('china'),array(26557544541,null)")
+    sql("insert into complextable select 4,'g',array('India'),array(26557544541,46557544541,3)")
+    checkAnswer(sql("select * from complextable where array_contains(arr2,3)"),
+      Seq(Row("4", "g", make(Array("India")), make(Array(26557544541L, 46557544541L, 3)))))
+    val result1 = sql("select * from complextable where array_contains(arr2,46557544541) and name='g'")
+    val result2 = sql("select * from complextable where arr2[0]=26557544541 and name='f'")
+    sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
+    sql("alter table complextable compact 'minor'")
+    val df1 = sql(" select * from complextable where array_contains(arr2,46557544541) and name='g'")
+    val df2 = sql(" select * from complextable where arr2[0]=26557544541 and name='f'")
+    if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    val doNotHitSIDf = sql(
+      " select * from complextable where array_contains(arr2,26557544541) and array_contains" +
+      "(arr2,46557544541)")
+    if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result1, df1)
+    checkAnswer(result2, df2)
+  }
+
+  test("Test restructured array<timestamp> as index column on SI with compaction") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (name string, time date) stored as carbondata")
+    sql("insert into complextable select 'b', '2017-02-01'")
+    sql("ALTER TABLE complextable ADD COLUMNS(projectdate array<timestamp>)")
+    sql("insert into complextable select 'b', '2017-02-01',array('2017-02-01 00:01:00','')")
+    sql("drop index if exists index_1 on complextable")
+    sql("insert into complextable select 'b', '2017-02-01',array('2017-02-01 00:01:00','2018-02-01 02:00:00')")
+    sql("insert into complextable select 'b', '2017-02-01',array(null,'2018-02-01 02:00:00')")
+    sql("insert into complextable select 'b', '2017-02-01',null")
+    val result = sql(" select * from complextable where array_contains(projectdate,cast('2017-02-01 00:01:00' as timestamp))")
+    sql("create index index_1 on table complextable(projectdate) as 'carbondata'")
+    sql("alter table complextable compact 'minor'")
+    val df = sql(" select * from complextable where array_contains(projectdate,cast('2017-02-01 00:01:00' as timestamp))")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+
+
   test("Test restructured array<string> and string columns as index columns on SI with compaction") {
     sql("drop table if exists complextable")
     sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
@@ -102,8 +166,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis'),'India'")
     checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
-      Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
-        mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
+      Seq(Row("4", make(Array("India")), "g", make(Array("iron", "man", "jarvis")), "India")))
     val result1 = sql("select * from complextable where array_contains(arr2,'iron') and addr='India'")
     val result2 = sql("select * from complextable where arr2[0]='iron' and addr='china'")
     sql("create index index_11 on table complextable(arr2, addr) as 'carbondata'")
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index d8ff50f..0640af0 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -291,8 +291,13 @@ public class SecondaryIndexQueryResultProcessor {
         } else {
           if (isComplexColumn && complexColumnParentBlockIndexes.length == 0) {
             // After restructure some complex column will not be present in parent block.
-            // In such case, set the SI implicit row value to empty byte array.
-            preparedRow[i] = new byte[0];
+            // In such case, set the SI implicit row value to null or empty byte array.
+            if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
+              // set null value for measures
+              preparedRow[i] = null;
+            } else {
+              preparedRow[i] = new byte[0];
+            }
           } else if (isComplexColumn) {
             // get the flattened data of complex column
             byte[] complexKeyByIndex = wrapper.getComplexKeyByIndex(complexIndex);
@@ -362,8 +367,14 @@ public class SecondaryIndexQueryResultProcessor {
    */
   private Object getData(Object[] data, int index, DataType dataType) {
     if (data == null || data.length == 0) {
+      if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+        return null;
+      }
       return new byte[0];
     } else if (data[0] == null) {
+      if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+        return null;
+      }
       return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
     }
     if (dataType == DataTypes.TIMESTAMP && null != data[index]) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index cee16d7..7a5f53f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -1124,26 +1124,19 @@ object CarbonParserUtil {
     val dataTypeName = DataTypeConverterUtil.convertToCarbonType(complexField).getName
     val dataTypeInfo = CarbonParserUtil.parseDataType(columnName, dataTypeName.toLowerCase, values)
     complexField.dataType match {
-      case Some(CarbonCommonConstants.ARRAY) =>
-        val childField = complexField.children.get(0)
-        val childType = childField.dataType
-        val childName = columnName + CarbonCommonConstants.POINT + childField.name
-        val childValues = childType match {
-          case d: DecimalType => Some(List((d.precision, d.scale)))
-          case _ => None
-        }
-        val childDatatypeInfo = parseDataType(childName, childField, childValues)
-        dataTypeInfo.setChildren(List(childDatatypeInfo))
-      case Some(CarbonCommonConstants.STRUCT) =>
+      case Some(CarbonCommonConstants.ARRAY) | Some(CarbonCommonConstants.STRUCT) |
+           Some(CarbonCommonConstants.MAP) =>
         var childTypeInfoList: List[DataTypeInfo] = null
         for (childField <- complexField.children.get) {
           val childType = childField.dataType
-          val childName = columnName + CarbonCommonConstants.POINT + childField.name.get
-          val childValues = childType match {
-            case d: DecimalType => Some(List((d.precision, d.scale)))
-            case _ => None
+          val childName = columnName + CarbonCommonConstants.POINT + childField.column
+          val decimalValues = if (childType.get.contains(CarbonCommonConstants.DECIMAL)) {
+            // If datatype is decimal, extract its precision and scale.
+            Some(List(CommonUtil.getScaleAndPrecision(childType.get)))
+          } else {
+            None
           }
-          val childDatatypeInfo = parseDataType(childName, childField, childValues)
+          val childDatatypeInfo = parseDataType(childName, childField, decimalValues)
           if (childTypeInfoList == null) {
             childTypeInfoList = List(childDatatypeInfo)
           } else {
@@ -1153,7 +1146,6 @@ object CarbonParserUtil {
         dataTypeInfo.setChildren(childTypeInfoList)
       case _ =>
     }
-    // TODO have to handle for map types [CARBONDATA-4199]
     dataTypeInfo
   }

@@ -1210,22 +1202,16 @@ object CarbonParserUtil {
       case arrayType: ArrayType =>
         val childType: DataType = arrayType.elementType
         val childName = columnName + ".val"
-        val childValues = childType match {
-          case d: DecimalType => Some(List((d.precision, d.scale)))
-          case _ => None
-        }
-        val childDatatypeInfo = parseColumn(childName, childType, childValues)
+        val decimalValues = getDecimalValues(childType)
+        val childDatatypeInfo = parseColumn(childName, childType, decimalValues)
         dataTypeInfo.setChildren(List(childDatatypeInfo))
       case structType: StructType =>
         var childTypeInfoList: List[DataTypeInfo] = null
         for (childField <- structType) {
           val childType = childField.dataType
           val childName = columnName + CarbonCommonConstants.POINT + childField.name
-          val childValues = childType match {
-            case d: DecimalType => Some(List((d.precision, d.scale)))
-            case _ => None
-          }
-          val childDatatypeInfo = CarbonParserUtil.parseColumn(childName, childType, childValues)
+          val decimalValues = getDecimalValues(childType)
+          val childDatatypeInfo = CarbonParserUtil.parseColumn(childName, childType, decimalValues)
           if (childTypeInfoList == null) {
             childTypeInfoList = List(childDatatypeInfo)
           } else {
@@ -1233,12 +1219,33 @@ object CarbonParserUtil {
           }
         }
         dataTypeInfo.setChildren(childTypeInfoList)
+      case mapType: MapType =>
+        val keyType: DataType = mapType.keyType
+        val valType: DataType = mapType.valueType
+        var childTypeInfoList: List[DataTypeInfo] = List()
+        val childName1 = columnName + ".key"
+        val childName2 = columnName + ".value"
+        val keyTypeDecimalValues = getDecimalValues(keyType)
+        val valTypeDecimalValues = getDecimalValues(valType)
+        val childDatatypeInfo1 = CarbonParserUtil.parseColumn(childName1,
+          keyType, keyTypeDecimalValues)
+        val childDatatypeInfo2 = CarbonParserUtil.parseColumn(childName2,
+          valType, valTypeDecimalValues)
+        childTypeInfoList = childTypeInfoList :+ childDatatypeInfo1
+        childTypeInfoList = childTypeInfoList :+ childDatatypeInfo2
+        dataTypeInfo.setChildren(childTypeInfoList)
       case _ =>
     }
-    // TODO have to handle for map types [CARBONDATA-4199]
     dataTypeInfo
   }

+  def getDecimalValues(inputType: DataType): Option[List[(Int, Int)]] = {
+    inputType match {
+      case d: DecimalType => Some(List((d.precision, d.scale)))
+      case _ => None
+    }
+  }
+
   def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
     defaultValueColumnName.equalsIgnoreCase("default.value." + fieldName)
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 6c6030d..a00e58e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -244,11 +244,6 @@ class AlterTableColumnSchemaGenerator(
   }

   private def createChildSchema(childField: Field, currentSchemaOrdinal: Int): ColumnSchema = {
-    // TODO: should support adding multi-level complex columns: CARBONDATA-4164
-    if (!childField.children.contains(null)) {
-      throw new UnsupportedOperationException(
-        "Alter add columns with nested complex types is not allowed")
-    }
     TableNewProcessor.createColumnSchema(
       childField,
@@ -263,18 +258,46 @@ class AlterTableColumnSchemaGenerator(
       isVarcharColumn(childField.name.getOrElse(childField.column)))
   }

+  def addComplexChildCols(currField: Field,
+      currColumnSchema: ColumnSchema,
+      newCols: mutable.Buffer[ColumnSchema],
+      allColumns: mutable.Buffer[ColumnSchema],
+      longStringCols: mutable.Buffer[ColumnSchema],
+      currentSchemaOrdinal: Int): Unit = {
+    if (currField.children.get == null || currField.children.isEmpty) {
+      return
+    }
+    if (currColumnSchema.getDataType.isComplexType) {
+      val noOfChildren = currField.children.get.size
+      currColumnSchema.setNumberOfChild(noOfChildren)
+      currField.children.get.foreach(childField => {
+        val childSchema: ColumnSchema = createChildSchema(childField, currentSchemaOrdinal)
+        if (childSchema.getDataType == DataTypes.VARCHAR) {
+          // put the new long string columns in 'longStringCols'
+          // and add them after old long string columns
+          longStringCols ++= Seq(childSchema)
+        } else {
+          allColumns ++= Seq(childSchema)
+        }
+        newCols ++= Seq(childSchema)
+        addComplexChildCols(childField, childSchema, newCols, allColumns, longStringCols,
+          currentSchemaOrdinal)
+      })
+    }
+  }
+
   def process: Seq[ColumnSchema] = {
     val tableSchema = tableInfo.getFactTable
     val tableCols = tableSchema.getListOfColumns.asScala
     // previous maximum column schema ordinal + 1 is the current column schema ordinal
     val currentSchemaOrdinal = tableCols.map(col => col.getSchemaOrdinal).max + 1
-    var longStringCols = Seq[ColumnSchema]()
+    val longStringCols = mutable.Buffer[ColumnSchema]()
     // get all original dimension columns
     // but exclude complex type columns and long string columns
     var allColumns = tableCols.filter(x =>
       (x.isDimensionColumn && !x.getDataType.isComplexType() && !x.isComplexColumn()
        && x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR)))
-    var newCols = Seq[ColumnSchema]()
+    val newCols = mutable.Buffer[ColumnSchema]()
     val invertedIndexCols: Array[String] = alterTableModel
       .tableProperties
       .get(CarbonCommonConstants.INVERTED_INDEX)
@@ -283,10 +306,6 @@ class AlterTableColumnSchemaGenerator(

     // add new dimension columns
     alterTableModel.dimCols.foreach(field => {
-      if (field.dataType.get.toLowerCase().equals(CarbonCommonConstants.MAP)) {
-        throw new MalformedCarbonCommandException(
-          s"Add column is unsupported for map datatype column: ${ field.column }")
-      }
       val encoders = new java.util.ArrayList[Encoding]()
       val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
         field,
@@ -307,34 +326,8 @@ class AlterTableColumnSchemaGenerator(
         allColumns ++= Seq(columnSchema)
       }
       newCols ++= Seq(columnSchema)
-      if (DataTypes.isArrayType(columnSchema.getDataType)) {
-        columnSchema.setNumberOfChild(field.children.size)
-        val childField = field.children.get(0)
-        val childSchema: ColumnSchema = createChildSchema(childField, currentSchemaOrdinal)
-        if (childSchema.getDataType == DataTypes.VARCHAR) {
-          // put the new long string columns in 'longStringCols'
-          // and add them after old long string columns
-          longStringCols ++= Seq(childSchema)
-        } else {
-          allColumns ++= Seq(childSchema)
-        }
-        newCols ++= Seq(childSchema)
-      } else if (DataTypes.isStructType(columnSchema.getDataType)) {
-        val noOfChildren = field.children.get.size
-        columnSchema.setNumberOfChild(noOfChildren)
-        for (i <- 0 to noOfChildren - 1) {
-          val childField = field.children.get(i)
-          val childSchema: ColumnSchema = createChildSchema(childField, currentSchemaOrdinal)
-          if (childSchema.getDataType == DataTypes.VARCHAR) {
-            // put the new long string columns in 'longStringCols'
-            // and add them after old long string columns
-            longStringCols ++= Seq(childSchema)
-          } else {
-            allColumns ++= Seq(childSchema)
-          }
-          newCols ++= Seq(childSchema)
-        }
-      }
+      addComplexChildCols(field, columnSchema, newCols, allColumns, longStringCols,
+        currentSchemaOrdinal)
     })
     // put the old long string columns
     allColumns ++= tableCols.filter(x =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 44b03ed..01491fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -38,8 +38,8 @@ import org.apache.carbondata.core.metadata.datatype.{DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.DataTypeConverterUtil
+import org.apache.carbondata.format.{ColumnSchema, DataType, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}

 abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newColumnName: String)
   extends MetadataCommand {
@@ -81,6 +81,9 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
   // stores mapping of altered column names: old-column-name -> new-column-name.
   // Including both parent/table and children columns
   val alteredColumnNamesMap = collection.mutable.LinkedHashMap.empty[String, String]
+  // stores mapping of altered column data types: old-column-name -> new-column-datatype.
+  // Including both parent/table and children columns
+  val alteredDatatypesMap = collection.mutable.LinkedHashMap.empty[String, String]

   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -150,11 +153,6 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       // set isDataTypeChange flag
       val oldDatatype = oldCarbonColumn.head.getDataType
       val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
-      if (isColumnRename && (DataTypes.isMapType(oldDatatype) ||
-          newDatatype.equalsIgnoreCase(CarbonCommonConstants.MAP))) {
-        throw new UnsupportedOperationException(
-          "Alter rename is unsupported for Map datatype column")
-      }
       if (oldDatatype.getName.equalsIgnoreCase(newDatatype)) {
         val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
@@ -169,13 +167,13 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           newColumnScale)) {
           isDataTypeChange = true
         }
-        if (DataTypes.isArrayType(oldDatatype) || DataTypes.isStructType(oldDatatype)) {
+        if (oldDatatype.isComplexType) {
           val oldParent = oldCarbonColumn.head
           val oldChildren = oldParent.asInstanceOf[CarbonDimension].getListOfChildDimensions.asScala
             .toList
           AlterTableUtil.validateComplexStructure(oldChildren,
             alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.getChildren(),
-            alteredColumnNamesMap)
+            alteredColumnNamesMap, alteredDatatypesMap)
         }
       } else {
         if (oldDatatype.isComplexType) {
@@ -188,7 +186,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       // If there is no columnrename and datatype change and comment change
       // return directly without execution
       if (!isColumnRename && !isDataTypeChange && !newColumnComment.isDefined &&
-          alteredColumnNamesMap.isEmpty) {
+          alteredColumnNamesMap.isEmpty && alteredDatatypesMap.isEmpty) {
         return Seq.empty
       }
       // if column datatype change operation is on partition column, then fail the
@@ -273,26 +271,41 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           }
           addedTableColumnSchema = columnSchema
         } else if (isComplexChild(columnSchema)) {
-          if (alteredColumnNamesMap.contains(columnName)) {
-            // matches exactly
-            val newComplexChildName = alteredColumnNamesMap(columnName)
-            columnSchema.setColumn_name(newComplexChildName)
-            isSchemaEntryRequired = true
-          } else {
-            val alteredParent = checkIfParentIsAltered(columnName)
-            /*
-             * Lets say, if complex schema is: str struct<a: int>
-             * and if parent column is changed from str -> str2
-             * then its child name should also be changed from str.a -> str2.a
-             */
-            if (alteredParent != null) {
-              val newParent = alteredColumnNamesMap(alteredParent)
-              val newComplexChildName = newParent + columnName
-                .split(alteredParent)(1)
+          // check if name is altered
+          if (!alteredColumnNamesMap.isEmpty) {
+            if (alteredColumnNamesMap.contains(columnName)) {
+              // matches exactly
+              val newComplexChildName = alteredColumnNamesMap(columnName)
               columnSchema.setColumn_name(newComplexChildName)
               isSchemaEntryRequired = true
+            } else {
+              val alteredParent = checkIfParentIsAltered(columnName)
+              /*
+               * Lets say, if complex schema is: str struct<a: int>
+               * and if parent column is changed from str -> str2
+               * then its child name should also be changed from str.a -> str2.a
+               */
+              if (alteredParent != null) {
+                val newParent = alteredColumnNamesMap(alteredParent)
+                val newComplexChildName = newParent + columnName
+                  .split(alteredParent)(1)
+                columnSchema.setColumn_name(newComplexChildName)
+                isSchemaEntryRequired = true
+              }
            }
          }
+          // check if datatype is altered
+          if (!alteredDatatypesMap.isEmpty && alteredDatatypesMap.get(columnName) != None) {
+            val newDatatype = alteredDatatypesMap.get(columnName).get
+            if (newDatatype.equals(CarbonCommonConstants.LONG)) {
+              columnSchema.setData_type(DataType.LONG)
+            } else if (newDatatype.contains(CarbonCommonConstants.DECIMAL)) {
+              val (newPrecision, newScale) = CommonUtil.getScaleAndPrecision(newDatatype)
+              columnSchema.setPrecision(newPrecision)
+              columnSchema.setScale(newScale)
+            }
+            isSchemaEntryRequired = true
+          }
         }
         // make a new schema evolution entry after column rename or datatype change
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 3250f7a..f7c56ba 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
@@ -1088,7 +1088,8 @@ object AlterTableUtil {
    */
   def validateComplexStructure(oldDimensionList: List[CarbonDimension],
       newDimensionList: List[DataTypeInfo],
-      alteredColumnNamesMap: mutable.LinkedHashMap[String, String]): Unit = {
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String],
+      alteredDatatypesMap: mutable.LinkedHashMap[String, String]): Unit = {
     if (oldDimensionList == null && newDimensionList == null) {
       throw new UnsupportedOperationException("Both old and new dimensions are null")
     } else if (oldDimensionList == null || newDimensionList == null) {
@@ -1104,30 +1105,43 @@ object AlterTableUtil {
         val old_column_datatype = oldDimensionInfo.getDataType.getName
         val new_column_name = newDimensionInfo
           .columnName.split(CarbonCommonConstants.POINT.toCharArray).last
-        val new_column_datatype = newDimensionInfo.dataType
+        var new_column_datatype = newDimensionInfo.dataType
+
+        // check if column datatypes are altered. If altered, validate them
         if (!old_column_datatype.equalsIgnoreCase(new_column_datatype)) {
-          // datatypes of complex children cannot be altered. So throwing exception for now.
-          throw new UnsupportedOperationException(
-            "Altering datatypes of any child column is not supported")
+          this.validateColumnDataType(newDimensionInfo, oldDimensionInfo)
+          alteredDatatypesMap += (oldDimensionInfo.getColName -> new_column_datatype)
+        } else if (old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.DECIMAL) &&
+            old_column_datatype.equalsIgnoreCase(new_column_datatype)) {
+          val oldPrecision = oldDimensionInfo.getDataType().asInstanceOf[DecimalType].getPrecision
+          val oldScale = oldDimensionInfo.getDataType().asInstanceOf[DecimalType].getScale
+          if (oldPrecision != newDimensionInfo.precision || oldScale != newDimensionInfo.scale) {
+            this.validateColumnDataType(newDimensionInfo, oldDimensionInfo)
+            new_column_datatype = "decimal(" + newDimensionInfo.precision + "," +
+              newDimensionInfo.scale + ")"
+            alteredDatatypesMap += (oldDimensionInfo.getColName -> new_column_datatype)
+          }
         }
+
+        // check if column names are altered
         if (!old_column_name.equalsIgnoreCase(new_column_name)) {
           alteredColumnNamesMap += (oldDimensionInfo.getColName -> newDimensionInfo.columnName)
         }
-        if (old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.MAP) ||
-            new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.MAP)) {
-          throw new UnsupportedOperationException(
-            "Cannot alter complex structure that includes map type column")
-        } else if (new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
-            old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
-            new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT) ||
-            old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT)) {
+        if (isComplexType(new_column_datatype) || isComplexType(old_column_datatype)) {
           validateComplexStructure(oldDimensionInfo.getListOfChildDimensions.asScala.toList,
-            newDimensionInfo.getChildren(), alteredColumnNamesMap)
+            newDimensionInfo.getChildren(), alteredColumnNamesMap, alteredDatatypesMap)
         }
       }
     }
   }

+  // To identify if the datatype name is of complex type.
+  def isComplexType(dataTypeName: String): Boolean = {
+    dataTypeName.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
+    dataTypeName.equalsIgnoreCase(CarbonCommonConstants.STRUCT) ||
+    dataTypeName.equalsIgnoreCase(CarbonCommonConstants.MAP)
+  }
+
   /**
    * This method will validate a column for its data type and check whether the column data type
    * can be modified and update if conditions are met.
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index 8f3497b..4b412cb 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}

 import scala.collection.JavaConverters
 import scala.collection.mutable
+import scala.collection.mutable.WrappedArray.make

 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
@@ -214,6 +215,28 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_com")
   }

+  test("Test adding of map of all primitive datatypes") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("CREATE TABLE alter_com(intfield int) STORED AS carbondata")
+    sql("ALTER TABLE alter_com ADD COLUMNS(map1 Map<short,int>, map2 Map<long,double>, " +
+        "map3 Map<decimal(3,2),string>, map4 Map<char(5),varchar(50)>, map5 Map<boolean,date>, " +
+        "map6 Map<string,timestamp>)")
+    sql("insert into alter_com values(1, map(1,2),map(3,2.34), map(1.23,'hello')," +
+        "map('abc','def'), map(true,'2017-02-01')," + "map('time','2018-02-01 02:00:00.0')) ")
+    sql("select * from alter_com").show(false)
+    checkAnswer(sql("select * from alter_com"),
+      Seq(Row(1,
+        Map(1 -> 2),
+        Map(3 -> 2.34),
+        Map(java.math.BigDecimal.valueOf(1.23).setScale(2) -> "hello"),
+        Map("abc" -> "def"),
+        Map(true -> Date.valueOf("2017-02-01")),
+        Map("time" -> Timestamp.valueOf("2018-02-01 02:00:00.0")))))
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+    assert(addedColumns.size == 6)
+  }
+
+
   test("Test alter add complex type and compaction") {
     sql("DROP TABLE IF EXISTS alter_com")
     sql("create table alter_com (a int, b string, arr1 array<string>) stored as carbondata")
@@ -271,7 +294,7 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql("insert into alter_struct values(1, named_struct('id1', 'id1','name1','name1'))")
     sql("ALTER TABLE alter_struct ADD COLUMNS(struct1 struct<a:string,b:string>, temp string," +
        " intField int, struct2 struct<c:string,d:string,e:int>, arr array<int>) TBLPROPERTIES " +
-       "('LOCAL_DICTIONARY_INCLUDE'='struct1, struct2')")
+       s"('$dictionary'='struct1, struct2')")
     val schema = sql("describe alter_struct").collect()
     assert(schema.size == 7)
   }
@@ -288,7 +311,6 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql(
       "insert into alter_com values(2,array(9,0),array(1,2,3),array('hello','world'),array(6,7)," +
       "array(8,9), named_struct('a',1,'b','abcde') )")
-    sql("select * from alter_com").show(false)
     checkAnswer(sql("select * from alter_com where array_contains(arr4,6)"),
       Seq(Row(2,
         make(Array(9, 0)),
@@ -354,22 +376,46 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_struct")
   }

-  test("Validate alter add multi-level complex column") {
+  test("test alter add multi-level complex columns") {
     sql("DROP TABLE IF EXISTS alter_com")
-    sql("CREATE TABLE alter_com(intField INT, arr array<int>) " +
-        "STORED AS carbondata ")
-    var exception = intercept[Exception] {
-      sql("ALTER TABLE alter_com ADD COLUMNS(arr1 array<array<int>>) ")
-    }
-    val exceptionMessage =
-      "operation failed for default.alter_com: Alter table add operation failed: Alter add " +
-      "columns with nested complex types is not allowed"
-    assert(exception.getMessage.contains(exceptionMessage))
-
-    exception = intercept[Exception] {
-      sql("ALTER TABLE alter_com ADD COLUMNS(struct1 struct<arr: array<int>>) ")
-    }
-    assert(exception.getMessage.contains(exceptionMessage))
+    sql("CREATE TABLE alter_com(intField INT) STORED AS carbondata ")
+    sql("insert into alter_com values(1)")
+    // multi-level nested array
+    sql(
+      "ALTER TABLE alter_com ADD COLUMNS(arr1 array<array<int>>, arr2 array<struct<a1:string, " +
+      "map1:Map<string, string>>>) ")
+    sql(
+      "insert into alter_com values(1, array(array(1,2)), array(named_struct('a1','st','map1', " +
+      "map('a','b'))))")
+    // multi-level nested struct
+    sql("ALTER TABLE alter_com ADD COLUMNS(struct1 struct<s1:string, arr: array<int>>," +
+        " struct2 struct<num:double,contact:map<string,array<int>>>) ")
+    sql("insert into alter_com values(1, " +
+        "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
+        "named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
+        "array(1,2))))")
+    // multi-level nested map
+    sql(
+      "ALTER TABLE alter_com ADD COLUMNS(map1 map<string,array<string>>, map2 map<string," +
+      "struct<d:int, s:struct<im:string>>>)")
+    sql("insert into alter_com values(1, " +
+        "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
+        "named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
+        "array(1,2))),map('a',array('hi')), map('a',named_struct('d',23,'s',named_struct('im'," +
+        "'sh'))))")
+    sql("alter table alter_com compact 'minor'")
+    checkAnswer(sql("select * from alter_com"),
+      Seq(Row(1, null, null, null, null, null, null),
+        Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+          null, null, null, null),
+        Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+          Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))), null, null),
+        Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+          Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))),
+          Map("a" -> make(Array("hi"))), Map("a" -> Row(23, Row("sh"))))
+      ))
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+    assert(addedColumns.size == 6)
     sql("DROP TABLE IF EXISTS alter_com")
   }

@@ -389,21 +435,6 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     assert(exception.getMessage.contains(exceptionMessage))
   }

-  test("Validate adding of map types through alter command") {
-    sql("DROP TABLE IF EXISTS alter_com")
-    sql(
-      "CREATE TABLE alter_com(doubleField double, arr1 array<long>, m map<int, string> ) STORED " +
-      "AS carbondata")
-    sql("insert into alter_com values(1.1,array(77),map(1,'abc'))")
-    val exception = intercept[Exception] {
-      sql("ALTER TABLE alter_com ADD COLUMNS(mapField map<int, string>)")
-    }
-    val exceptionMessage =
-      "operation failed for default.alter_com: Alter table add operation failed: Add column is " +
-      "unsupported for map datatype column: mapfield"
-    assert(exception.getMessage.contains(exceptionMessage))
-  }
-
   test("alter table add complex columns with comment") {
     sql("""create table test_add_column_with_comment(
           | col1 string comment 'col1 comment',
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index c54eb9a..cc7152b 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -83,13 +83,6 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
       "CREATE TABLE test_rename (str struct<a:int,b:long>, str2 struct<a:int,b:long>, map1 " +
       "map<string, string>, str3 struct<a:int, b:map<string, string>>) STORED AS carbondata")

-    val ex1 = intercept[ProcessMetaDataException] {
-      sql("alter table test_rename change str str struct<a:array<int>,b:long>")
-    }
-    assert(ex1.getMessage
-      .contains(
-        "column rename operation failed: Altering datatypes of any child column is not supported"))
-
     val ex2 = intercept[ProcessMetaDataException] {
       sql("alter table test_rename change str str struct<a:int,b:long,c:int>")
     }
@@ -125,14 +118,10 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
       sql("alter table test_rename change map1 map2 map<string, struct<a:int>>")
     }
     assert(ex6.getMessage
-      .contains("rename operation failed: Alter rename is unsupported for Map datatype column"))
-
-    val ex7 = intercept[ProcessMetaDataException] {
-      sql("alter table test_rename change str3 str33 struct<a:int, bc:map<string, string>>")
-    }
-    assert(ex7.getMessage
       .contains(
-        "rename operation failed: Cannot alter complex structure that includes map type column"))
+        "operation failed for default.test_rename: Alter table data type change or column rename " +
+        "operation failed: Given column map1.val.value with data type STRING cannot be modified. " +
+        "Only Int and Decimal data types are allowed for modification"))

     // ensure all failed rename operations have been reverted to original state
     val describe = sql("desc table test_rename")
@@ -307,17 +296,128 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
         make(Array("hello11", "world11")), make(Array(4555)))))
   }

-  test("validate alter change datatype for complex children columns") {
+  test("test alter rename and change datatype for map of (primitive/array/struct)") {
     sql("drop table if exists test_rename")
     sql(
-      "CREATE TABLE test_rename (str struct<a:int,b:long>) STORED AS carbondata")
-
-    val ex1 = intercept[ProcessMetaDataException] {
-      sql("alter table test_rename change str str struct<a:long,b:long>")
-    }
-    assert(ex1.getMessage
-      .contains(
-        "column rename operation failed: Altering datatypes of any child column is not supported"))
+      "CREATE TABLE test_rename (map1 map<int,int>, map2 map<string,array<int>>, " +
+      "map3 map<int, map<string,int>>, map4 map<string,struct<b:int>>) STORED AS carbondata")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+        "map(2,map('hello',1)), map('hi',named_struct('b',3)))")
+    // rename parent column from map1 to map11 and read old rows
+    sql("alter table test_rename change map1 map11 map<int,int>")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+        "map(2,map('hello',1)), map('hi',named_struct('b',3)))")
+    checkAnswer(sql("select map11 from test_rename"), Seq(Row(Map(1 -> 2)),
+      Row(Map(1 -> 2))))
+    // rename parent column from map2 to map22 and read old rows
+    sql("alter table test_rename change map2 map22 map<string,array<int>>")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+        "map(2,map('hello',1)), map('hi',named_struct('b',3)))")
+    checkAnswer(sql("select map22 from test_rename"), Seq(Row(Map("a" -> make(Array(1, 2)))),
+      Row(Map("a" -> make(Array(1, 2)))), Row(Map("a" -> make(Array(1, 2))))))
+    // rename child column and change datatype
+    sql("alter table test_rename change map4 map4 map<string,struct<b2:long>>")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+        "map(2,map('hello',1)), map('hi',named_struct('b',26557544541)))")
+    checkAnswer(sql("describe test_rename"),
+      Seq(Row("map11", "map<int,int>", null),
+        Row("map22", "map<string,array<int>>", null),
+        Row("map3", "map<int,map<string,int>>", null),
+        Row("map4", "map<string,struct<b2:bigint>>", null)))
+    checkAnswer(sql("select map4['hi']['b2'] from test_rename"),
+      Seq(Row(3), Row(3), Row(3), Row(26557544541L)))
+  }
+
+  test("test alter rename and change datatype for struct integer") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (str struct<a:int>) STORED AS carbondata")
+    sql("insert into test_rename values(named_struct('a', 1234))")
+    sql("insert into test_rename values(named_struct('a', 3456))")
+    // only rename operation
+    sql("alter table test_rename change str str1 struct<a1:int>")
+    // both rename and change datatype operation
+    sql("alter table test_rename change str1 str1 struct<a2:long>")
+    sql("insert into test_rename values(named_struct('a2', 26557544541))")
+    // rename child column
+    sql("alter table test_rename change str1 str2 struct<a3:long>")
+    sql("insert into test_rename values(named_struct('a3', 26557544541))")
+    checkAnswer(sql("describe test_rename"), Seq(Row("str2", "struct<a3:bigint>", null)))
+    checkAnswer(sql("select str2 from test_rename"),
+      Seq(Row(Row(1234L)), Row(Row(3456L)), Row(Row(26557544541L)), Row(Row(26557544541L))))
+  }
+
+  test("test alter rename and change datatype for map integer") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (name string,mapField1 MAP<int, int>) STORED AS carbondata")
+    sql("insert into test_rename values('a',map(1,2))")
+    sql("insert into test_rename values('v',map(3,4))")
+    sql(s"create index si_1 on test_rename(name) as 'carbondata'")
+    // only rename operation
+    sql("alter table test_rename change mapField1 mapField2 MAP<int, int>")
+    sql("insert into test_rename values('df',map(5, 6))")
+    // both rename and change datatype operation
+    sql("alter table test_rename change mapField2 mapField3 MAP<int, long>")
+    sql("insert into test_rename values('sdf',map(7, 26557544541))")
+    sql("describe test_rename").show(false)
+    checkAnswer(sql("describe test_rename"),
+      Seq(Row("name", "string", null), Row("mapfield3", "map<int,bigint>", null)))
+    checkAnswer(sql("select mapField3 from test_rename"),
+      Seq(Row(Map(1 -> 2L)), Row(Map(3 -> 4L)), Row(Map(5 -> 6L)), Row(Map(7 -> 26557544541L))))
+  }
+
+  test("test alter rename and change datatype for array integer") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (arr array<int>) STORED AS carbondata")
+    sql("insert into test_rename values(array(1,2,3))")
+    sql("insert into test_rename values(array(4,5,6))")
+    // only rename operation
+    sql("alter table test_rename change arr arr1 array<int>")
+    sql("insert into test_rename values(array(7,8,9))")
+    // both rename and change datatype operation
+    sql("alter table test_rename change arr1 arr2 array<long>")
+    sql("insert into test_rename values(array(26557544541,3,46557544541))")
+    checkAnswer(sql("describe test_rename"), Seq(Row("arr2", "array<bigint>", null)))
+    checkAnswer(sql("select arr2 from test_rename"),
+      Seq(Row(make(Array(1, 2, 3))), Row(make(Array(4, 5, 6))), Row(make(Array(7, 8, 9))),
+        Row(make(Array(26557544541L, 3, 46557544541L)))))
+  }
+
+  test("test alter rename and change datatype for complex decimal types") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (strField struct<a:decimal(5,2)>," +
+        "mapField1 map<int,decimal(5,2)>, mapField2 map<int,struct<a:decimal(5,2)>>, " +
+        "arrField array<decimal(5,2)>) STORED AS carbondata")
+    sql("insert into test_rename values(named_struct('a', 123.45),map(1, 123.45)," +
+        "map(1, named_struct('a', 123.45)),array(123.45))")
+    sql("insert into test_rename values(named_struct('a', 123.45),map(2, 123.45)," +
+        "map(2, named_struct('a', 123.45)),array(123.45))")
+    // rename and change datatype
+    sql("alter table test_rename change strField strField1 struct<a1:decimal(6,2)>")
+    sql("alter table test_rename change mapField1 mapField11 map<int,decimal(6,2)>")
+    // rename and change nested decimal datatype
+    sql("alter table test_rename change mapField2 mapField22 map<int,struct<a2:decimal(6,2)>>")
+    sql("alter table test_rename change arrField arrField1 array<decimal(6,2)>")
+    sql("insert into test_rename values(named_struct('a', 1234.45),map(1, 1234.45)," +
+        "map(1, named_struct('a2', 1234.45)),array(1234.45))")
+    sql("insert into test_rename values(named_struct('a', 1234.45),map(2, 1234.45)," +
+        "map(2, named_struct('a2', 1234.45)),array(1234.45))")
+    sql("alter table test_rename compact 'minor'")
+    checkAnswer(sql("describe test_rename"),
+      Seq(Row("strfield1", "struct<a1:decimal(6,2)>", null),
+        Row("mapfield11", "map<int,decimal(6,2)>", null),
+        Row("mapfield22", "map<int,struct<a2:decimal(6,2)>>", null),
+        Row("arrfield1", "array<decimal(6,2)>", null)))
+    val result1 = java.math.BigDecimal.valueOf(123.45).setScale(2)
+    val result2 = java.math.BigDecimal.valueOf(1234.45).setScale(2)
+    checkAnswer(sql("select strField1,mapField11,mapField22,arrField1 from test_rename"),
+      Seq(Row(Row(result1), Map(1 -> result1), Map(1 -> Row(result1)),
+          make(Array(result1))),
+        Row(Row(result1), Map(2 -> result1), Map(2 -> Row(result1)),
+          make(Array(result1))),
+        Row(Row(result2), Map(1 -> result2), Map(1 -> Row(result2)),
+          make(Array(result2))),
+        Row(Row(result2), Map(2 -> result2), Map(2 -> Row(result2)),
+          make(Array(result2)))))
   }

   test("test change comment in case of complex types") {
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
index 6e60c45..e0e5c58 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
@@ -19,11 +19,14 @@ package org.apache.spark.carbondata.restructure.vectorreader

 import java.math.BigDecimal

+import scala.collection.mutable.WrappedArray.make
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll

 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.exception.ProcessMetaDataException

 class ChangeDataTypeTestCases extends QueryTest with BeforeAndAfterAll {

@@ -173,6 +176,55 @@ class ChangeDataTypeTestCases extends QueryTest with BeforeAndAfterAll {
     test_change_data_type()
   }

+  test("test alter change datatype for complex types") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (name string) STORED AS carbondata")
+    // add complex columns
+    sql("alter table test_rename add columns(mapField1 MAP<int, int>, " +
+        "strField1 struct<a:int,b:decimal(5,2)>, arrField1 array<int>)")
+    sql("insert into test_rename values('df',map(5, 6),named_struct('a',1,'b', 123.45),array(1))")
+    // change datatype operation
+    sql("alter table test_rename change mapField1 mapField1 MAP<int, long>")
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change strField1 strField1 struct<a:long,b:decimal(3,2)>")
+    }.getMessage
+      .contains(
+        "operation failed for default.test_rename: Alter table data type change or column rename " +
+        "operation failed: Given column strfield1.b cannot be modified. Specified precision value" +
+        " 3 should be greater than current precision value 5"))
+    sql("alter table test_rename change strField1 strField1 struct<a:long,b:decimal(6,2)>")
+    sql("alter table test_rename change arrField1 arrField1 array<long>")
+    sql("insert into test_rename values('sdf',map(7, 26557544541)," +
+        "named_struct('a',26557544541,'b', 1234.45),array(26557544541))")
+    // add nested complex columns
+    sql("alter table test_rename add columns(mapField2 MAP<int, array<int>>, " +
+        "strField2 struct<a:int,b:MAP<int, int>>, arrField2 array<struct<a:int>>)")
+    sql("insert into test_rename values('df',map(7, 26557544541),named_struct" +
+        "('a',26557544541,'b', 1234.45),array(26557544541),map(5, array(1))," +
+        "named_struct('a',1,'b',map(5,6)),array(named_struct('a',1)))")
+    // change datatype operation at nested level
+    sql("alter table test_rename change mapField2 mapField2 MAP<int, array<long>>")
+    sql("alter table test_rename change strField2 strField2 struct<a:int,b:map<int,long>>")
+    sql("alter table test_rename change arrField2 arrField2 array<struct<a:long>>")
+    sql("insert into test_rename values('sdf',map(7, 26557544541),named_struct" +
+        "('a',26557544541,'b', 1234.45),array(26557544541),map(7, array(26557544541))," +
+        "named_struct('a',2,'b', map(7, 26557544541)),array(named_struct('a',26557544541)))")
+    sql("alter table test_rename compact 'minor'")
+    val result1 = java.math.BigDecimal.valueOf(123.45).setScale(2)
+    val result2 = java.math.BigDecimal.valueOf(1234.45).setScale(2)
+    checkAnswer(sql("select * from test_rename"),
+      Seq(
+        Row("df", Map(5 -> 6), Row(1, result1), make(Array(1)), null, null, null),
+        Row("sdf", Map(7 -> 26557544541L), Row(26557544541L, result2), make(Array(26557544541L)),
+          null, null, null),
+        Row("df", Map(7 -> 26557544541L), Row(26557544541L, result2), make(Array(26557544541L)),
+          Map(5 -> make(Array(1))), Row(1, Map(5 -> 6)), make(Array(Row(1)))),
+        Row("sdf", Map(7 -> 26557544541L), Row(26557544541L, result2), make(Array(26557544541L)),
+          Map(7 -> make(Array(26557544541L))), Row(2, Map(7 -> 26557544541L)),
+          make(Array(Row(26557544541L))))
+      ))
+  }
+
   override def afterAll {
     sqlContext.setConf("carbon.enable.vector.reader",
       CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)