Repository: carbondata Updated Branches: refs/heads/master 52ab73097 -> c2e2ba08f
Adds example for update and delete with Spark 2.1 This closes #1159 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c2e2ba08 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c2e2ba08 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c2e2ba08 Branch: refs/heads/master Commit: c2e2ba08f464b5427d23545a61689722c65e560a Parents: 52ab730 Author: mayun <[email protected]> Authored: Tue Jul 11 22:56:10 2017 +0800 Committer: chenliang613 <[email protected]> Committed: Thu Jul 13 22:09:02 2017 +0800 ---------------------------------------------------------------------- .../spark/src/main/resources/data_update.csv | 11 -- .../examples/DataUpdateDeleteExample.scala | 120 ++++++++++--- .../examples/DataUpdateDeleteExample.scala | 173 +++++++++++++++++++ 3 files changed, 267 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2e2ba08/examples/spark/src/main/resources/data_update.csv ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/resources/data_update.csv b/examples/spark/src/main/resources/data_update.csv deleted file mode 100644 index 3f72e00..0000000 --- a/examples/spark/src/main/resources/data_update.csv +++ /dev/null @@ -1,11 +0,0 @@ -ID,country,name,phonetype,serialname,salary -1,france,bbb1,phone197,ASD69643,25000 -2,france,bbb2,phone756,ASD42892,25001 -3,france,bbb3,phone1904,ASD37014,25002 -4,france,bbb4,phone2435,ASD66902,25003 -5,france,bbb5,phone2441,ASD90633,25004 -6,germany,bbb6,phone294,ASD59961,25005 -7,germany,bbb7,phone610,ASD14875,25006 -8,germany,bbb8,phone1848,ASD57308,25007 -9,germany,bbb9,phone706,ASD86717,25008 -10,germany,bbb10,phone685,ASD30505,25009 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2e2ba08/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala index 7be392d..830a819 100644 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala @@ -17,6 +17,13 @@ package org.apache.carbondata.examples +import java.io.File +import java.text.SimpleDateFormat + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext} +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.examples.util.ExampleUtils @@ -25,20 +32,25 @@ object DataUpdateDeleteExample { def main(args: Array[String]) { val cc = ExampleUtils.createCarbonContext("DataUpdateDeleteExample") - val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv" - val testData1 = ExampleUtils.currentPath + "/src/main/resources/data_update.csv" + + // for local files + var rootPath = ExampleUtils.currentPath + // for hdfs files + // var rootPath = "hdfs://hdfs-host/carbon" + + val testData = rootPath + "/src/main/resources/data.csv" // Specify date format based on raw data CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") cc.sql("DROP TABLE IF EXISTS t3") - cc.sql("DROP TABLE IF EXISTS update_table") + cc.sql("DROP TABLE IF EXISTS t5") // Create table, 6 dimensions, 1 measure cc.sql(""" CREATE TABLE IF NOT EXISTS t3 - (ID Int, date Date, country String, + (id Int, date Date, country String, name String, phonetype String, serialname char(10), salary Int) STORED BY 'carbondata' """) @@ -47,71 +59,127 @@ object DataUpdateDeleteExample { LOAD DATA LOCAL INPATH '$testData' INTO TABLE t3 """) + // Specify date format based on raw data + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + + // Simulate data and write to table t5 + var fields = Seq[StructField]() + fields = fields :+ DataTypes.createStructField("t5_id", DataTypes.IntegerType, false) + fields = fields :+ DataTypes.createStructField("t5_date", DataTypes.DateType, false) + fields = fields :+ DataTypes.createStructField("t5_country", DataTypes.StringType, false) + fields = fields :+ DataTypes.createStructField("t5_name", DataTypes.StringType, false) + fields = fields :+ DataTypes.createStructField("t5_phonetype", DataTypes.StringType, false) + fields = fields :+ DataTypes.createStructField("t5_serialname", DataTypes.StringType, false) + fields = fields :+ DataTypes.createStructField("t5_salary", DataTypes.IntegerType, false) + var schema = StructType(fields) + var sdf = new SimpleDateFormat("yyyy-MM-dd") + var data = cc.sparkContext.parallelize(1 to 10).map { x => + val day = x % 20 + 1 + var dateStr = "" + if (day >= 10) { + dateStr = "2017-07-" + day + } else { + dateStr = "2017-07-0" + day + } + val dt = new java.sql.Date(sdf.parse(dateStr).getTime); + var row = Seq[Any]() + row = row :+ x + row = row :+ dt + row = row :+ "china" + row = row :+ "bbb" + x + row = row :+ "phone" + 100 * x + row = row :+ "ASD" + (1000 * x - x) + row = row :+ (25000 + x) + Row.fromSeq(row) + } + var df = cc.createDataFrame(data, schema) + df.write + .format("carbondata") + .option("tableName", "t5") + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + cc.sql(""" + SELECT * FROM t5 ORDER BY t5_id + """).show() + // 1.Update data with simple SET cc.sql(""" - SELECT * FROM t3 ORDER BY ID + SELECT * FROM t3 ORDER BY t3.id """).show() // Update data where salary < 15003 + val dateStr = "2018-08-08" + cc.sql(s""" + UPDATE t3 SET (t3.date, t3.country) = ('$dateStr', 'india') WHERE t3.salary < 15003 + """).show() + // Query data again after the above update cc.sql(""" - UPDATE t3 SET (t3.country) = ('india') WHERE t3.salary < 15003 + SELECT * FROM t3 ORDER BY t3.id """).show() + cc.sql(""" UPDATE t3 SET (t3.salary) = (t3.salary + 9) WHERE t3.name = 'aaa1' """).show() - // Query data again after the above update cc.sql(""" - SELECT * FROM t3 ORDER BY ID + SELECT * FROM t3 ORDER BY t3.id """).show() // 2.Update data with subquery result SET cc.sql(""" - CREATE TABLE IF NOT EXISTS update_table - (ID Int, country String, - name String, phonetype String, serialname char(10), salary Int) - STORED BY 'carbondata' - """) - - cc.sql(s""" - LOAD DATA LOCAL INPATH '$testData1' INTO TABLE update_table - """) - + UPDATE t3 + SET (t3.country, t3.name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5) + WHERE t3.id < 5""").show() cc.sql(""" UPDATE t3 - SET (t3.country, t3.name) = (SELECT u.country, u.name FROM update_table u WHERE u.id = 5) + SET (t3.date, t3.serialname, t3.salary) = + (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5) WHERE t3.id < 5""").show() // Query data again after the above update cc.sql(""" - SELECT * FROM t3 ORDER BY ID + SELECT * FROM t3 ORDER BY t3.id """).show() // 3.Update data with join query result SET cc.sql(""" UPDATE t3 SET (t3.country, t3.salary) = - (SELECT u.country, f.salary FROM update_table u FULL JOIN update_table f - WHERE u.id = 8 and f.id=6) WHERE t3.id >6""").show() + (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u + WHERE u.id = t5_id and t5_id=6) WHERE t3.id >6""").show() // Query data again after the above update cc.sql(""" - SELECT * FROM t3 ORDER BY ID + SELECT * FROM t3 ORDER BY t3.id """).show() // 4.Delete data where salary > 15005 cc.sql(""" - DELETE FROM t3 WHERE salary > 15005 + DELETE FROM t3 WHERE t3.salary > 15005 + """).show() + + // Query data again after delete data + cc.sql(""" + SELECT * FROM t3 ORDER BY t3.id + """).show() + + // 5.Delete data WHERE id in (1, 2, $key) + var key = 3 + cc.sql(s""" + DELETE FROM t3 WHERE t3.id in (1, 2, $key) """).show() // Query data again after delete data cc.sql(""" - SELECT * FROM t3 ORDER BY ID + SELECT * FROM t3 ORDER BY t3.id """).show() // Drop table cc.sql("DROP TABLE IF EXISTS t3") - cc.sql("DROP TABLE IF EXISTS update_table") + cc.sql("DROP TABLE IF EXISTS t5") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2e2ba08/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala new file mode 100644 index 0000000..60b2664 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.text.SimpleDateFormat + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +object DataUpdateDeleteExample { + + def main(args: Array[String]) { + + // for local files + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + // for hdfs files + // var rootPath = "hdfs://hdfs-host/carbon" + + var storeLocation = s"$rootPath/examples/spark2/target/store" + var warehouse = s"$rootPath/examples/spark2/target/warehouse" + var metastoredb = s"$rootPath/examples/spark2/target" + + import org.apache.spark.sql.CarbonSession._ + val spark = SparkSession + .builder() + .master("local") + .appName("DataUpdateDeleteExample") + .config("spark.sql.warehouse.dir", warehouse) + .config("spark.driver.host", "localhost") + .config("spark.sql.crossJoin.enabled", "true") + .getOrCreateCarbonSession(storeLocation, metastoredb) + spark.sparkContext.setLogLevel("WARN") + + // Specify date format based on raw data + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + + import spark.implicits._ + // Drop table + spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS t5") + + // Simulate data and write to table t3 + var sdf = new SimpleDateFormat("yyyy-MM-dd") + var df = spark.sparkContext.parallelize(1 to 10) + .map(x => (x, new java.sql.Date(sdf.parse("2015-07-" + (x % 10 + 10)).getTime), + "china", "aaa" + x, "phone" + 555 * x, "ASD" + (60000 + x), 14999 + x)) + .toDF("t3_id", "t3_date", "t3_country", "t3_name", + "t3_phonetype", "t3_serialname", "t3_salary") + df.write + .format("carbondata") + .option("tableName", "t3") + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + + // Simulate data and write to table t5 + df = spark.sparkContext.parallelize(1 to 10) + .map(x => (x, new java.sql.Date(sdf.parse("2017-07-" + (x % 20 + 1)).getTime), + "usa", "bbb" + x, "phone" + 100 * x, "ASD" + (1000 * x - x), 25000 + x)) + .toDF("t5_id", "t5_date", "t5_country", "t5_name", + "t5_phonetype", "t5_serialname", "t5_salary") + df.write + .format("carbondata") + .option("tableName", "t5") + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + spark.sql(""" + SELECT * FROM t5 ORDER BY t5_id + """).show() + + // 1.Update data with simple SET + // Update data where salary < 15003 + val dateStr = "2018-08-08" + spark.sql(s""" + UPDATE t3 SET (t3_date, t3_country) = ('$dateStr', 'india') WHERE t3_salary < 15003 + """).show() + // Query data again after the above update + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + + spark.sql(""" + UPDATE t3 SET (t3_salary) = (t3_salary + 9) WHERE t3_name = 'aaa1' + """).show() + // Query data again after the above update + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + + // 2.Update data with subquery result SET + spark.sql(""" + UPDATE t3 + SET (t3_country, t3_name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5) + WHERE t3_id < 5""").show() + spark.sql(""" + UPDATE t3 + SET (t3_date, t3_serialname, t3_salary) = + (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5) + WHERE t3_id < 5""").show() + + // Query data again after the above update + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + + // 3.Update data with join query result SET + spark.sql(""" + UPDATE t3 + SET (t3_country, t3_salary) = + (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u + WHERE u.t3_id = t5_id and t5_id=6) WHERE t3_id >6""").show() + + // Query data again after the above update + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + + // 4.Delete data where salary > 15005 + spark.sql(""" + DELETE FROM t3 WHERE t3_salary > 15005 + """).show() + + // Query data again after delete data + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + + // 5.Delete data WHERE id in (1, 2, $key) + var key = 3 + spark.sql(s""" + DELETE FROM t3 WHERE t3_id in (1, 2, $key) + """).show() + + // Query data again after delete data + spark.sql(""" + SELECT * FROM t3 ORDER BY t3_id + """).show() + + // Drop table + spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS t5") + + spark.stop() + } + +}
