This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new ad576d4 [CARBONDATA-3596] Fix exception when execute load data command or select sql on a table which includes complex columns after execute 'add column' command ad576d4 is described below commit ad576d43d7cc25dedceed8c78126e40b2e1b52e6 Author: Zhang Zhichao <441586...@qq.com> AuthorDate: Tue Nov 26 23:53:12 2019 +0800 [CARBONDATA-3596] Fix exception when execute load data command or select sql on a table which includes complex columns after execute 'add column' command Problem: After executing the 'add column' command, an exception is thrown when running a load data command or a select SQL query on a table that includes complex columns. Solution: After executing the 'add column' command, place the complex type columns at the end of the dimension columns; newly added long string (VARCHAR) columns are likewise placed after the existing long string columns. This closes #3485 --- .gitignore | 6 + .../impl/DictionaryBasedResultCollector.java | 25 ++++- .../RestructureBasedDictionaryResultCollector.java | 5 +- .../collector/impl/RowIdBasedResultCollector.java | 2 +- .../datamap/examples/MinMaxIndexDataMap.java | 2 +- .../alterTable/TestAlterTableAddColumns.scala | 122 +++++++++++++++++++++ .../command/carbonTableSchemaCommon.scala | 26 ++++- log | 0 8 files changed, 176 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 854ebff..d0d1505 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,9 @@ store/CSDK/cmake-build-debug/* metastore_db/ derby.log python/.idea/ +*/.cache-main +*/.cache-tests +*/*/.cache-main +*/*/.cache-tests +*/*/*/.cache-main +*/*/*/.cache-tests diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java index 3e39dca..d011da3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java @@ -149,7 +149,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect fillComplexColumnDataBufferForThisRow(); for (int i = 0; i < queryDimensions.length; i++) { fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray, - comlexDimensionInfoMap, row, i); + comlexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal()); } } else { scannedResult.incrementCounter(); @@ -239,9 +239,23 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect } } + /** + * fill the data of dimension columns into row + * + * @param scannedResult + * @param surrogateResult + * @param noDictionaryKeys + * @param complexTypeKeyArray + * @param complexDimensionInfoMap + * @param row: row data + * @param i: dimension columns index + * @param actualOrdinal: the actual ordinal of dimension columns in segment + * + */ void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResult, byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray, - Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i) { + Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i, + int actualOrdinal) { if (!dictionaryEncodingArray[i]) { if (implicitColumnArray[i]) { if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID @@ -261,9 +275,8 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++])); } else { row[order[i]] = - complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal()) - .getDataBasedOnDataType( - ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++])); + complexDimensionInfoMap.get(actualOrdinal).getDataBasedOnDataType( + ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++])); } } else { if 
(queryDimensionToComplexParentOrdinal.get(i) != -1) { @@ -283,7 +296,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]); } } else if (complexDataTypeArray[i]) { - row[order[i]] = complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal()) + row[order[i]] = complexDimensionInfoMap.get(actualOrdinal) .getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++])); dictionaryColumnIndex++; } else { diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java index 73b0d6d..3627e00 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java @@ -43,7 +43,6 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe measureDefaultValues = new Object[queryMeasures.length]; fillMeasureDefaultValues(); initDimensionAndMeasureIndexesForFillingData(); - initDimensionAndMeasureIndexesForFillingData(); isDimensionExists = queryDimensions.length > 0; } @@ -83,6 +82,7 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe dictionaryColumnIndex = 0; noDictionaryColumnIndex = 0; complexTypeColumnIndex = 0; + int segmentDimensionsIdx = 0; for (int i = 0; i < queryDimensions.length; i++) { // fill default value in case the dimension does not exist in the current block if (!dimensionInfo.getDimensionExists()[i]) { @@ -98,7 +98,8 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe continue; } fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray, - 
comlexDimensionInfoMap, row, i); + comlexDimensionInfoMap, row, i, executionInfo + .getProjectionDimensions()[segmentDimensionsIdx++].getDimension().getOrdinal()); } } else { scannedResult.incrementCounter(); diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java index 30ce616..2111b02 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java @@ -57,7 +57,7 @@ public class RowIdBasedResultCollector extends DictionaryBasedResultCollector { complexTypeColumnIndex = 0; for (int i = 0; i < queryDimensions.length; i++) { fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray, - comlexDimensionInfoMap, row, i); + comlexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal()); } } else { scannedResult.incrementCounter(); diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java index 54dfb1b..d32afd9 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java @@ -181,4 +181,4 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap { // keep default, one record in one datamap return 1; } -} \ No newline at end of file +} diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala new file mode 100644 index 0000000..a6d1d62 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.alterTable + +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + } + + override def afterAll(): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + } + + private def testAddColumnForComplexTable(): Unit = { + val tableName = "test_add_column_for_complex_table" + sql(s"""DROP TABLE IF EXISTS ${ tableName }""") + sql( + s""" + | CREATE TABLE IF NOT EXISTS ${ tableName }(id INT, name STRING, file array<array<float>>, + | city STRING, salary FLOAT, ls STRING, map_column map<short,int>, struct_column struct<s:short>) + | STORED BY 'carbondata' + | TBLPROPERTIES('sort_columns'='name', 'SORT_SCOPE'='LOCAL_SORT', 'LONG_STRING_COLUMNS'='ls', + | 'LOCAL_DICTIONARY_ENABLE'='true', 'LOCAL_DICTIONARY_INCLUDE'='city') + """.stripMargin) + sql( + s""" + | insert into table ${tableName} values + | (1, 'name1', array(array(1.1, 2.1), array(1.1, 2.1)), 'city1', 40000.0, '${ ("123" * 12000) }', map(1,1), named_struct('s',1)), + | (2, 'name2', array(array(1.2, 2.2), array(1.2, 2.2)), 'city2', 50000.0, '${ ("456" * 12000) }', map(2,2), named_struct('s',2)), + | (3, 'name3', array(array(1.3, 2.3), array(1.3, 2.3)), 'city3', 60000.0, '${ ("789" * 12000) }', map(3,3), named_struct('s',3)) + """.stripMargin) + checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(3))) + checkAnswer(sql(s"select name, city from ${ tableName } where id = 3"), Seq(Row("name3", "city3"))) + + sql(s"""desc formatted ${tableName}""").show(100, false) + sql(s"""alter table ${tableName} add columns (add_column string) 
TBLPROPERTIES('LOCAL_DICTIONARY_INCLUDE'='add_column')""") + sql(s"""ALTER TABLE ${tableName} SET TBLPROPERTIES('SORT_COLUMNS'='id, add_column, city')""") + sql(s"""desc formatted ${tableName}""").show(100, false) + + sql( + s""" + | insert into table ${tableName} values + | (4, 'name4', array(array(1.4, 2.4), array(1.4, 2.4)), 'city4', 70000.0, '${ ("123" * 12000) }', map(4,4), named_struct('s',4), 'add4'), + | (5, 'name5', array(array(1.5, 2.5), array(1.5, 2.5)), 'city5', 80000.0, '${ ("456" * 12000) }', map(5,5), named_struct('s',5), 'add5'), + | (6, 'name6', array(array(1.6, 2.6), array(1.6, 2.6)), 'city6', 90000.0, '${ ("789" * 12000) }', map(6,6), named_struct('s',6), 'add6') + """.stripMargin) + checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(6))) + checkAnswer(sql(s"""select add_column, id, city, name from ${ tableName } where id = 6"""), + Seq(Row("add6", 6, "city6", "name6"))) + + sql(s"""desc formatted ${tableName}""").show(100, false) + sql(s"""ALTER TABLE ${ tableName } DROP COLUMNS (city)""") + sql(s"""desc formatted ${tableName}""").show(100, false) + + sql( + s""" + | insert into table ${tableName} values + | (7, 'name7', array(array(1.4, 2.4), array(1.4, 2.4)), 70000.0, '${ ("123" * 12000) }', map(7,7), named_struct('s',7), 'add7'), + | (8, 'name8', array(array(1.5, 2.5), array(1.5, 2.5)), 80000.0, '${ ("456" * 12000) }', map(8,8), named_struct('s',8), 'add8'), + | (9, 'name9', array(array(1.6, 2.6), array(1.6, 2.6)), 90000.0, '${ ("789" * 12000) }', map(9,9), named_struct('s',9), 'add9') + """.stripMargin) + checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(9))) + checkAnswer(sql(s"""select id, add_column, name from ${ tableName } where id = 9"""), Seq(Row(9, "add9", "name9"))) + + sql(s"""desc formatted ${tableName}""").show(100, false) + sql(s"""alter table ${tableName} add columns (add_column_ls string) TBLPROPERTIES('LONG_STRING_COLUMNS'='add_column_ls')""") + sql(s"""desc formatted ${tableName}""").show(100, 
false) + + sql( + s""" + | insert into table ${tableName} values + | (10, 'name10', array(array(1.4, 2.4), array(1.4, 2.4)), 100000.0, '${ ("123" * 12000) }', map(4,4), named_struct('s',4), 'add4', '${ ("999" * 12000) }'), + | (11, 'name11', array(array(1.5, 2.5), array(1.5, 2.5)), 110000.0, '${ ("456" * 12000) }', map(5,5), named_struct('s',5), 'add5', '${ ("888" * 12000) }'), + | (12, 'name12', array(array(1.6, 2.6), array(1.6, 2.6)), 120000.0, '${ ("789" * 12000) }', map(6,6), named_struct('s',6), 'add6', '${ ("777" * 12000) }') + """.stripMargin) + checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(12))) + checkAnswer(sql(s"""select id, name, file, add_column_ls, map_column, struct_column from ${ tableName } where id = 10"""), + Seq(Row(10, "name10", + mutable.WrappedArray.make(Array(mutable.WrappedArray.make(Array(1.4, 2.4)), mutable.WrappedArray.make(Array(1.4, 2.4)))), + ("999" * 12000), Map(4 -> 4), Row(4)))) + + sql(s"""DROP TABLE IF EXISTS ${ tableName }""") + } + + test("[CARBONDATA-3596] Fix exception when execute load data command or select sql on a table which includes complex columns after execute 'add column' command") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + // test with the vector reader enabled + testAddColumnForComplexTable() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + // test with the vector reader disabled + testAddColumnForComplexTable() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 3db3ebe..b6441fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -241,7 +241,12 @@ class AlterTableColumnSchemaGenerator( val tableSchema = tableInfo.getFactTable val tableCols = tableSchema.getListOfColumns.asScala val existingColsSize = tableCols.size - var allColumns = tableCols.filter(x => x.isDimensionColumn) + var longStringCols = Seq[ColumnSchema]() + // get all original dimension columns + // but exclude complex type columns and long string columns + var allColumns = tableCols.filter(x => + (x.isDimensionColumn && !x.getDataType.isComplexType() + && x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR))) var newCols = Seq[ColumnSchema]() var invertedIndxCols: Array[String] = Array[String]() if (alterTableModel.tableProperties.get(CarbonCommonConstants.INVERTED_INDEX).isDefined) { @@ -249,6 +254,7 @@ class AlterTableColumnSchemaGenerator( .split(',').map(_.trim) } + // add new dimension columns alterTableModel.dimCols.foreach(field => { val encoders = new java.util.ArrayList[Encoding]() encoders.add(Encoding.DICTIONARY) @@ -263,10 +269,26 @@ class AlterTableColumnSchemaGenerator( alterTableModel.databaseName.getOrElse(dbName), isSortColumn(field.name.getOrElse(field.column)), isVarcharColumn(field.name.getOrElse(field.column))) - allColumns ++= Seq(columnSchema) + if (columnSchema.getDataType == DataTypes.VARCHAR) { + // put the new long string columns in 'longStringCols' + // and add them after old long string columns + longStringCols ++= Seq(columnSchema) + } else { + allColumns ++= Seq(columnSchema) + } newCols ++= Seq(columnSchema) }) + // put the old long string columns + allColumns ++= tableCols.filter(x => + (x.isDimensionColumn && (x.getDataType == DataTypes.VARCHAR))) + // put the new long string columns + allColumns ++= longStringCols + // put complex type columns at the end of dimension columns + allColumns ++= tableCols.filter(x => + (x.isDimensionColumn && (x.getDataType.isComplexType() 
|| x.getSchemaOrdinal == -1))) + // original measure columns allColumns ++= tableCols.filter(x => !x.isDimensionColumn) + // add new measure columns alterTableModel.msrCols.foreach(field => { val encoders = new java.util.ArrayList[Encoding]() val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema( diff --git a/log b/log deleted file mode 100644 index e69de29..0000000