[CARBONDATA-1918] Incorrect data is displayed when String is updated using Sentences
Incorrect data is displayed when updating a String column using the Sentences UDF. The Sentences UDF returns an Array; when a String column is updated with an Array value, wrong data gets written. Therefore, we have to check for a supported data type before updating. This closes #1704 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2610a609 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2610a609 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2610a609 Branch: refs/heads/branch-1.3 Commit: 2610a6091623c271552b7a69d402dded79ba3517 Parents: a9a0201 Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Wed Dec 20 18:16:10 2017 +0530 Committer: kumarvishal <kumarvishal.1...@gmail.com> Committed: Fri Feb 2 21:22:30 2018 +0530 ---------------------------------------------------------------------- .../sdv/generated/DataLoadingIUDTestCase.scala | 8 ++++---- .../testsuite/iud/UpdateCarbonTableTestCase.scala | 13 +++++++++++++ .../mutation/CarbonProjectForUpdateCommand.scala | 16 ++++++++++++++++ 3 files changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2610a609/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala index b4459ab..4c232be 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala +++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala @@ -1858,13 +1858,13 @@ ignore("IUD-01-01-01_040-23", Include) { //Check for updating carbon table set column value to a value returned by split function +//Split will give us array value test("IUD-01-01-01_040-25", Include) { sql(s"""create table if not exists default.t_carbn01 (Active_status String,Item_type_cd INT,Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Discount_price DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Update_time TIMESTAMP,Create_date String)STORED BY 'org.apache.carbondata.format'""").collect sql(s"""insert into default.t_carbn01 select * from default.t_carbn01b""").collect - sql(s"""update default.t_carbn01 set (active_status)= (split('t','a')) """).collect - checkAnswer(s""" select active_status from default.t_carbn01 group by active_status """, - Seq(Row("t\\")), "DataLoadingIUDTestCase_IUD-01-01-01_040-25") - sql(s"""drop table default.t_carbn01 """).collect + intercept[Exception] { + sql(s"""update default.t_carbn01 set (active_status)= (split('t','a')) """).collect + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2610a609/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index cf4fc07..98c9a16 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -691,6 
+691,19 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.FILE_SEPARATOR + "Part0") assert(f.list().length == 2) } + test("test sentences func in update statement") { + sql("drop table if exists senten") + sql("create table senten(name string, comment string) stored by 'carbondata'") + sql("insert into senten select 'aaa','comment for aaa'") + sql("insert into senten select 'bbb','comment for bbb'") + sql("select * from senten").show() + val errorMessage = intercept[Exception] { + sql("update senten set(comment)=(sentences('Hello there! How are you?'))").show() + }.getMessage + errorMessage + .contains("Unsupported data type: Array") + sql("drop table if exists senten") + } override def afterAll { sql("use default") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2610a609/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 2f12bef..318c904 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.ArrayType import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.logging.LogServiceFactory @@ 
-186,6 +187,18 @@ private[sql] case class CarbonProjectForUpdateCommand( (tableName == relation.identifier.getCarbonTableIdentifier.getTableName) } + // from the dataFrame schema iterate through all the column to be updated and + // check for the data type, if the data type is complex then throw exception + def checkForUnsupportedDataType(dataFrame: DataFrame): Unit = { + dataFrame.schema.foreach(col => { + // the new column to be updated will be appended with "-updatedColumn" suffix + if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && + col.dataType.isInstanceOf[ArrayType]) { + throw new UnsupportedOperationException("Unsupported data type: Array") + } + }) + } + def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = { var header = "" var found = false @@ -206,6 +219,9 @@ private[sql] case class CarbonProjectForUpdateCommand( } header } + + // check for the data type of the new value to be updated + checkForUnsupportedDataType(dataFrame) val ex = dataFrame.queryExecution.analyzed val res = ex find { case relation: LogicalRelation