This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 7a0fe21 [HOTFIX] fix dataloading/insertInto complex data type issue for partition table 7a0fe21 is described below commit 7a0fe210cf395fcbc70dabe38a4781d7558e50ea Author: QiangCai <qiang...@qq.com> AuthorDate: Thu Jan 2 16:12:13 2020 +0800 [HOTFIX] fix dataloading/insertInto complex data type issue for partition table for partition table, complex data type of target table should be converted to binary data type. This closes #3555 --- .../StandardPartitionComplexDataTypeTestCase.scala | 103 +++++++++++++++++++++ .../command/management/CarbonLoadDataCommand.scala | 16 +++- 2 files changed, 114 insertions(+), 5 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionComplexDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionComplexDataTypeTestCase.scala new file mode 100644 index 0000000..c3923e5 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionComplexDataTypeTestCase.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.standardpartition + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * StandardPartitionComplexDataTypeTestCase + */ +class StandardPartitionComplexDataTypeTestCase extends QueryTest with BeforeAndAfterAll { + + override protected def beforeAll(): Unit = { + dropTable + } + + override protected def afterAll(): Unit = { + dropTable + } + + def dropTable = { + sql("drop table if exists tbl_complex_p") + sql("drop table if exists tbl_complex_p_carbondata") + } + + test("test complex datatype for partition table") { + val tableName1 = "tbl_complex_p" + sql(s"drop table if exists $tableName1") + sql( + s""" + | create table $tableName1 ( + | col1 int, + | col2 string, + | col3 float, + | col4 struct<level: string, ratio: float, sub: struct<level: string, ratio: float>>, + | col5 array<struct<ratio: float>>, + | col6 map<string, struct<ratio: float>>, + | col7 date + | ) """.stripMargin) + sql(s""" + | insert into table $tableName1 + | select + | 1, + | 'a', + | 1.1, + | struct('b', 1.2, struct('bc', 1.21)), + | array(struct(1.3), struct(1.4)), + | map('l1', struct(1.5), 'l2', struct(1.6)), + | to_date('2019-01-01') + | """.stripMargin) + + val tableName2 = "tbl_complex_p_carbondata" + sql(s"drop table if exists $tableName2") + sql( + s""" + | create table $tableName2 ( + | col1 int, + | col2 string, + | col3 float, + | col4 struct<level: string, ratio: float, sub: struct<level: string, ratio: float>>, + | col5 array<struct<ratio: float>>, + | col6 map<string, struct<ratio: float>> + | ) + | stored as carbondata + | partitioned by (col7 date) + | """.stripMargin) + sql(s"insert into table $tableName2 select * from $tableName1") + checkAnswer( + sql( + s""" + |select + | cast(round(col4.ratio, 1) as float), + | cast(round(col4.sub.ratio, 2) as float), + | cast(round(col5[1].ratio, 1) as float), + | cast(round(col6['l1'].ratio, 1) as float) + | from $tableName2 + |""".stripMargin), + sql( + s""" + |select + | col4.ratio, col4.sub.ratio, + | col5[1].ratio, + | col6['l1'].ratio + | from $tableName1 + |""".stripMargin) + ) + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 4e560b8..7a853b9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -872,13 +872,19 @@ case class CarbonLoadDataCommand( val table = loadModel.getCarbonDataLoadSchema.getCarbonTable val metastoreSchema = StructType(catalogTable.schema.fields.map{f => val column = table.getColumnByName(f.name) - if (column.hasEncoding(Encoding.DICTIONARY)) { - f.copy(dataType = IntegerType) - } else if (f.dataType == TimestampType || f.dataType == DateType) { - f.copy(dataType = LongType) + val updatedDataType = if (column.hasEncoding(Encoding.DICTIONARY)) { + IntegerType } else { - f + f.dataType match { + case TimestampType | DateType => + LongType + case _: StructType | _: ArrayType | _: MapType => + BinaryType + case _ => + f.dataType + } } + f.copy(dataType = updatedDataType) }) val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions val catalog = new CatalogFileIndex(