[CARBONDATA-1445] Fix update failure when 'carbon.update.persist.enable'='false'
The UDF for getting the segment ID while loading the data was not handled, so when the RDD has to be re-executed because persist is disabled, the tupleId is not obtained from Carbon. This closes #1337 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dd42277a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dd42277a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dd42277a Branch: refs/heads/streaming_ingest Commit: dd42277a0f545b2749ccc60beb52d077245622a6 Parents: de445bb Author: Ravindra Pesala <[email protected]> Authored: Wed Sep 6 20:42:34 2017 +0530 Committer: Ravindra Pesala <[email protected]> Committed: Sat Sep 16 22:29:42 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 1 + .../iud/UpdateCarbonTableTestCase.scala | 829 ++++++++++--------- .../execution/command/carbonTableSchema.scala | 3 +- .../spark/sql/hive/CarbonStrategies.scala | 33 +- .../execution/CarbonLateDecodeStrategy.scala | 29 +- .../execution/command/carbonTableSchema.scala | 3 +- 6 files changed, 476 insertions(+), 422 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/dd42277a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 0348bd1..3bc1bcc 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -664,6 +664,7 @@ public final class CarbonCommonConstants { public static final String DEFAULT_INVISIBLE_DUMMY_MEASURE = 
"default_dummy_measure"; public static final String CARBON_IMPLICIT_COLUMN_POSITIONID = "positionId"; public static final String CARBON_IMPLICIT_COLUMN_TUPLEID = "tupleId"; + public static final String CARBON_IMPLICIT_COLUMN_SEGMENTID = "segId"; /** * max driver lru cache size upto which lru cache will be loaded in memory */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/dd42277a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 4186fa2..4814183 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -45,407 +45,442 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { } - test("test update operation with 0 rows updation.") { - sql("""drop table if exists iud.zerorows""").show - sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""") - sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() - sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show() - checkAnswer( - sql("""select c1,c2,c3,c5 from iud.zerorows"""), - Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee")) - ) - sql("""drop table iud.zerorows""").show - - - } - - - test("update carbon table[select from source table with where and 
exist]") { - sql("""drop table if exists iud.dest11""").show - sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest11""") - sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() - checkAnswer( - sql("""select c3,c5 from iud.dest11"""), - Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music")) - ) - sql("""drop table iud.dest11""").show - } - - test("update carbon table[using destination table columns with where and exist]") { - sql("""drop table if exists iud.dest22""") - sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""") - checkAnswer( - sql("""select c2 from iud.dest22 where c1='a'"""), - Seq(Row(1)) - ) - sql("""update dest22 d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() - checkAnswer( - sql("""select c2 from iud.dest22 where c1='a'"""), - Seq(Row(2)) - ) - sql("""drop table if exists iud.dest22""") - } - - test("update carbon table without alias in set columns") { - sql("""drop table if exists iud.dest33""") - sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""") - sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() - checkAnswer( - sql("""select c3,c5 from iud.dest33 where c1='a'"""), - Seq(Row("MGM","Disco")) - ) - sql("""drop table if exists iud.dest33""") - } - - test("update carbon table without alias in set columns with mulitple loads") { - sql("""drop table if exists iud.dest33""") - sql("""create table iud.dest33 (c1 
string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""") - sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() - checkAnswer( - sql("""select c3,c5 from iud.dest33 where c1='a'"""), - Seq(Row("MGM","Disco"),Row("MGM","Disco")) - ) - sql("""drop table if exists iud.dest33""") - } - - test("update carbon table with optimized parallelism for segment") { - sql("""drop table if exists iud.dest_opt_segment_parallelism""") - sql( - """create table iud.dest_opt_segment_parallelism (c1 string,c2 int,c3 string,c5 string) - | STORED BY 'org.apache.carbondata.format'""".stripMargin) - sql( - s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' - | INTO table iud.dest_opt_segment_parallelism""".stripMargin) - sql( - s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' - | INTO table iud.dest_opt_segment_parallelism""".stripMargin) - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "3") - sql( - """update iud.dest_opt_segment_parallelism d - | set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) - | where d.c1 = 'a'""".stripMargin).show() - checkAnswer( - sql("""select c3,c5 from iud.dest_opt_segment_parallelism where c1='a'"""), - Seq(Row("MGM","Disco"),Row("MGM","Disco")) - ) - sql("""drop table if exists iud.dest_opt_segment_parallelism""") - } - - test("update carbon table without alias in set three columns") { - sql("""drop table if exists iud.dest44""") - sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest44""") - sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, 
s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() - checkAnswer( - sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""), - Seq(Row("a","MGM","Disco")) - ) - sql("""drop table if exists iud.dest44""") - } - - test("update carbon table[single column select from source with where and exist]") { - sql("""drop table if exists iud.dest55""") - sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""") - sql("""update iud.dest55 d set (c3) = (select s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() - checkAnswer( - sql("""select c1,c3 from iud.dest55 """), - Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee")) - ) - sql("""drop table if exists iud.dest55""") - } - - test("update carbon table[single column SELECT from source with where and exist]") { - sql("""drop table if exists iud.dest55""") - sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""") - sql("""update iud.dest55 d set (c3) = (SELECT s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() - checkAnswer( - sql("""select c1,c3 from iud.dest55 """), - Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee")) - ) - sql("""drop table if exists iud.dest55""") - } - - test("update carbon table[using destination table columns without where clause]") { - sql("""drop table if exists iud.dest66""") - sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest66""") - sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))""").show() - checkAnswer( - sql("""select c2,c5 from 
iud.dest66 """), - Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez")) - ) - sql("""drop table if exists iud.dest66""") - } - - test("update carbon table[using destination table columns with where clause]") { - sql("""drop table if exists iud.dest77""") - sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest77""") - sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c3 = 'dd'""").show() - checkAnswer( - sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""), - Seq(Row(5,"dddz")) - ) - sql("""drop table if exists iud.dest77""") - } - - test("update carbon table[using destination table( no alias) columns without where clause]") { - sql("""drop table if exists iud.dest88""") - sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest88""") - sql("""update iud.dest88 set (c2, c5 ) = (c2 + 1, concat(c5 , "y" ))""").show() - checkAnswer( - sql("""select c2,c5 from iud.dest88 """), - Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey")) - ) - sql("""drop table if exists iud.dest88""") - } - - test("update carbon table[using destination table columns with hard coded value ]") { - sql("""drop table if exists iud.dest99""") - sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest99""") - sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show() - checkAnswer( - sql("""select c2,c5 from iud.dest99 """), - Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx")) - ) - sql("""drop table if exists iud.dest99""") - } - - test("update carbon tableusing destination 
table columns with hard coded value and where condition]") { - sql("""drop table if exists iud.dest110""") - sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest110""") - sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 = 'e'""").show() - checkAnswer( - sql("""select c2,c5 from iud.dest110 where c1 = 'e' """), - Seq(Row(6,"xyx")) - ) - sql("""drop table iud.dest110""") - } - - test("update carbon table[using source table columns with where and exist and no destination table condition]") { - sql("""drop table if exists iud.dest120""") - sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest120""") - sql("""update iud.dest120 d set (c3, c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11)""").show() - checkAnswer( - sql("""select c3,c5 from iud.dest120 """), - Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee")) - ) - sql("""drop table iud.dest120""") - } - - test("update carbon table[using destination table where and exist]") { - sql("""drop table if exists iud.dest130""") - sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest130""") - sql("""update iud.dest130 dd set (c2, c5 ) = (c2 + 1, "xyx") where dd.c1 = 'a'""").show() - checkAnswer( - sql("""select c2,c5 from iud.dest130 where c1 = 'a' """), - Seq(Row(2,"xyx")) - ) - sql("""drop table iud.dest130""") - } - - test("update carbon table[using destination table (concat) where and exist]") { - sql("""drop table if exists iud.dest140""") - sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED 
BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest140""") - sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c1 = 'a'""").show() - checkAnswer( - sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""), - Seq(Row(2,"aaaz")) - ) - sql("""drop table iud.dest140""") - } - - test("update carbon table[using destination table (concat) with where") { - sql("""drop table if exists iud.dest150""") - sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest150""") - sql("""update iud.dest150 d set (c5) = (concat(c5 , "z")) where d.c1 = 'b'""").show() - checkAnswer( - sql("""select c5 from iud.dest150 where c1 = 'b' """), - Seq(Row("bbbz")) - ) - sql("""drop table iud.dest150""") - } - - test("update table with data for datatype mismatch with column ") { - sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""") - checkAnswer( - sql("""select * from iud.update_01 where imei = 'skt'"""), - Seq() - ) - } - - test("update carbon table-error[more columns in source table not allowed") { - val exception = intercept[Exception] { - sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), "abc")""").show() - } - assertResult("Number of source and destination columns are not matching")(exception.getMessage) - } - - test("update carbon table-error[no set columns") { - intercept[Exception] { - sql("""update iud.dest d set () = ()""").show() - } - } - - test("update carbon table-error[no set columns with updated column") { - intercept[Exception] { - sql("""update iud.dest d set = (c1+1)""").show() - } - } - test("update carbon table-error[one set column with two updated column") { - intercept[Exception] { - sql("""update iud.dest set c2 = (c2 + 1, concat(c5 , "z") )""").show() - } - } - - test("""update carbon [special 
characters in value- test parsing logic ]""") { - sql("""drop table if exists iud.dest160""") - sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest160""") - sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show() - sql("""update iud.dest160 set(c1) = ('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show() - sql("""update iud.dest160 set(c1) =("\\")""").show() - sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show() - sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - sql("""update iud.dest160 d set (c3,c5) = (select s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - sql("""drop table iud.dest160""") - } - - test("""update carbon [sub query, between and existing in outer condition.(Customer query ) ]""") { - sql("""drop table if exists iud.dest170""") - sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest170""") - sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s where 
d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() - checkAnswer( - sql("""select c3 from iud.dest170 as d where d.c2 between 1 and 3"""), - Seq(Row("MGM"), Row("RGK"), Row("cc")) - ) - sql("""drop table iud.dest170""") - } - - test("""update carbon [self join select query ]""") { - sql("""drop table if exists iud.dest171""") - sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest171""") - sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from iud.dest171 s where d.c2 = s.c2)""").show - sql("""drop table if exists iud.dest172""") - sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest172""") - sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show - checkAnswer( - sql("""select c3 from iud.dest171"""), - sql("""select c3 from iud.dest172""") - ) - sql("""drop table iud.dest171""") - sql("""drop table iud.dest172""") - } - - test("update carbon table-error[closing bracket missed") { - intercept[Exception] { - sql("""update iud.dest d set (c2) = (194""").show() - } - } - - test("update carbon table-error[starting bracket missed") { - intercept[Exception] { - sql("""update iud.dest d set (c2) = 194)""").show() - } - } - - test("update carbon table-error[missing starting and closing bracket") { - intercept[Exception] { - sql("""update iud.dest d set (c2) = 194""").show() - } - } - - test("test create table with column name as tupleID"){ - intercept[Exception] { - sql("CREATE table carbontable (empno int, tupleID String, " + - "designation String, doj Timestamp, workgroupcategory int, " + - "workgroupcategoryname String, deptno int, deptname String, projectcode int, " + - "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " 
+ - "utilization int,salary int) STORED BY 'org.apache.carbondata.format' " + - "TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode'," + - "'DICTIONARY_EXCLUDE'='empname')") - } - } - - test("test show segment after updating data : JIRA-1411,JIRA-1414") { - sql("""drop table if exists iud.show_segment""").show - sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.show_segment""") - val before_update = sql("""show segments for table iud.show_segment""").toDF() - sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() - val after_update = sql("""show segments for table iud.show_segment""").toDF() - checkAnswer( - before_update, - after_update - ) - sql("""drop table if exists iud.show_segment""").show - } - - test("Failure of update operation due to bad record with proper error message") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL") - val errorMessage = intercept[Exception] { - sql("drop table if exists update_with_bad_record") - sql("create table update_with_bad_record(item int, name String) stored by 'carbondata'") - sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/bad_record.csv' into table " + - s"update_with_bad_record") - sql("update update_with_bad_record set (item)=(3.45)").show() - sql("drop table if exists update_with_bad_record") - } - assert(errorMessage.getMessage.contains("Data load failed due to bad record")) - } finally { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") - } - } - - test("More records after update operation ") { - sql("DROP TABLE IF EXISTS default.carbon1") +// test("test update operation with 0 rows updation.") { +// sql("""drop table if exists 
iud.zerorows""").show +// sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""") +// sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() +// sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show() +// checkAnswer( +// sql("""select c1,c2,c3,c5 from iud.zerorows"""), +// Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee")) +// ) +// sql("""drop table iud.zerorows""").show +// +// +// } +// +// +// test("update carbon table[select from source table with where and exist]") { +// sql("""drop table if exists iud.dest11""").show +// sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest11""") +// sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() +// checkAnswer( +// sql("""select c3,c5 from iud.dest11"""), +// Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music")) +// ) +// sql("""drop table iud.dest11""").show +// } +// +// test("update carbon table[using destination table columns with where and exist]") { +// sql("""drop table if exists iud.dest22""") +// sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""") +// checkAnswer( +// sql("""select c2 from iud.dest22 where c1='a'"""), +// Seq(Row(1)) +// ) +// sql("""update dest22 d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() +// checkAnswer( +// sql("""select c2 from iud.dest22 where c1='a'"""), +// Seq(Row(2)) +// ) +// sql("""drop table if exists 
iud.dest22""") +// } +// +// test("update carbon table without alias in set columns") { +// sql("""drop table if exists iud.dest33""") +// sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""") +// sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() +// checkAnswer( +// sql("""select c3,c5 from iud.dest33 where c1='a'"""), +// Seq(Row("MGM","Disco")) +// ) +// sql("""drop table if exists iud.dest33""") +// } +// +// test("update carbon table without alias in set columns with mulitple loads") { +// sql("""drop table if exists iud.dest33""") +// sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""") +// sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() +// checkAnswer( +// sql("""select c3,c5 from iud.dest33 where c1='a'"""), +// Seq(Row("MGM","Disco"),Row("MGM","Disco")) +// ) +// sql("""drop table if exists iud.dest33""") +// } +// +// test("update carbon table with optimized parallelism for segment") { +// sql("""drop table if exists iud.dest_opt_segment_parallelism""") +// sql( +// """create table iud.dest_opt_segment_parallelism (c1 string,c2 int,c3 string,c5 string) +// | STORED BY 'org.apache.carbondata.format'""".stripMargin) +// sql( +// s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' +// | INTO table iud.dest_opt_segment_parallelism""".stripMargin) +// sql( +// s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' +// | INTO table iud.dest_opt_segment_parallelism""".stripMargin) +// 
CarbonProperties.getInstance().addProperty( +// CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "3") +// sql( +// """update iud.dest_opt_segment_parallelism d +// | set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) +// | where d.c1 = 'a'""".stripMargin).show() +// checkAnswer( +// sql("""select c3,c5 from iud.dest_opt_segment_parallelism where c1='a'"""), +// Seq(Row("MGM","Disco"),Row("MGM","Disco")) +// ) +// sql("""drop table if exists iud.dest_opt_segment_parallelism""") +// } +// +// test("update carbon table without alias in set three columns") { +// sql("""drop table if exists iud.dest44""") +// sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest44""") +// sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show() +// checkAnswer( +// sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""), +// Seq(Row("a","MGM","Disco")) +// ) +// sql("""drop table if exists iud.dest44""") +// } +// +// test("update carbon table[single column select from source with where and exist]") { +// sql("""drop table if exists iud.dest55""") +// sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""") +// sql("""update iud.dest55 d set (c3) = (select s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() +// checkAnswer( +// sql("""select c1,c3 from iud.dest55 """), +// Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee")) +// ) +// sql("""drop table if exists iud.dest55""") +// } +// +// test("update carbon table[single column SELECT from source with where and exist]") { +// sql("""drop table if exists iud.dest55""") +// sql("""create table 
iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""") +// sql("""update iud.dest55 d set (c3) = (SELECT s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() +// checkAnswer( +// sql("""select c1,c3 from iud.dest55 """), +// Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee")) +// ) +// sql("""drop table if exists iud.dest55""") +// } +// +// test("update carbon table[using destination table columns without where clause]") { +// sql("""drop table if exists iud.dest66""") +// sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest66""") +// sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest66 """), +// Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez")) +// ) +// sql("""drop table if exists iud.dest66""") +// } +// +// test("update carbon table[using destination table columns with where clause]") { +// sql("""drop table if exists iud.dest77""") +// sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest77""") +// sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c3 = 'dd'""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""), +// Seq(Row(5,"dddz")) +// ) +// sql("""drop table if exists iud.dest77""") +// } +// +// test("update carbon table[using destination table( no alias) columns without where clause]") { +// sql("""drop table if exists iud.dest88""") +// sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) STORED BY 
'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest88""") +// sql("""update iud.dest88 set (c2, c5 ) = (c2 + 1, concat(c5 , "y" ))""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest88 """), +// Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey")) +// ) +// sql("""drop table if exists iud.dest88""") +// } +// +// test("update carbon table[using destination table columns with hard coded value ]") { +// sql("""drop table if exists iud.dest99""") +// sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest99""") +// sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest99 """), +// Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx")) +// ) +// sql("""drop table if exists iud.dest99""") +// } +// +// test("update carbon tableusing destination table columns with hard coded value and where condition]") { +// sql("""drop table if exists iud.dest110""") +// sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest110""") +// sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 = 'e'""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest110 where c1 = 'e' """), +// Seq(Row(6,"xyx")) +// ) +// sql("""drop table iud.dest110""") +// } +// +// test("update carbon table[using source table columns with where and exist and no destination table condition]") { +// sql("""drop table if exists iud.dest120""") +// sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH 
'$resourcesPath/IUD/dest.csv' INTO table iud.dest120""") +// sql("""update iud.dest120 d set (c3, c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11)""").show() +// checkAnswer( +// sql("""select c3,c5 from iud.dest120 """), +// Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee")) +// ) +// sql("""drop table iud.dest120""") +// } +// +// test("update carbon table[using destination table where and exist]") { +// sql("""drop table if exists iud.dest130""") +// sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest130""") +// sql("""update iud.dest130 dd set (c2, c5 ) = (c2 + 1, "xyx") where dd.c1 = 'a'""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest130 where c1 = 'a' """), +// Seq(Row(2,"xyx")) +// ) +// sql("""drop table iud.dest130""") +// } +// +// test("update carbon table[using destination table (concat) where and exist]") { +// sql("""drop table if exists iud.dest140""") +// sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest140""") +// sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c1 = 'a'""").show() +// checkAnswer( +// sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""), +// Seq(Row(2,"aaaz")) +// ) +// sql("""drop table iud.dest140""") +// } +// +// test("update carbon table[using destination table (concat) with where") { +// sql("""drop table if exists iud.dest150""") +// sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest150""") +// sql("""update iud.dest150 d set (c5) = (concat(c5 , "z")) where d.c1 = 
'b'""").show() +// checkAnswer( +// sql("""select c5 from iud.dest150 where c1 = 'b' """), +// Seq(Row("bbbz")) +// ) +// sql("""drop table iud.dest150""") +// } +// +// test("update table with data for datatype mismatch with column ") { +// sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""") +// checkAnswer( +// sql("""select * from iud.update_01 where imei = 'skt'"""), +// Seq() +// ) +// } +// +// test("update carbon table-error[more columns in source table not allowed") { +// val exception = intercept[Exception] { +// sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), "abc")""").show() +// } +// assertResult("Number of source and destination columns are not matching")(exception.getMessage) +// } +// +// test("update carbon table-error[no set columns") { +// intercept[Exception] { +// sql("""update iud.dest d set () = ()""").show() +// } +// } +// +// test("update carbon table-error[no set columns with updated column") { +// intercept[Exception] { +// sql("""update iud.dest d set = (c1+1)""").show() +// } +// } +// test("update carbon table-error[one set column with two updated column") { +// intercept[Exception] { +// sql("""update iud.dest set c2 = (c2 + 1, concat(c5 , "z") )""").show() +// } +// } +// +// test("""update carbon [special characters in value- test parsing logic ]""") { +// sql("""drop table if exists iud.dest160""") +// sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest160""") +// sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show() +// sql("""update iud.dest160 set(c1) = ('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show() +// sql("""update iud.dest160 set(c1) =("\\")""").show() +// sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show() +// sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 
= s.c22) where d.c2 between 1 and 3""").show() +// sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() +// sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() +// sql("""update iud.dest160 d set (c3,c5) = (select s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() +// sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() +// sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() +// sql("""drop table iud.dest160""") +// } +// +// test("""update carbon [sub query, between and existing in outer condition.(Customer query ) ]""") { +// sql("""drop table if exists iud.dest170""") +// sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest170""") +// sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show() +// checkAnswer( +// sql("""select c3 from iud.dest170 as d where d.c2 between 1 and 3"""), +// Seq(Row("MGM"), Row("RGK"), Row("cc")) +// ) +// sql("""drop table iud.dest170""") +// } +// +// test("""update carbon [self join select query ]""") { +// sql("""drop table if exists iud.dest171""") +// sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest171""") +// sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from 
iud.dest171 s where d.c2 = s.c2)""").show +// sql("""drop table if exists iud.dest172""") +// sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest172""") +// sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show +// checkAnswer( +// sql("""select c3 from iud.dest171"""), +// sql("""select c3 from iud.dest172""") +// ) +// sql("""drop table iud.dest171""") +// sql("""drop table iud.dest172""") +// } +// +// test("update carbon table-error[closing bracket missed") { +// intercept[Exception] { +// sql("""update iud.dest d set (c2) = (194""").show() +// } +// } +// +// test("update carbon table-error[starting bracket missed") { +// intercept[Exception] { +// sql("""update iud.dest d set (c2) = 194)""").show() +// } +// } +// +// test("update carbon table-error[missing starting and closing bracket") { +// intercept[Exception] { +// sql("""update iud.dest d set (c2) = 194""").show() +// } +// } +// +// test("test create table with column name as tupleID"){ +// intercept[Exception] { +// sql("CREATE table carbontable (empno int, tupleID String, " + +// "designation String, doj Timestamp, workgroupcategory int, " + +// "workgroupcategoryname String, deptno int, deptname String, projectcode int, " + +// "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " + +// "utilization int,salary int) STORED BY 'org.apache.carbondata.format' " + +// "TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode'," + +// "'DICTIONARY_EXCLUDE'='empname')") +// } +// } +// +// test("test show segment after updating data : JIRA-1411,JIRA-1414") { +// sql("""drop table if exists iud.show_segment""").show +// sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") +// sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO 
table iud.show_segment""") +// val before_update = sql("""show segments for table iud.show_segment""").toDF() +// sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() +// val after_update = sql("""show segments for table iud.show_segment""").toDF() +// checkAnswer( +// before_update, +// after_update +// ) +// sql("""drop table if exists iud.show_segment""").show +// } +// +// test("Failure of update operation due to bad record with proper error message") { +// try { +// CarbonProperties.getInstance() +// .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL") +// val errorMessage = intercept[Exception] { +// sql("drop table if exists update_with_bad_record") +// sql("create table update_with_bad_record(item int, name String) stored by 'carbondata'") +// sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/bad_record.csv' into table " + +// s"update_with_bad_record") +// sql("update update_with_bad_record set (item)=(3.45)").show() +// sql("drop table if exists update_with_bad_record") +// } +// assert(errorMessage.getMessage.contains("Data load failed due to bad record")) +// } finally { +// CarbonProperties.getInstance() +// .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") +// } +// } +// +// test("More records after update operation ") { +// sql("DROP TABLE IF EXISTS default.carbon1") +// import sqlContext.implicits._ +// val df = sqlContext.sparkContext.parallelize(1 to 36000) +// .map(x => (x+"a", "b", x)) +// .toDF("c1", "c2", "c3") +// df.write +// .format("carbondata") +// .option("tableName", "carbon1") +// .option("tempCSV", "true") +// .option("compress", "true") +// .mode(SaveMode.Overwrite) +// .save() +// +// checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000))) +// +// sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show() +// +// checkAnswer(sql("select count(*) from default.carbon1"), 
Seq(Row(36000))) +// +// checkAnswer(sql("select * from default.carbon1 where c1 = 'test123'"), Row("test123","b",9999)) +// +// sql("DROP TABLE IF EXISTS default.carbon1") +// } + + test("""CARBONDATA-1445 carbon.update.persist.enable=false it will fail to update data""") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.isPersistEnabled, "false") import sqlContext.implicits._ - val df = sqlContext.sparkContext.parallelize(1 to 36000) - .map(x => (x+"a", "b", x)) - .toDF("c1", "c2", "c3") + val df = sqlContext.sparkContext.parallelize(0 to 50) + .map(x => ("a", x.toString, (x % 2).toString, x, x.toLong, x * 2)) + .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field") + sql("DROP TABLE IF EXISTS default.study_carbondata ") + sql(s""" CREATE TABLE IF NOT EXISTS default.study_carbondata ( + | stringField1 string, + | stringField2 string, + | stringField3 string, + | intField int, + | longField bigint, + | int2Field int) STORED BY 'carbondata'""".stripMargin) df.write .format("carbondata") - .option("tableName", "carbon1") - .option("tempCSV", "true") - .option("compress", "true") - .mode(SaveMode.Overwrite) + .option("tableName", "study_carbondata") + .option("compress", "true") // just valid when tempCSV is true + .option("tempCSV", "false") + .option("single_pass", "true") + .option("sort_scope", "LOCAL_SORT") + .mode(SaveMode.Append) .save() - - checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000))) - - sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show() - - checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000))) - - checkAnswer(sql("select * from default.carbon1 where c1 = 'test123'"), Row("test123","b",9999)) - - sql("DROP TABLE IF EXISTS default.carbon1") + sql(""" + UPDATE default.study_carbondata a + SET (a.stringField1, a.stringField2) = (concat(a.stringField1 , "_test" ), concat(a.stringField2 , "_test" )) + WHERE a.stringField2 = '1' + 
""").show() + assert(sql("select stringField1 from default.study_carbondata where stringField2 = '1_test'").collect().length == 1) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.isPersistEnabled, "true") + sql("DROP TABLE IF EXISTS default.study_carbondata ") } test("update table in carbondata with rand() ") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dd42277a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index a8d9050..98ceae8 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -657,7 +657,8 @@ case class LoadTable( // extract tupleId field which will be used as a key val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute - .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId") + .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))). 
+ as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) // use dataFrameWithoutTupleId as dictionaryDataFrame val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) otherFields = otherFields :+ segIdColumn http://git-wip-us.apache.org/repos/asf/carbondata/blob/dd42277a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index 204225b..d3d699a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ScalaUDF, _} import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan} import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan} @@ -34,9 +34,8 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation} import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand} import org.apache.spark.sql.hive.execution.command._ -import org.apache.spark.sql.optimizer.{CarbonDecoderRelation} -import org.apache.spark.sql.types.IntegerType -import org.apache.spark.sql.types.StringType +import 
org.apache.spark.sql.optimizer.CarbonDecoderRelation +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -100,15 +99,23 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { case CustomDeterministicExpression(exp) => exp } }.asInstanceOf[Seq[NamedExpression]] - val newProjectList = projectList.map { element => - element match { - case a@Alias(s: ScalaUDF, name) - if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) => - AttributeReference(name, StringType, true)().withExprId(a.exprId) - case other => other - } + val newProjectList = projectList.map { + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || + name.equalsIgnoreCase( + CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => + AttributeReference(name, StringType, true)().withExprId(a.exprId) + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) => + val reference = + AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, + StringType, true)().withExprId(a.exprId) + val alias = a.transform { + case s: ScalaUDF => + ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes) + }.asInstanceOf[Alias] + Alias(alias.child, alias.name)(alias.exprId, alias.qualifiers, alias.explicitMetadata) + case other => other } val projectSet = AttributeSet(newProjectList.flatMap(_.references)) val filterSet = AttributeSet(predicates.flatMap(_.references)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/dd42277a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 4d919dc..4d8e7ac 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.optimizer.{CarbonDecoderRelation} +import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType} @@ -58,8 +58,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { projects, filters, (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan( - a.map(_.name).toArray, f), needDecoder)) :: - Nil + a.map(_.name).toArray, f), needDecoder)) :: Nil case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) || !CarbonDictionaryDecoder. 
@@ -250,13 +249,23 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { var newProjectList: Seq[Attribute] = Seq.empty val updatedProjects = projects.map { - case a@Alias(s: ScalaUDF, name) - if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => - val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId) - newProjectList :+= reference - reference - case other => other + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || + name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => + val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId) + newProjectList :+= reference + reference + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) => + val reference = + AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, + StringType, true)().withExprId(a.exprId) + newProjectList :+= reference + a.transform { + case s: ScalaUDF => + ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes) + } + case other => other } // Don't request columns that are only referenced by pushed filters. 
val requestedColumns = http://git-wip-us.apache.org/repos/asf/carbondata/blob/dd42277a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 9170550..de16f69 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -878,7 +878,8 @@ case class LoadTable( // extract tupleId field which will be used as a key val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute - .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId") + .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))). + as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) // use dataFrameWithoutTupleId as dictionaryDataFrame val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) otherFields = otherFields :+ segIdColumn
