[CARBONDATA-2259] Add auto CI for examples

The examples guide new users in how to use CarbonData, but some of them break when the underlying code changes.

1. Add auto CI for the example code.
2. Merge DataFrameAPIExample.scala and CarbonDataFrameExample.scala into one.
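To make the examples testable, the patch applies one refactoring pattern across all of them: session setup moves into the shared ExampleUtils.createCarbonSession, and each example's logic is extracted into an exampleBody(spark) method that both main() and the new CI suite can call. Tables also get unique names (e.g. dictionary_table instead of t3) so the examples can share one session without clashing. A minimal sketch of the shape, using a hypothetical SomeExample object and some_table (the real signatures appear throughout the diff below):

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.examples.util.ExampleUtils

object SomeExample {

  // Standalone entry point: create a CarbonSession, run the body, close it.
  def main(args: Array[String]): Unit = {
    val spark = ExampleUtils.createCarbonSession("SomeExample")
    exampleBody(spark)
    spark.close()
  }

  // Reusable body: the CI suite invokes this with its own shared session,
  // so the same SQL is exercised by both main() and the test harness.
  def exampleBody(spark: SparkSession): Unit = {
    spark.sql("DROP TABLE IF EXISTS some_table")
    spark.sql("CREATE TABLE some_table(id INT, name STRING) STORED BY 'carbondata'")
    spark.sql("SELECT * FROM some_table").show()
    spark.sql("DROP TABLE IF EXISTS some_table")
  }
}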
This closes #2071 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/26976a81 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/26976a81 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/26976a81 Branch: refs/heads/master Commit: 26976a816e278697b1f059f55dca9a95a55d62da Parents: ee9df2e Author: chenliang613 <[email protected]> Authored: Sat Mar 17 16:52:01 2018 +0800 Committer: ravipesala <[email protected]> Committed: Mon Mar 26 21:51:34 2018 +0530 ---------------------------------------------------------------------- examples/spark2/pom.xml | 102 +++++++++++++++- .../examples/AllDictionaryExample.scala | 29 +++-- .../carbondata/examples/AllDictionaryUtil.scala | 109 ----------------- .../carbondata/examples/AlluxioExample.scala | 19 ++- .../carbondata/examples/AlterTableExample.scala | 59 ++++------ .../examples/CarbonDataFrameExample.scala | 29 +---- .../examples/CarbonPartitionExample.scala | 91 +++++++-------- .../examples/CarbonSessionExample.scala | 43 +++---- .../examples/CarbonSortColumnsExample.scala | 31 +++-- .../examples/CaseClassDataFrameAPIExample.scala | 14 ++- .../carbondata/examples/ConcurrencyTest.scala | 4 +- .../examples/DataFrameAPIExample.scala | 49 -------- .../examples/DataFrameComplexTypeExample.scala | 25 ++-- .../examples/DataManagementExample.scala | 55 +++++---- .../examples/DataUpdateDeleteExample.scala | 107 ++++++++--------- .../carbondata/examples/ExampleUtils.scala | 112 ------------------ .../carbondata/examples/HadoopFileExample.scala | 32 ++++- .../examples/PreAggregateDataMapExample.scala | 29 +++-- .../examples/QuerySegmentExample.scala | 64 +++++----- .../examples/SparkSessionExample.scala | 77 +++++++----- .../examples/SparkStreamingExample.scala | 1 + .../examples/StandardPartitionExample.scala | 22 ++-- .../StreamingUsingBatchLoadExample.scala | 2 + .../StreamingWithRowParserExample.scala | 1 + .../examples/StructuredStreamingExample.scala | 1 + .../TableLevelCompactionOptionExample.scala | 50 +++++--- .../TimeSeriesPreAggregateTableExample.scala | 31 ++--- .../examples/util/AllDictionaryUtil.scala | 110 ++++++++++++++++++ .../carbondata/examples/util/ExampleUtils.scala | 116 +++++++++++++++++++ .../carbondata/examplesCI/RunExamples.scala | 108 +++++++++++++++++ 30 files changed, 875 insertions(+), 647 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml index f64dc9f..e562fbf 100644 --- a/examples/spark2/pom.xml +++ b/examples/spark2/pom.xml @@ -67,20 +67,60 @@ <artifactId>httpclient</artifactId> <version>4.2</version> </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> - <sourceDirectory>src/main/scala</sourceDirectory> + <testSourceDirectory>src/test/scala</testSourceDirectory> <resources> <resource> + <directory>src/resources</directory> + </resource> + <resource> <directory>.</directory> <includes> - <include>CARBON_EXAMPLESLogResource.properties</include> + <include>CARBON_SPARK_INTERFACELogResource.properties</include> </includes> </resource> </resources> <plugins> <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + 
<source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + <!-- Note config is repeated in scalatest config --> + <configuration> + <skip>false</skip> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + <testFailureIgnore>false</testFailureIgnore> + <failIfNoTests>false</failIfNoTests> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> @@ -93,6 +133,13 @@ <phase>compile</phase> </execution> <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> <phase>process-resources</phase> <goals> <goal>compile</goal> @@ -101,13 +148,56 @@ </executions> </plugin> <plugin> - <artifactId>maven-compiler-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.4.1</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>com.ning.maven.plugins</groupId> + <artifactId>maven-duplicate-finder-plugin</artifactId> <configuration> - <source>1.7</source> - <target>1.7</target> + <skip>true</skip> </configuration> </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <!-- Note config is repeated in surefire config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <testFailureIgnore>false</testFailureIgnore> + <filereports>CarbonTestSuite.txt</filereports> + <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + </argLine> + <stderr /> + <environmentVariables> + </environmentVariables> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> - + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala index 2f337f4..808e319 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala @@ -17,17 +17,27 @@ package org.apache.carbondata.examples +import org.apache.spark.sql.SparkSession + import org.apache.carbondata.core.constants.CarbonCommonConstants 
import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.{AllDictionaryUtil, ExampleUtils} + object AllDictionaryExample { def main(args: Array[String]) { val spark = ExampleUtils.createCarbonSession("AllDictionaryExample") + exampleBody(spark) + spark.close() + } + + def exampleBody(spark : SparkSession): Unit = { val testData = ExampleUtils.currentPath + "/src/main/resources/dataSample.csv" val csvHeader = "ID,date,country,name,phonetype,serialname,salary" val dictCol = "|date|country|name|phonetype|serialname|" - val allDictFile = ExampleUtils.currentPath + "/src/main/resources/data.dictionary" + val allDictFile = ExampleUtils.currentPath + "/target/data.dictionary" + // extract all dictionary files from source data AllDictionaryUtil.extractDictionary(spark.sparkContext, testData, allDictFile, csvHeader, dictCol) @@ -35,11 +45,11 @@ object AllDictionaryExample { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") - spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS dictionary_table") spark.sql( s""" - | CREATE TABLE IF NOT EXISTS t3( + | CREATE TABLE IF NOT EXISTS dictionary_table( | ID Int, | date Date, | country String, @@ -52,22 +62,25 @@ object AllDictionaryExample { """.stripMargin) spark.sql(s""" - LOAD DATA LOCAL INPATH '$testData' into table t3 + LOAD DATA LOCAL INPATH '$testData' into table dictionary_table options('ALL_DICTIONARY_PATH'='$allDictFile', 'SINGLE_PASS'='true') """) spark.sql(""" - SELECT * FROM t3 + SELECT * FROM dictionary_table """).show() spark.sql(""" - SELECT * FROM t3 where floatField=3.5 + SELECT * FROM dictionary_table where floatField=3.5 """).show() - spark.sql("DROP TABLE IF EXISTS t3") + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + + spark.sql("DROP TABLE IF EXISTS dictionary_table") // clean local dictionary files AllDictionaryUtil.cleanDictionary(allDictFile) } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala deleted file mode 100644 index 20f7bba..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples - -import java.io.DataOutputStream - -import scala.collection.mutable.{ArrayBuffer, HashSet} - -import org.apache.spark.SparkContext - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datastore.impl.FileFactory - -object AllDictionaryUtil { - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def extractDictionary(sc: SparkContext, - srcData: String, - outputPath: String, - fileHeader: String, - dictCol: String): Unit = { - val fileHeaderArr = fileHeader.split(",") - val isDictCol = new Array[Boolean](fileHeaderArr.length) - for (i <- 0 until fileHeaderArr.length) { - if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) { - isDictCol(i) = true - } else { - isDictCol(i) = false - } - } - val dictionaryRdd = sc.textFile(srcData).flatMap(x => { - val tokens = x.split(",") - val result = new ArrayBuffer[(Int, String)]() - for (i <- 0 until isDictCol.length) { - if (isDictCol(i)) { - try { - result += ((i, tokens(i))) - } catch { - case _: ArrayIndexOutOfBoundsException => - LOGGER.error("Read a bad record: " + x) - } - } - } - result - }).groupByKey().flatMap(x => { - val distinctValues = new HashSet[(Int, String)]() - for (value <- x._2) { - distinctValues.add(x._1, value) - } - distinctValues - }) - val dictionaryValues = dictionaryRdd.map(x => x._1 + "," + x._2).collect() - saveToFile(dictionaryValues, outputPath) - } - - def cleanDictionary(outputPath: String): Unit = { - try { - val fileType = FileFactory.getFileType(outputPath) - val file = FileFactory.getCarbonFile(outputPath, fileType) - if (file.exists()) { - file.delete() - } - } catch { - case ex: Exception => - LOGGER.error("Clean dictionary catching exception:" + ex) - } - } - - def saveToFile(contents: Array[String], outputPath: String): Unit = { - var writer: DataOutputStream = null - try { - val fileType = FileFactory.getFileType(outputPath) - val file = FileFactory.getCarbonFile(outputPath, fileType) - if (!file.exists()) { - file.createNewFile() - } - writer = FileFactory.getDataOutputStream(outputPath, fileType) - for (content <- contents) { - writer.writeBytes(content + "\n") - } - } catch { - case ex: Exception => - LOGGER.error("Save dictionary to file catching exception:" + ex) - } finally { - if (writer != null) { - try { - writer.close() - } catch { - case ex: Exception => - LOGGER.error("Close output stream catching exception:" + ex) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala index 6c183a5..31110ce 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala @@ -17,9 +17,13 @@ package org.apache.carbondata.examples +import org.apache.spark.sql.SparkSession + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils + /** * configure alluxio: @@ -32,6 +36,11 @@ import 
org.apache.carbondata.core.util.CarbonProperties object AlluxioExample { def main(args: Array[String]) { val spark = ExampleUtils.createCarbonSession("AlluxioExample") + exampleBody(spark) + spark.close() + } + + def exampleBody(spark : SparkSession): Unit = { spark.sparkContext.hadoopConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem") FileFactory.getConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem") @@ -39,26 +48,26 @@ object AlluxioExample { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") - spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS alluxio_table") spark.sql(""" - CREATE TABLE IF NOT EXISTS t3 + CREATE TABLE IF NOT EXISTS alluxio_table (ID Int, date Date, country String, name String, phonetype String, serialname String, salary Int) STORED BY 'carbondata' """) spark.sql(s""" - LOAD DATA LOCAL INPATH 'alluxio://localhost:19998/data.csv' into table t3 + LOAD DATA LOCAL INPATH 'alluxio://localhost:19998/data.csv' into table alluxio_table """) spark.sql(""" SELECT country, count(salary) AS amount - FROM t3 + FROM alluxio_table WHERE country IN ('china','france') GROUP BY country """).show() - spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS alluxio_table") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlterTableExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlterTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlterTableExample.scala index 6fffd30..edb806a 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlterTableExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlterTableExample.scala @@ -17,12 +17,10 @@ package org.apache.carbondata.examples -import java.io.File - import org.apache.spark.sql.SparkSession -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils + /** * For alter table relative syntax, you can refer to DDL operation @@ -32,32 +30,18 @@ object AlterTableExample { def main(args: Array[String]): Unit = { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") - - import org.apache.spark.sql.CarbonSession._ - - val spark = SparkSession - .builder() - .master("local") - .appName("AlterTableExample") - .config("spark.sql.warehouse.dir", warehouse) - .getOrCreateCarbonSession(storeLocation) - - spark.sparkContext.setLogLevel("WARN") + val spark = ExampleUtils.createCarbonSession("AlterTableExample") + exampleBody(spark) + spark.close() + } - spark.sql("DROP TABLE IF EXISTS carbon_table") - spark.sql("DROP TABLE IF EXISTS new_carbon_table") + def exampleBody(spark : SparkSession): Unit = { + spark.sql("DROP TABLE IF EXISTS alter_table") + spark.sql("DROP TABLE IF EXISTS new_alter_table") spark.sql( s""" - | CREATE TABLE carbon_table( + | CREATE TABLE alter_table( | shortField SHORT, | intField INT, | bigintField LONG, @@ -75,28 +59,25 @@ object AlterTableExample { 
""".stripMargin) // Alter table change data type - spark.sql("DESCRIBE FORMATTED carbon_table").show() - spark.sql("ALTER TABLE carbon_table CHANGE intField intField BIGINT").show() + spark.sql("DESCRIBE FORMATTED alter_table").show() + spark.sql("ALTER TABLE alter_table CHANGE intField intField BIGINT").show() // Alter table add columns - spark.sql("DESCRIBE FORMATTED carbon_table").show() - spark.sql("ALTER TABLE carbon_table ADD COLUMNS (newField STRING) " + + spark.sql("DESCRIBE FORMATTED alter_table").show() + spark.sql("ALTER TABLE alter_table ADD COLUMNS (newField STRING) " + "TBLPROPERTIES ('DEFAULT.VALUE.newField'='def')").show() // Alter table drop columns - spark.sql("DESCRIBE FORMATTED carbon_table").show() - spark.sql("ALTER TABLE carbon_table DROP COLUMNS (newField)").show() - spark.sql("DESCRIBE FORMATTED carbon_table").show() + spark.sql("DESCRIBE FORMATTED alter_table").show() + spark.sql("ALTER TABLE alter_table DROP COLUMNS (newField)").show() + spark.sql("DESCRIBE FORMATTED alter_table").show() // Alter table rename table name spark.sql("SHOW TABLES").show() - spark.sql("ALTER TABLE carbon_table RENAME TO new_carbon_table").show() + spark.sql("ALTER TABLE alter_table RENAME TO new_alter_table").show() spark.sql("SHOW TABLES").show() - spark.sql("DROP TABLE IF EXISTS carbon_table") - spark.sql("DROP TABLE IF EXISTS new_carbon_table") - - spark.stop() - + spark.sql("DROP TABLE IF EXISTS alter_table") + spark.sql("DROP TABLE IF EXISTS new_alter_table") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonDataFrameExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonDataFrameExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonDataFrameExample.scala index c8f8023..65c3b78 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonDataFrameExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonDataFrameExample.scala @@ -17,34 +17,19 @@ package org.apache.carbondata.examples -import java.io.File - import org.apache.spark.sql.{SaveMode, SparkSession} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils object CarbonDataFrameExample { def main(args: Array[String]) { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") - - import org.apache.spark.sql.CarbonSession._ - val spark = SparkSession - .builder() - .master("local") - .appName("CarbonDataFrameExample") - .config("spark.sql.warehouse.dir", warehouse) - .getOrCreateCarbonSession(storeLocation) - - spark.sparkContext.setLogLevel("ERROR") + val spark = ExampleUtils.createCarbonSession("CarbonDataFrameExample") + exampleBody(spark) + spark.close() + } + def exampleBody(spark : SparkSession): Unit = { // Writes Dataframe to CarbonData file: import spark.implicits._ val df = spark.sparkContext.parallelize(1 to 100) @@ -83,7 +68,5 @@ object CarbonDataFrameExample { carbondf.filter($"number" > 31).show() spark.sql("DROP TABLE IF EXISTS 
carbon_df_table") - - spark.stop() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala index 2391dbe..1c4b1bc 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala @@ -19,37 +19,33 @@ package org.apache.carbondata.examples import java.io.File -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils import org.apache.carbondata.spark.exception.ProcessMetaDataException + + object CarbonPartitionExample { def main(args: Array[String]) { + val spark = ExampleUtils.createCarbonSession("CarbonPartitionExample") + exampleBody(spark) + spark.close() + } + + def exampleBody(spark : SparkSession): Unit = { val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" val testData = s"$rootPath/integration/spark-common-test/src/test/resources/partition_data.csv" CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - import org.apache.spark.sql.CarbonSession._ - - val spark = SparkSession - .builder() - .master("local") - .appName("CarbonPartitionExample") - .config("spark.sql.warehouse.dir", warehouse) - .getOrCreateCarbonSession(storeLocation) - - spark.sparkContext.setLogLevel("WARN") // range partition with bucket defined spark.sql("DROP TABLE IF EXISTS t0") @@ -66,9 +62,7 @@ object CarbonPartitionExample { | PARTITIONED BY (logdate Timestamp) | STORED BY 'carbondata' | TBLPROPERTIES('PARTITION_TYPE'='RANGE', - | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01', - | 'BUCKETNUMBER'='3', - | 'BUCKETCOLUMNS'='vin') + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01') """.stripMargin) // none partition table @@ -119,25 +113,25 @@ object CarbonPartitionExample { | PARTITIONED BY (vin String) | STORED BY 'carbondata' | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5') - """.stripMargin) + """.stripMargin) // list partition spark.sql("DROP TABLE IF EXISTS t5") spark.sql(""" - | CREATE TABLE IF NOT EXISTS t5 - | ( - | id Int, - | vin String, - | logdate Timestamp, - | phonenumber Long, - | area String, - | salary Int - |) - | PARTITIONED BY (country String) - | STORED BY 'carbondata' - | TBLPROPERTIES('PARTITION_TYPE'='LIST', - | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ') - """.stripMargin) + | CREATE TABLE IF NOT EXISTS t5 + | ( + | id Int, + | vin String, + | logdate Timestamp, + | phonenumber Long, + | area String, + | salary Int + |) + | PARTITIONED BY (country String) + | STORED BY 'carbondata' + | 
TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ') + """.stripMargin) // load data into partition table spark.sql(s""" @@ -163,9 +157,9 @@ object CarbonPartitionExample { // hive partition table spark.sql("DROP TABLE IF EXISTS t7") spark.sql(""" - | create table t7(id int, name string) partitioned by (city string) - | row format delimited fields terminated by ',' - """.stripMargin) + | create table t7(id int, name string) partitioned by (city string) + | row format delimited fields terminated by ',' + """.stripMargin) spark.sql("alter table t7 add partition (city = 'Hangzhou')") // not default db partition table @@ -177,16 +171,16 @@ object CarbonPartitionExample { spark.sql(s"DROP DATABASE IF EXISTS partitionDB") spark.sql(s"CREATE DATABASE partitionDB") spark.sql(s""" - | CREATE TABLE IF NOT EXISTS partitionDB.t9( - | id Int, - | logdate Timestamp, - | phonenumber Int, - | country String, - | area String - | ) - | PARTITIONED BY (vin String) - | STORED BY 'carbondata' - | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5') + | CREATE TABLE IF NOT EXISTS partitionDB.t9( + | id Int, + | logdate Timestamp, + | phonenumber Int, + | country String, + | area String + | ) + | PARTITIONED BY (vin String) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5') """.stripMargin) // show tables @@ -204,17 +198,18 @@ object CarbonPartitionExample { spark.sql("""SHOW PARTITIONS t7""").show(100, false) spark.sql("""SHOW PARTITIONS partitionDB.t9""").show(100, false) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + // drop table spark.sql("DROP TABLE IF EXISTS t0") spark.sql("DROP TABLE IF EXISTS t1") + spark.sql("DROP TABLE IF EXISTS t2") spark.sql("DROP TABLE IF EXISTS t3") spark.sql("DROP TABLE IF EXISTS t5") spark.sql("DROP TABLE IF EXISTS t7") spark.sql("DROP TABLE IF EXISTS partitionDB.t9") spark.sql(s"DROP DATABASE IF EXISTS partitionDB") - - spark.close() - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala index d97a9f0..d254152 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala @@ -20,13 +20,22 @@ package org.apache.carbondata.examples import java.io.File import org.apache.log4j.PropertyConfigurator +import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils object CarbonSessionExample { def main(args: Array[String]) { + val spark = ExampleUtils.createCarbonSession("CarbonSessionExample") + spark.sparkContext.setLogLevel("INFO") + exampleBody(spark) + spark.close() + } + + def exampleBody(spark : SparkSession): Unit = { val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath @@ -38,15 +47,12 @@ object CarbonSessionExample { CarbonProperties.getInstance() 
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") - val spark = ExampleUtils.createCarbonSession("CarbonSessionExample") - spark.sparkContext.setLogLevel("INFO") - - spark.sql("DROP TABLE IF EXISTS carbon_table") + spark.sql("DROP TABLE IF EXISTS carbonsession_table") // Create table spark.sql( s""" - | CREATE TABLE carbon_table( + | CREATE TABLE carbonsession_table( | shortField SHORT, | intField INT, | bigintField LONG, @@ -68,7 +74,7 @@ object CarbonSessionExample { spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbon_table + | INTO TABLE carbonsession_table | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') """.stripMargin) // scalastyle:on @@ -76,61 +82,58 @@ object CarbonSessionExample { spark.sql( s""" | SELECT charField, stringField, intField - | FROM carbon_table + | FROM carbonsession_table | WHERE stringfield = 'spark' AND decimalField > 40 """.stripMargin).show() spark.sql( s""" | SELECT * - | FROM carbon_table WHERE length(stringField) = 5 + | FROM carbonsession_table WHERE length(stringField) = 5 """.stripMargin).show() spark.sql( s""" | SELECT * - | FROM carbon_table WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23" + | FROM carbonsession_table WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23" """.stripMargin).show() - spark.sql("SELECT count(stringField) FROM carbon_table").show() + spark.sql("SELECT count(stringField) FROM carbonsession_table").show() spark.sql( s""" | SELECT sum(intField), stringField - | FROM carbon_table + | FROM carbonsession_table | GROUP BY stringField """.stripMargin).show() spark.sql( s""" | SELECT t1.*, t2.* - | FROM carbon_table t1, carbon_table t2 + | FROM carbonsession_table t1, carbonsession_table t2 | WHERE t1.stringField = t2.stringField """.stripMargin).show() spark.sql( s""" | WITH t1 AS ( - | SELECT * FROM carbon_table + | SELECT * FROM carbonsession_table | UNION ALL - | SELECT * FROM carbon_table + | SELECT * FROM carbonsession_table | ) | SELECT t1.*, t2.* - | FROM t1, carbon_table t2 + | FROM t1, carbonsession_table t2 | WHERE t1.stringField = t2.stringField """.stripMargin).show() spark.sql( s""" | SELECT * - | FROM carbon_table + | FROM carbonsession_table | WHERE stringField = 'spark' and floatField > 2.8 """.stripMargin).show() // Drop table - spark.sql("DROP TABLE IF EXISTS carbon_table") - - spark.stop() + spark.sql("DROP TABLE IF EXISTS carbonsession_table") } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala index 8d0eabf..adcce68 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala @@ -23,30 +23,23 @@ import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils object CarbonSortColumnsExample { def main(args: Array[String]) { + val spark = ExampleUtils.createCarbonSession("CarbonSessionExample") + exampleBody(spark) + spark.close() + } + + def 
exampleBody(spark : SparkSession): Unit = { val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" - CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") - import org.apache.spark.sql.CarbonSession._ - val spark = SparkSession - .builder() - .master("local") - .appName("CarbonSortColumnsExample") - .config("spark.sql.warehouse.dir", warehouse) - .config("spark.driver.host", "localhost") - .getOrCreateCarbonSession(storeLocation) - - spark.sparkContext.setLogLevel("ERROR") - spark.sql("DROP TABLE IF EXISTS no_sort_columns_table") // Create table with no sort columns @@ -115,11 +108,15 @@ object CarbonSortColumnsExample { spark.sql( s"""SELECT * FROM sort_columns_table""".stripMargin).show() + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + // Drop table spark.sql("DROP TABLE IF EXISTS no_sort_columns_table") spark.sql("DROP TABLE IF EXISTS sort_columns_table") - - spark.stop() } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CaseClassDataFrameAPIExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CaseClassDataFrameAPIExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CaseClassDataFrameAPIExample.scala index c926817..8bf391f 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CaseClassDataFrameAPIExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CaseClassDataFrameAPIExample.scala @@ -18,7 +18,9 @@ package org.apache.carbondata.examples import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} + +import org.apache.carbondata.examples.util.ExampleUtils case class People(name: String, occupation: String, id: Int) @@ -26,7 +28,11 @@ object CaseClassDataFrameAPIExample { def main(args: Array[String]) { val spark = ExampleUtils.createCarbonSession("CaseClassDataFrameAPIExample") + exampleBody(spark) + spark.close() + } + def exampleBody(spark : SparkSession): Unit = { val people = List(People("sangeeta", "engineer", 1), People("pallavi", "consultant", 2)) val peopleRDD: RDD[People] = spark.sparkContext.parallelize(people) import spark.implicits._ @@ -35,13 +41,13 @@ object CaseClassDataFrameAPIExample { // writing data to carbon table peopleDF.write .format("carbondata") - .option("tableName", "carbon2") + .option("tableName", "caseclass_table") .option("compress", "true") .mode(SaveMode.Overwrite) .save() - spark.sql("SELECT * FROM carbon2").show() + spark.sql("SELECT * FROM caseclass_table").show() - spark.sql("DROP TABLE IF EXISTS carbon2") + spark.sql("DROP TABLE IF EXISTS caseclass_table") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala 
---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala index f82fa39..09921cb 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala @@ -223,7 +223,7 @@ object ConcurrencyTest { val sqlText = query.sqlText.replace("$table", tableName) val executorService = Executors.newFixedThreadPool(ThreadNum) - val tasks = new util.ArrayList[Callable[Results]]() + val tasks = new java.util.ArrayList[Callable[Results]]() for (num <- 1 to TaskNum) { tasks.add(new QueryTask(spark, sqlText)) @@ -240,7 +240,7 @@ object ConcurrencyTest { } } - def printResult(results: util.List[Future[Results]], sql: String = "") { + def printResult(results: java.util.List[Future[Results]], sql: String = "") { val timeArray = new Array[Double](results.size()) val sqlResult = results.get(0).get().sqlResult for (i <- 0 until results.size()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala deleted file mode 100644 index 7a7e74a..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples - -// scalastyle:off println -object DataFrameAPIExample { - - def main(args: Array[String]) { - val spark = ExampleUtils.createCarbonSession("DataFrameAPIExample") - ExampleUtils.writeSampleCarbonFile(spark, "carbon1", 1000) - - import spark.implicits._ - - // use datasource api to read - val in = spark.read - .format("carbondata") - .option("tableName", "carbon1") - .load() - var count = in.where($"c3" > 500).select($"*").count() - println(s"count after 1 load: $count") - - // append new data, query answer should be 1000 - ExampleUtils.appendSampleCarbonFile(spark, "carbon1") - count = in.where($"c3" > 500).select($"*").count() - println(s"count after 2 load: $count") - - // use SQL to read - spark.sql("SELECT c1, count(c3) FROM carbon1 where c3 > 500 group by c1 limit 10").show - - // delete carbondata file - ExampleUtils.cleanSampleCarbonFile(spark, "carbon1") - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala index b5ff49b..34b32f4 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala @@ -17,7 +17,9 @@ package org.apache.carbondata.examples -import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.{SaveMode, SparkSession} + +import org.apache.carbondata.examples.util.ExampleUtils case class StructElement(school: Array[String], age: Int) case class ComplexTypeData(id: Int, name: String, city: String, salary: Float, file: StructElement) @@ -28,6 +30,11 @@ object DataFrameComplexTypeExample { def main(args: Array[String]) { val spark = ExampleUtils.createCarbonSession("DataFrameComplexTypeExample", 4) + exampleBody(spark) + spark.close() + } + + def exampleBody(spark : SparkSession): Unit = { val complexTableName = s"complex_type_table" import spark.implicits._ @@ -52,13 +59,13 @@ object DataFrameComplexTypeExample { val sc = spark.sparkContext // generate data val df = sc.parallelize(Seq( - ComplexTypeData(1, "index_1", "city_1", 10000.0f, - StructElement(Array("struct_11", "struct_12"), 10)), - ComplexTypeData(2, "index_2", "city_2", 20000.0f, - StructElement(Array("struct_21", "struct_22"), 20)), - ComplexTypeData(3, "index_3", "city_3", 30000.0f, - StructElement(Array("struct_31", "struct_32"), 30)) - )).toDF + ComplexTypeData(1, "index_1", "city_1", 10000.0f, + StructElement(Array("struct_11", "struct_12"), 10)), + ComplexTypeData(2, "index_2", "city_2", 20000.0f, + StructElement(Array("struct_21", "struct_22"), 20)), + ComplexTypeData(3, "index_3", "city_3", 30000.0f, + StructElement(Array("struct_31", "struct_32"), 30)) + )).toDF df.printSchema() df.write .format("carbondata") @@ -83,8 +90,6 @@ object DataFrameComplexTypeExample { // drop table spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }") - - spark.stop() } } // scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala 
---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala index 1447a1a..fde66e1 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala @@ -19,18 +19,26 @@ package org.apache.carbondata.examples import java.io.File +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.examples.util.ExampleUtils + + object DataManagementExample { def main(args: Array[String]) { val spark = ExampleUtils.createCarbonSession("DataManagementExample") - spark.sparkContext.setLogLevel("WARN") + exampleBody(spark) + spark.close() + } - spark.sql("DROP TABLE IF EXISTS carbon_table") + def exampleBody(spark : SparkSession): Unit = { + spark.sql("DROP TABLE IF EXISTS datamanagement_table") // Create table spark.sql( s""" - | CREATE TABLE IF NOT EXISTS carbon_table( + | CREATE TABLE IF NOT EXISTS datamanagement_table( | ID Int, | date Date, | country String, @@ -43,7 +51,7 @@ object DataManagementExample { """.stripMargin) val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath + + "../../../..").getCanonicalPath val path = s"$rootPath/examples/spark2/src/main/resources/dataSample.csv" // load data 5 times, each load of data is called a segment in CarbonData @@ -51,56 +59,55 @@ object DataManagementExample { (1 to 5).foreach(_ => spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbon_table + | INTO TABLE datamanagement_table | OPTIONS('HEADER'='true') """.stripMargin)) // scalastyle:on // show all segments, there will be 5 segments - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table").show() // 50 rows loaded - spark.sql("SELECT count(*) FROM carbon_table").show() + spark.sql("SELECT count(*) FROM datamanagement_table").show() // delete the first segment - spark.sql("DELETE FROM TABLE carbon_table WHERE SEGMENT.ID IN (0)") - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("DELETE FROM TABLE datamanagement_table WHERE SEGMENT.ID IN (0)") + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table").show() // this query will be executed on last 4 segments, it should return 40 rows - spark.sql("SELECT count(*) FROM carbon_table").show() + spark.sql("SELECT count(*) FROM datamanagement_table").show() // force a major compaction to compact all segments into one - spark.sql("ALTER TABLE carbon_table COMPACT 'MAJOR'") - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("ALTER TABLE datamanagement_table COMPACT 'MAJOR'") + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table").show() // load again, add another 10 rows spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbon_table + | INTO TABLE datamanagement_table | OPTIONS('HEADER'='true') """.stripMargin) - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table").show() // this query will be executed on 2 segments, it should return 50 rows - spark.sql("SELECT count(*) FROM carbon_table").show() + spark.sql("SELECT count(*) FROM datamanagement_table").show() // delete all segments whose loading time is before '2099-01-01 01:00:00' - spark.sql("DELETE FROM TABLE carbon_table 
WHERE SEGMENT.STARTTIME BEFORE '2099-01-01 01:00:00'") - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table ").show() + spark.sql("DELETE FROM TABLE datamanagement_table " + + "WHERE SEGMENT.STARTTIME BEFORE '2099-01-01 01:00:00'") + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table ").show() // this query will be executed on 0 segments, it should return 0 rows - spark.sql("SELECT count(*) FROM carbon_table").show() + spark.sql("SELECT count(*) FROM datamanagement_table").show() // force clean up all 'MARKED_FOR_DELETE' and 'COMPACTED' segments immediately - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() - spark.sql("CLEAN FILES FOR TABLE carbon_table") - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table").show() + spark.sql("CLEAN FILES FOR TABLE datamanagement_table") + spark.sql("SHOW SEGMENTS FOR TABLE datamanagement_table").show() // Drop table - spark.sql("DROP TABLE IF EXISTS carbon_table") - - spark.stop() + spark.sql("DROP TABLE IF EXISTS datamanagement_table") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala index 2646b8a..8af1d3e 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala @@ -25,148 +25,141 @@ import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils object DataUpdateDeleteExample { def main(args: Array[String]) { - // for local files - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - // for hdfs files - // var rootPath = "hdfs://hdfs-host/carbon" - - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" - - import org.apache.spark.sql.CarbonSession._ - val spark = SparkSession - .builder() - .master("local") - .appName("DataUpdateDeleteExample") - .config("spark.sql.warehouse.dir", warehouse) - .config("spark.driver.host", "localhost") - .config("spark.sql.crossJoin.enabled", "true") - .getOrCreateCarbonSession(storeLocation) - spark.sparkContext.setLogLevel("WARN") + val spark = ExampleUtils.createCarbonSession("DataUpdateDeleteExample") + exampleBody(spark) + spark.close() + } + def exampleBody(spark : SparkSession): Unit = { // Specify date format based on raw data CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") import spark.implicits._ // Drop table - spark.sql("DROP TABLE IF EXISTS t3") - spark.sql("DROP TABLE IF EXISTS t5") + spark.sql("DROP TABLE IF EXISTS IUD_table1") + spark.sql("DROP TABLE IF EXISTS IUD_table2") - // Simulate data and write to table t3 + // Simulate data and write to table IUD_table1 val sdf = new SimpleDateFormat("yyyy-MM-dd") var df = spark.sparkContext.parallelize(1 to 10) .map(x => (x, new java.sql.Date(sdf.parse("2015-07-" + (x % 10 + 10)).getTime), "china", "aaa" + x, "phone" + 555 * x, "ASD" + (60000 + x), 
14999 + x)) - .toDF("t3_id", "t3_date", "t3_country", "t3_name", - "t3_phonetype", "t3_serialname", "t3_salary") + .toDF("IUD_table1_id", "IUD_table1_date", "IUD_table1_country", "IUD_table1_name", + "IUD_table1_phonetype", "IUD_table1_serialname", "IUD_table1_salary") df.write .format("carbondata") - .option("tableName", "t3") - .option("tempCSV", "true") + .option("tableName", "IUD_table1") .option("compress", "true") .mode(SaveMode.Overwrite) .save() - // Simulate data and write to table t5 + // Simulate data and write to table IUD_table2 df = spark.sparkContext.parallelize(1 to 10) .map(x => (x, new java.sql.Date(sdf.parse("2017-07-" + (x % 20 + 1)).getTime), "usa", "bbb" + x, "phone" + 100 * x, "ASD" + (1000 * x - x), 25000 + x)) - .toDF("t5_id", "t5_date", "t5_country", "t5_name", - "t5_phonetype", "t5_serialname", "t5_salary") + .toDF("IUD_table2_id", "IUD_table2_date", "IUD_table2_country", "IUD_table2_name", + "IUD_table2_phonetype", "IUD_table2_serialname", "IUD_table2_salary") df.write .format("carbondata") - .option("tableName", "t5") + .option("tableName", "IUD_table2") .option("tempCSV", "true") .option("compress", "true") .mode(SaveMode.Overwrite) .save() spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() spark.sql(""" - SELECT * FROM t5 ORDER BY t5_id + SELECT * FROM IUD_table2 ORDER BY IUD_table2_id """).show() // 1.Update data with simple SET // Update data where salary < 15003 val dateStr = "2018-08-08" spark.sql(s""" - UPDATE t3 SET (t3_date, t3_country) = ('$dateStr', 'india') WHERE t3_salary < 15003 + UPDATE IUD_table1 SET (IUD_table1_date, IUD_table1_country) = ('$dateStr', 'india') + WHERE IUD_table1_salary < 15003 """).show() // Query data again after the above update spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() spark.sql(""" - UPDATE t3 SET (t3_salary) = (t3_salary + 9) WHERE t3_name = 'aaa1' + UPDATE IUD_table1 SET (IUD_table1_salary) = (IUD_table1_salary + 9) + WHERE IUD_table1_name = 'aaa1' """).show() // Query data again after the above update spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() // 2.Update data with subquery result SET spark.sql(""" - UPDATE t3 - SET (t3_country, t3_name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5) - WHERE t3_id < 5""").show() + UPDATE IUD_table1 + SET (IUD_table1_country, IUD_table1_name) = (SELECT IUD_table2_country, IUD_table2_name + FROM IUD_table2 WHERE IUD_table2_id = 5) + WHERE IUD_table1_id < 5""").show() spark.sql(""" - UPDATE t3 - SET (t3_date, t3_serialname, t3_salary) = - (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5) - WHERE t3_id < 5""").show() + UPDATE IUD_table1 + SET (IUD_table1_date, IUD_table1_serialname, IUD_table1_salary) = + (SELECT '2099-09-09', IUD_table2_serialname, '9999' + FROM IUD_table2 WHERE IUD_table2_id = 5) + WHERE IUD_table1_id < 5""").show() // Query data again after the above update spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() // 3.Update data with join query result SET spark.sql(""" - UPDATE t3 - SET (t3_country, t3_salary) = - (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u - WHERE u.t3_id = t5_id and t5_id=6) WHERE t3_id >6""").show() + UPDATE IUD_table1 + SET (IUD_table1_country, IUD_table1_salary) = + (SELECT IUD_table2_country, IUD_table2_salary FROM IUD_table2 FULL JOIN IUD_table1 u + WHERE 
u.IUD_table1_id = IUD_table2_id and IUD_table2_id=6) + WHERE IUD_table1_id >6""").show() // Query data again after the above update spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() // 4.Delete data where salary > 15005 spark.sql(""" - DELETE FROM t3 WHERE t3_salary > 15005 + DELETE FROM IUD_table1 WHERE IUD_table1_salary > 15005 """).show() // Query data again after delete data spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() // 5.Delete data WHERE id in (1, 2, $key) val key = 3 spark.sql(s""" - DELETE FROM t3 WHERE t3_id in (1, 2, $key) + DELETE FROM IUD_table1 WHERE IUD_table1_id in (1, 2, $key) """).show() // Query data again after delete data spark.sql(""" - SELECT * FROM t3 ORDER BY t3_id + SELECT * FROM IUD_table1 ORDER BY IUD_table1_id """).show() - // Drop table - spark.sql("DROP TABLE IF EXISTS t3") - spark.sql("DROP TABLE IF EXISTS t5") + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) - spark.stop() + // Drop table + spark.sql("DROP TABLE IF EXISTS IUD_table1") + spark.sql("DROP TABLE IF EXISTS IUD_table2") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala deleted file mode 100644 index 0c725e1..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples - -import java.io.File - -import org.apache.spark.sql.{SaveMode, SparkSession} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -// scalastyle:off println - -object ExampleUtils { - - def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../") - .getCanonicalPath - val storeLocation: String = currentPath + "/target/store" - - def createCarbonSession(appName: String, workThreadNum: Int = 1): SparkSession = { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" - val metastoredb = s"$rootPath/examples/spark2/target" - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") - .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") - .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") - .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "") - - val masterUrl = if (workThreadNum <= 1) { - "local" - } else { - "local[" + workThreadNum.toString() + "]" - } - import org.apache.spark.sql.CarbonSession._ - val spark = SparkSession - .builder() - .master(masterUrl) - .appName(appName) - .config("spark.sql.warehouse.dir", warehouse) - .getOrCreateCarbonSession(storeLocation, metastoredb) - spark.sparkContext.setLogLevel("WARN") - spark - } - - /** - * This func will write a sample CarbonData file containing following schema: - * c1: String, c2: String, c3: Double - * Returns table path - */ - def writeSampleCarbonFile(spark: SparkSession, tableName: String, numRows: Int = 1000): String = { - spark.sql(s"DROP TABLE IF EXISTS $tableName") - writeDataframe(spark, tableName, numRows, SaveMode.Overwrite) - s"$storeLocation/default/$tableName" - } - - /** - * This func will append data to the CarbonData file - * Returns table path - */ - def appendSampleCarbonFile( - spark: SparkSession, tableName: String, numRows: Int = 1000): String = { - writeDataframe(spark, tableName, numRows, SaveMode.Append) - s"$storeLocation/default/$tableName" - } - - /** - * create a new dataframe and write to CarbonData file, based on save mode - */ - private def writeDataframe( - spark: SparkSession, tableName: String, numRows: Int, mode: SaveMode): Unit = { - // use CarbonContext to write CarbonData files - import spark.implicits._ - val sc = spark.sparkContext - val df = sc.parallelize(1 to numRows, 2) - .map(x => ("a", "b", x)) - .toDF("c1", "c2", "c3") - - // save dataframe directl to carbon file without tempCSV - df.write - .format("carbondata") - .option("tableName", tableName) - .option("compress", "true") - .option("tempCSV", "false") - .mode(mode) - .save() - } - - def cleanSampleCarbonFile(spark: SparkSession, tableName: String): Unit = { - spark.sql(s"DROP TABLE IF EXISTS $tableName") - } -} -// scalastyle:on println - http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala index 465e660..7438638 100644 --- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
index 465e660..7438638 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.examples
 
+import java.io.File
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{SaveMode, SparkSession}
 
+import org.apache.carbondata.examples.util.ExampleUtils
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.CarbonProjection
 
@@ -27,19 +31,37 @@ object HadoopFileExample {
 
   def main(args: Array[String]): Unit = {
     val spark = ExampleUtils.createCarbonSession("HadoopFileExample")
-    ExampleUtils.writeSampleCarbonFile(spark, "carbon1")
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation: String = rootPath + "/examples/spark2/target/store/default"
+    exampleBody(spark, storeLocation)
+    spark.close()
+  }
+
+  def exampleBody(spark : SparkSession, storeLocation : String): Unit = {
+
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 1000)
+      .map(x => ("a", "b", x))
+      .toDF("c1", "c2", "c3")
+
+    df.write.format("carbondata")
+      .option("tableName", "Hadoopfile_table")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite).save()
 
     // read two columns
     val projection = new CarbonProjection
     projection.addColumn("c1")  // column c1
     projection.addColumn("c3")  // column c3
     val conf = new Configuration()
+
     CarbonInputFormat.setColumnProjection(conf, projection)
     CarbonInputFormat.setDatabaseName(conf, "default")
-    CarbonInputFormat.setTableName(conf, "carbon1")
+    CarbonInputFormat.setTableName(conf, "Hadoopfile_table")
+
-    val sc = spark.sparkContext
-    val input = sc.newAPIHadoopFile(s"${ExampleUtils.storeLocation}/default/carbon1",
+    val input = spark.sparkContext.newAPIHadoopFile(s"${storeLocation}/Hadoopfile_table",
       classOf[CarbonTableInputFormat[Array[Object]]],
       classOf[Void],
       classOf[Array[Object]],
@@ -48,7 +70,7 @@ object HadoopFileExample {
     result.foreach(x => println(x.mkString(", ")))
 
     // delete carbondata file
-    ExampleUtils.cleanSampleCarbonFile(spark, "carbon1")
+    ExampleUtils.cleanSampleCarbonFile(spark, "Hadoopfile_table")
   }
 }
 // scalastyle:on println
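The read path above boils down to three steps: set a column projection, point CarbonTableInputFormat at the table, and hand the store path to newAPIHadoopFile. A condensed sketch under the same assumptions; the helper name readProjected is hypothetical, not part of this patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
    import org.apache.carbondata.hadoop.CarbonProjection

    def readProjected(spark: SparkSession, tablePath: String,
        tableName: String, columns: Seq[String]): Unit = {
      val projection = new CarbonProjection
      columns.foreach(projection.addColumn)
      val conf = new Configuration()
      CarbonInputFormat.setColumnProjection(conf, projection)
      CarbonInputFormat.setDatabaseName(conf, "default")
      CarbonInputFormat.setTableName(conf, tableName)
      // newAPIHadoopFile yields (Void, Array[Object]) pairs; keep only the rows.
      spark.sparkContext.newAPIHadoopFile(tablePath,
          classOf[CarbonTableInputFormat[Array[Object]]],
          classOf[Void], classOf[Array[Object]], conf)
        .map(_._2)
        .collect()
        .foreach(row => println(row.mkString(", ")))
    }

Usage, matching the table written above: readProjected(spark, s"$storeLocation/Hadoopfile_table", "Hadoopfile_table", Seq("c1", "c3")).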
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateDataMapExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateDataMapExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateDataMapExample.scala
index a76b4ab..367c011 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateDataMapExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateDataMapExample.scala
@@ -19,7 +19,9 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+import org.apache.carbondata.examples.util.ExampleUtils
 
 /**
  * This example is for pre-aggregate tables.
@@ -28,13 +30,15 @@ import org.apache.spark.sql.SaveMode
 object PreAggregateDataMapExample {
 
   def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("PreAggregateTableExample")
+    exampleBody(spark)
+    spark.close()
+  }
 
+  def exampleBody(spark : SparkSession): Unit = {
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
     val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
-    val spark = ExampleUtils.createCarbonSession("PreAggregateTableExample")
-
-    spark.sparkContext.setLogLevel("ERROR")
 
     // 1. simple usage for Pre-aggregate tables creation and query
     spark.sql("DROP TABLE IF EXISTS mainTable")
@@ -61,6 +65,10 @@ object PreAggregateDataMapExample {
       LOAD DATA LOCAL INPATH '$testData' into table mainTable
       """)
 
+    spark.sql("""
+      select * from mainTable
+      """)
+
     spark.sql(s"""
       LOAD DATA LOCAL INPATH '$testData' into table mainTable_other
       """)
@@ -101,7 +109,7 @@ object PreAggregateDataMapExample {
     spark.sql("show datamap on table mainTable").show(false)
 
     // drop datamap
-    spark.sql("drop datamap preagg_count on table mainTable").show()
+    spark.sql("drop datamap preagg_count_age on table mainTable").show()
     spark.sql("show datamap on table mainTable").show(false)
 
     spark.sql(
@@ -114,7 +122,7 @@ object PreAggregateDataMapExample {
       s"""create datamap preagg_count on table maintable using 'preaggregate' as
          | select name, count(*) from maintable group by name""".stripMargin)
 
-    spark.sql("show datamap on table maintable").show
+    spark.sql("show datamap on table maintable").show(false)
 
     spark.sql(
       s"""
@@ -139,7 +147,7 @@ object PreAggregateDataMapExample {
       s"""
         | select t1.name,t1.city from mainTable_other t1 join
        | (select name as newnewname,sum(age) as sum
-        | from mainTable group by name )t2 on t1.name=t2.newnewname
+        | from mainTable group by name) t2 on t1.name=t2.newnewname
      """.stripMargin).show()
 
     // 2.compare the performance : with pre-aggregate VS main table
@@ -218,17 +226,14 @@ object PreAggregateDataMapExample {
     }
     // scalastyle:off
     println("time for query with function sum on table with pre-aggregate table:" +
-            time_with_aggTable_sum.toString)
+      time_with_aggTable_sum.toString)
     println("time for query with function sum on table without pre-aggregate table:" +
-            time_without_aggTable_sum.toString)
+      time_without_aggTable_sum.toString)
     // scalastyle:on
 
     spark.sql("DROP TABLE IF EXISTS mainTable")
     spark.sql("DROP TABLE IF EXISTS mainTable_other")
     spark.sql("DROP TABLE IF EXISTS personTable")
     spark.sql("DROP TABLE IF EXISTS personTableWithoutAgg")
-
-    spark.close()
-  }
   }
 }
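One bug fixed above: the example created a datamap named preagg_count_age but then tried to drop preagg_count, which does not exist. A minimal sketch of the create/drop pairing the fix restores, assuming a mainTable with an age column as in this example (the aggregate behind preagg_count_age is not shown in the hunk, so the select here is illustrative):

    spark.sql(
      s"""create datamap preagg_count_age on table mainTable using 'preaggregate'
         | as select count(age) from mainTable""".stripMargin)
    spark.sql("show datamap on table mainTable").show(false)
    // The name in DROP DATAMAP must match the name used at creation time.
    spark.sql("drop datamap preagg_count_age on table mainTable").show()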
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/QuerySegmentExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/QuerySegmentExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/QuerySegmentExample.scala
index 03312a0..5719926 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/QuerySegmentExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/QuerySegmentExample.scala
@@ -19,8 +19,12 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
+import org.apache.spark.sql.SparkSession
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
+
 
 /**
  * This example introduces how to query data with specified segments
@@ -30,14 +34,19 @@ object QuerySegmentExample {
 
   def main(args: Array[String]) {
     val spark = ExampleUtils.createCarbonSession("QuerySegmentExample")
-    spark.sparkContext.setLogLevel("ERROR")
+    exampleBody(spark)
+    spark.close()
+  }
 
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
+  def exampleBody(spark : SparkSession): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
 
+    spark.sql("DROP TABLE IF EXISTS querysegment_table")
     // Create table
     spark.sql(
       s"""
-         | CREATE TABLE carbon_table(
+         | CREATE TABLE querysegment_table(
          | shortField SHORT,
          | intField INT,
          | bigintField LONG,
@@ -59,30 +68,30 @@ object QuerySegmentExample {
     // load 4 segments, each load has 10 rows data
     // scalastyle:off
     (1 to 4).foreach(_ => spark.sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$path'
-         | INTO TABLE carbon_table
-         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE querysegment_table
+           | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
        """.stripMargin))
     // scalastyle:on
 
     // 1.Query data with specified segments without compaction
-    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+    spark.sql("SHOW SEGMENTS FOR TABLE querysegment_table").show()
 
     // 40 rows
     spark.sql(
       s"""
         | SELECT count(*)
-        | FROM carbon_table
+        | FROM querysegment_table
       """.stripMargin).show()
 
     // specify segments to query
-    spark.sql("SET carbon.input.segments.default.carbon_table = 1,3")
+    spark.sql("SET carbon.input.segments.default.querysegment_table = 1,3")
 
     // 20 rows from segment1 and segment3
     spark.sql(
       s"""
         | SELECT count(*)
-        | FROM carbon_table
+        | FROM querysegment_table
       """.stripMargin).show()
 
     // 2.Query data with specified segments after compaction
@@ -90,49 +99,52 @@ object QuerySegmentExample {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "3,2")
 
-    spark.sql("ALTER TABLE carbon_table COMPACT 'MINOR'")
-    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+    spark.sql("ALTER TABLE querysegment_table COMPACT 'MINOR'")
+    spark.sql("SHOW SEGMENTS FOR TABLE querysegment_table").show()
 
     // Reset to query all segments data
-    spark.sql("SET carbon.input.segments.default.carbon_table = *")
+    spark.sql("SET carbon.input.segments.default.querysegment_table = *")
 
     // 40 rows from all segments
     spark.sql(
       s"""
         | SELECT count(*)
-        | FROM carbon_table
+        | FROM querysegment_table
       """.stripMargin).show()
 
     // After MINOR compaction, 0.1 has 30 rows data(compact 3 segments)
-    spark.sql("SET carbon.input.segments.default.carbon_table = 0.1")
+    spark.sql("SET carbon.input.segments.default.querysegment_table = 0.1")
     spark.sql(
       s"""
         | SELECT count(*)
-        | FROM carbon_table
+        | FROM querysegment_table
       """.stripMargin).show()
 
-    spark.sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
-    spark.sql("CLEAN FILES FOR TABLE carbon_table")
-    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+    spark.sql("ALTER TABLE querysegment_table COMPACT 'MAJOR'")
+    spark.sql("CLEAN FILES FOR TABLE querysegment_table")
+    spark.sql("SHOW SEGMENTS FOR TABLE querysegment_table").show()
 
     // Load 2 new segments
     (1 to 2).foreach(_ => spark.sql(
       s"""
         | LOAD DATA LOCAL INPATH '$path'
-        | INTO TABLE carbon_table
+        | INTO TABLE querysegment_table
         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
       """.stripMargin))
 
-    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+    spark.sql("SHOW SEGMENTS FOR TABLE querysegment_table").show()
 
     // 50 rows: segment0.2 has 40 rows after major compaction, plus segment5 with 10 rows
-    spark.sql("SET carbon.input.segments.default.carbon_table = 0.2,5")
+    spark.sql("SET carbon.input.segments.default.querysegment_table = 0.2,5")
     spark.sql(
       s"""
         | SELECT count(*)
-        | FROM carbon_table
+        | FROM querysegment_table
       """.stripMargin).show()
 
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.stop()
+    spark.sql("DROP TABLE IF EXISTS querysegment_table")
   }
 }
\ No newline at end of file
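The SET carbon.input.segments.<db>.<table> property scopes all subsequent queries in the session to the listed segments, which is the whole point of this example. A minimal sketch, assuming the querysegment_table built above; resetting to * afterwards matters whenever the session is shared, for instance by a CI suite:

    spark.sql("SET carbon.input.segments.default.querysegment_table = 1,3")
    spark.sql("SELECT count(*) FROM querysegment_table").show()  // only segments 1 and 3
    spark.sql("SET carbon.input.segments.default.querysegment_table = *")  // back to all segments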
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
index df1e68f..5155e36 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
@@ -25,6 +25,10 @@ import org.apache.spark.sql.SparkSession
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
+/**
+ * This example doesn't create carbonsession, but use CarbonSource when creating table
+ */
+
 object SparkSessionExample {
 
   def main(args: Array[String]): Unit = {
@@ -42,7 +46,7 @@
       clean(metastoredb)
     }
 
-    val spark = SparkSession
+    val sparksession = SparkSession
       .builder()
       .master("local")
       .appName("SparkSessionExample")
@@ -53,18 +57,17 @@
       .getOrCreate()
 
     CarbonProperties.getInstance()
-      .addProperty("carbon.storelocation", storeLocation)
-
-    spark.sparkContext.setLogLevel("WARN")
-
-    CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+      .addProperty("carbon.storelocation", storeLocation)
+
+    sparksession.sparkContext.setLogLevel("ERROR")
 
     // Create table
-    spark.sql(
+    sparksession.sql("DROP TABLE IF EXISTS sparksession_table")
+    sparksession.sql(
       s"""
-         | CREATE TABLE carbon_table(
+         | CREATE TABLE sparksession_table(
          | shortField SHORT,
          | intField INT,
          | bigintField LONG,
@@ -77,12 +80,13 @@
          | )
          | USING org.apache.spark.sql.CarbonSource
          | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
-         | 'dbName'='default', 'tableName'='carbon_table')
+         | 'dbName'='default', 'tableName'='sparksession_table')
       """.stripMargin)
 
     val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
 
-    spark.sql(
+    sparksession.sql("DROP TABLE IF EXISTS csv_table")
+    sparksession.sql(
       s"""
          | CREATE TABLE csv_table(
          | shortField SHORT,
@@ -97,77 +101,86 @@
          | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       """.stripMargin)
 
-    spark.sql(
+    sparksession.sql(
       s"""
          | LOAD DATA LOCAL INPATH '$path'
          | INTO TABLE csv_table
       """.stripMargin)
 
-    spark.sql("SELECT * FROM csv_table").show()
+    sparksession.sql("SELECT * FROM csv_table").show()
 
-    spark.sql(
+    sparksession.sql(
       s"""
-         | INSERT INTO TABLE carbon_table
+         | INSERT INTO TABLE sparksession_table
          | SELECT shortField, intField, bigintField, doubleField, stringField,
          | from_unixtime(unix_timestamp(timestampField,'yyyy/MM/dd HH:mm:ss')) timestampField,
          | decimalField,from_unixtime(unix_timestamp(dateField,'yyyy/MM/dd')), charField
          | FROM csv_table
       """.stripMargin)
 
-    spark.sql(
+    sparksession.sql("SELECT * FROM sparksession_table").show()
+
+    sparksession.sql(
       s"""
          | SELECT *
-         | FROM carbon_table
+         | FROM sparksession_table
          | WHERE stringfield = 'spark' AND decimalField > 40
       """.stripMargin).show()
 
     // Shows with raw data's timestamp format
-    spark.sql(
+    sparksession.sql(
       s"""
         | SELECT
         | stringField, date_format(timestampField, "yyyy/MM/dd HH:mm:ss") AS
         | timestampField
-        | FROM carbon_table WHERE length(stringField) = 5
+        | FROM sparksession_table WHERE length(stringField) = 5
       """.stripMargin).show()
 
-    spark.sql(
+    sparksession.sql(
       s"""
         | SELECT *
-        | FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+        | FROM sparksession_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
      """.stripMargin).show()
 
-    spark.sql("SELECT count(stringField) FROM carbon_table").show()
+    sparksession.sql("SELECT count(stringField) FROM sparksession_table").show()
 
-    spark.sql(
+    sparksession.sql(
       s"""
         | SELECT sum(intField), stringField
-        | FROM carbon_table
+        | FROM sparksession_table
        | GROUP BY stringField
      """.stripMargin).show()
 
-    spark.sql(
+    sparksession.sql(
       s"""
         | SELECT t1.*, t2.*
-        | FROM carbon_table t1, carbon_table t2
+        | FROM sparksession_table t1, sparksession_table t2
        | WHERE t1.stringField = t2.stringField
       """.stripMargin).show()
 
-    spark.sql(
+    sparksession.sql(
       s"""
          | WITH t1 AS (
-         | SELECT * FROM carbon_table
+         | SELECT * FROM sparksession_table
          | UNION ALL
-         | SELECT * FROM carbon_table
+         | SELECT * FROM sparksession_table
          | )
          | SELECT t1.*, t2.*
-         | FROM t1, carbon_table t2
+         | FROM t1, sparksession_table t2
          | WHERE t1.stringField = t2.stringField
       """.stripMargin).show()
 
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
+    sparksession.sql("DROP TABLE IF EXISTS sparksession_table")
+    sparksession.sql("DROP TABLE IF EXISTS csv_table")
 
-    spark.stop()
+    sparksession.stop()
   }
 }
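As the new header comment notes, this example uses a plain SparkSession and declares the table with USING org.apache.spark.sql.CarbonSource instead of building a CarbonSession. A minimal sketch of that declaration; the app name, table name, and columns here are illustrative, not part of the patch:

    val sparksession = SparkSession.builder()
      .master("local")
      .appName("CarbonSourceSketch")  // hypothetical app name
      .getOrCreate()
    // Declare a CarbonData-backed table through the data source API.
    sparksession.sql(
      s"""
         | CREATE TABLE IF NOT EXISTS sketch_table(stringField STRING, intField INT)
         | USING org.apache.spark.sql.CarbonSource
         | OPTIONS('dbName'='default', 'tableName'='sketch_table')
       """.stripMargin)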
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
index d819a3f..27ea893 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.examples.util.ExampleUtils
 import org.apache.carbondata.streaming.CarbonSparkStreamingListener
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala
index 485eb89..e04d934 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
 
 /**
  * This example is for standard partition, same as hive and spark partition
@@ -31,18 +32,18 @@ object StandardPartitionExample {
 
   def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("StandardPartitionExample")
+    exampleBody(spark)
+    spark.close()
+  }
 
+  def exampleBody(spark : SparkSession): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
     val testData = s"$rootPath/integration/spark-common-test/src/test/resources/" +
                    s"partition_data_example.csv"
-    val spark = ExampleUtils.createCarbonSession("StandardPartitionExample")
-
-    spark.sparkContext.setLogLevel("ERROR")
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
-
     /**
      * 1. Partition basic usages
      */
@@ -186,12 +187,13 @@ object StandardPartitionExample {
     println("----time of with partition----:" + time_with_partition.toString)
     // scalastyle:on
 
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
     spark.sql("DROP TABLE IF EXISTS partitiontable0")
     spark.sql("DROP TABLE IF EXISTS withoutpartition")
     spark.sql("DROP TABLE IF EXISTS withpartition")
     spark.sql("DROP TABLE IF EXISTS origintable")
-
-    spark.close()
-  }
   }
 }
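The "standard partition" this example demonstrates is Hive/Spark-style partitioning on a CarbonData table. A minimal sketch of such a declaration; the table and column names are illustrative and do not reflect the exact schema of partition_data_example.csv:

    // Sketch only: a Hive-style partitioned CarbonData table.
    spark.sql(
      s"""
         | CREATE TABLE IF NOT EXISTS partition_sketch(
         | id INT,
         | name STRING
         | )
         | PARTITIONED BY (logdate DATE)
         | STORED BY 'carbondata'
       """.stripMargin)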
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
index 89883f8..57d4596 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
@@ -24,6 +24,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
+import org.apache.carbondata.examples.util.ExampleUtils
+
 /**
  * This example introduces how to use CarbonData batch load to integrate
  * with Spark Streaming(it's DStream, not Spark Structured Streaming)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
index a07d504..109629e 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.examples.util.ExampleUtils
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Int)
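All the hunks above follow one pattern: main() builds the session and delegates to an exampleBody method, so the example logic can also be exercised from a test. A sketch of how a ScalaTest suite might drive these bodies with a single shared session; the suite name and exact wiring are assumptions, not the contents of the new CI suite added by this commit:

    import org.apache.spark.sql.SparkSession
    import org.scalatest.FunSuite

    import org.apache.carbondata.examples._
    import org.apache.carbondata.examples.util.ExampleUtils

    class ExamplesSmokeTest extends FunSuite {  // hypothetical suite name
      // One CarbonSession shared by every example body under test.
      lazy val spark: SparkSession = ExampleUtils.createCarbonSession("ExamplesSmokeTest")

      test("StandardPartitionExample") {
        StandardPartitionExample.exampleBody(spark)
      }
      test("PreAggregateDataMapExample") {
        PreAggregateDataMapExample.exampleBody(spark)
      }
    }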
