Repository: carbondata Updated Branches: refs/heads/master ee9df2e2c -> 26976a816
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala index 1d4bedf..38a1941 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.examples.util.ExampleUtils // scalastyle:off println object StructuredStreamingExample { http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala index 8b34a8b..17b299e 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala @@ -19,20 +19,33 @@ package org.apache.carbondata.examples import java.io.File +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import 
org.apache.carbondata.examples.util.ExampleUtils + + object TableLevelCompactionOptionExample { def main(args: Array[String]) { - val spark = ExampleUtils.createCarbonSession("DataManagementExample") - spark.sparkContext.setLogLevel("WARN") + val spark = ExampleUtils.createCarbonSession("TableLevelCompactionOptionExample") + exampleBody(spark) + spark.close() + } - spark.sql("DROP TABLE IF EXISTS carbon_table") + def exampleBody(spark : SparkSession): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") + + spark.sql("DROP TABLE IF EXISTS tablecompaction_table") // Create table with table level compaction options // while loading and compacting, table level compaction options will be used instead of // options specified in carbon.properties spark.sql( s""" - | CREATE TABLE IF NOT EXISTS carbon_table( + | CREATE TABLE IF NOT EXISTS tablecompaction_table( | ID Int, | date Date, | country String, @@ -51,7 +64,7 @@ object TableLevelCompactionOptionExample { """.stripMargin) val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath + + "../../../..").getCanonicalPath val path = s"$rootPath/examples/spark2/src/main/resources/dataSample.csv" // load 6 segments @@ -59,48 +72,53 @@ object TableLevelCompactionOptionExample { (1 to 6).foreach(_ => spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbon_table + | INTO TABLE tablecompaction_table | OPTIONS('HEADER'='true') """.stripMargin)) // scalastyle:on // show all segments, existing segments are 0.1,3,4,5, compacted segments are 0,1,2 // because of 2 segments are preserved, only one level-1 minor compaction is triggered - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("SHOW SEGMENTS FOR TABLE tablecompaction_table").show() // load another 2 segments // scalastyle:off (1 to 2).foreach(_ => spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbon_table + | INTO TABLE 
tablecompaction_table | OPTIONS('HEADER'='true') """.stripMargin)) // scalastyle:on // show all segments, existing segments will be 0.2,6,7, // compacted segments are 0,1,2,3,4,5,0.1,3.1 - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("SHOW SEGMENTS FOR TABLE tablecompaction_table").show() // load another 2 segments // scalastyle:off (1 to 2).foreach(_ => spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbon_table + | INTO TABLE tablecompaction_table | OPTIONS('HEADER'='true') """.stripMargin)) // scalastyle:on // do major compaction, there will be 3 segment left(2 preserved segments) - spark.sql("ALTER TABLE carbon_table COMPACT 'MAJOR'") - spark.sql("CLEAN FILES FOR TABLE carbon_table") - spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + spark.sql("ALTER TABLE tablecompaction_table COMPACT 'MAJOR'") + spark.sql("CLEAN FILES FOR TABLE tablecompaction_table") + spark.sql("SHOW SEGMENTS FOR TABLE tablecompaction_table").show() - // Drop table - spark.sql("DROP TABLE IF EXISTS carbon_table") + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) - spark.stop() + // Drop table + spark.sql("DROP TABLE IF EXISTS tablecompaction_table") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala index 470d9ff..5a79e0b 100644 --- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala @@ -19,10 +19,11 @@ package org.apache.carbondata.examples import java.io.File -import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils /** * This example is for time series pre-aggregate tables. @@ -31,28 +32,28 @@ import org.apache.carbondata.core.util.CarbonProperties object TimeSeriesPreAggregateTableExample { def main(args: Array[String]) { - - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val testData = s"$rootPath/integration/spark-common-test/src/test/resources/timeseriestest.csv" val spark = ExampleUtils.createCarbonSession("TimeSeriesPreAggregateTableExample") + exampleBody(spark) + spark.close() + } - spark.sparkContext.setLogLevel("ERROR") - + def exampleBody(spark : SparkSession): Unit = { + val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath + val testData = s"$rootPath/integration/spark-common-test/src/test/resources/timeseriestest.csv" import spark.implicits._ import scala.util.Random val r = new Random() val df = spark.sparkContext.parallelize(1 to 10 * 1000 ) .map(x => ("" + 20 + "%02d".format(r.nextInt(20)) + "-" + "%02d".format(r.nextInt(11) + 1) + - "-" + "%02d".format(r.nextInt(27) + 1) + " " + "%02d".format(r.nextInt(12)) + ":" + - "%02d".format(r.nextInt(59)) + ":" + "%02d".format(r.nextInt(59)), "name" + x % 8, + "-" + "%02d".format(r.nextInt(27) + 1) + " " + "%02d".format(r.nextInt(12)) + ":" + + "%02d".format(r.nextInt(59)) + ":" + "%02d".format(r.nextInt(59)), "name" + x % 8, r.nextInt(60))).toDF("mytime", "name", 
"age") // 1. usage for time series Pre-aggregate tables creation and query spark.sql("drop table if exists timeSeriesTable") spark.sql("CREATE TABLE timeSeriesTable(mytime timestamp," + - " name string, age int) STORED BY 'org.apache.carbondata.format'") + " name string, age int) STORED BY 'org.apache.carbondata.format'") spark.sql( s""" | CREATE DATAMAP agg0_hour ON TABLE timeSeriesTable @@ -95,9 +96,13 @@ object TimeSeriesPreAggregateTableExample { 'year') """.stripMargin).show() - spark.sql("DROP TABLE IF EXISTS timeSeriesTable") - - spark.close() + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + spark.sql("DROP TABLE IF EXISTS timeSeriesTable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala new file mode 100644 index 0000000..d5308c3 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples.util + +import java.io.DataOutputStream + +import scala.collection.mutable.{ArrayBuffer, HashSet} + +import org.apache.spark.SparkContext + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory + + +object AllDictionaryUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + def extractDictionary(sc: SparkContext, + srcData: String, + outputPath: String, + fileHeader: String, + dictCol: String): Unit = { + val fileHeaderArr = fileHeader.split(",") + val isDictCol = new Array[Boolean](fileHeaderArr.length) + for (i <- 0 until fileHeaderArr.length) { + if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) { + isDictCol(i) = true + } else { + isDictCol(i) = false + } + } + val dictionaryRdd = sc.textFile(srcData).flatMap(x => { + val tokens = x.split(",") + val result = new ArrayBuffer[(Int, String)]() + for (i <- 0 until isDictCol.length) { + if (isDictCol(i)) { + try { + result += ((i, tokens(i))) + } catch { + case _: ArrayIndexOutOfBoundsException => + LOGGER.error("Read a bad record: " + x) + } + } + } + result + }).groupByKey().flatMap(x => { + val distinctValues = new HashSet[(Int, String)]() + for (value <- x._2) { + distinctValues.add(x._1, value) + } + distinctValues + }) + val dictionaryValues = dictionaryRdd.map(x => x._1 + "," + x._2).collect() + saveToFile(dictionaryValues, outputPath) + } + + def cleanDictionary(outputPath: String): Unit = { + try { + val fileType = 
FileFactory.getFileType(outputPath) + val file = FileFactory.getCarbonFile(outputPath, fileType) + if (file.exists()) { + file.delete() + } + } catch { + case ex: Exception => + LOGGER.error("Clean dictionary catching exception:" + ex) + } + } + + def saveToFile(contents: Array[String], outputPath: String): Unit = { + var writer: DataOutputStream = null + try { + val fileType = FileFactory.getFileType(outputPath) + val file = FileFactory.getCarbonFile(outputPath, fileType) + if (!file.exists()) { + file.createNewFile() + } + writer = FileFactory.getDataOutputStream(outputPath, fileType) + for (content <- contents) { + writer.writeBytes(content + "\n") + } + } catch { + case ex: Exception => + LOGGER.error("Save dictionary to file catching exception:" + ex) + } finally { + if (writer != null) { + try { + writer.close() + } catch { + case ex: Exception => + LOGGER.error("Close output stream catching exception:" + ex) + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala new file mode 100644 index 0000000..1cdaafe --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples.util + +import java.io.File + +import org.apache.spark.sql.{SaveMode, SparkSession} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + + +// scalastyle:off println + +object ExampleUtils { + + def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../") + .getCanonicalPath + val storeLocation: String = currentPath + "/target/store" + + def createCarbonSession(appName: String, workThreadNum: Int = 1): SparkSession = { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + val warehouse = s"$rootPath/examples/spark2/target/warehouse" + val metastoredb = s"$rootPath/examples/spark2/target" + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "") + + val masterUrl = if (workThreadNum <= 1) { + "local" + } else { + "local[" + workThreadNum.toString() + "]" + } + import org.apache.spark.sql.CarbonSession._ + val spark = SparkSession + .builder() + .master(masterUrl) + .appName(appName) + .config("spark.sql.warehouse.dir", warehouse) + .config("spark.driver.host", "localhost") + .config("spark.sql.crossJoin.enabled", "true") + 
     .getOrCreateCarbonSession(storeLocation, metastoredb) + + spark.sparkContext.setLogLevel("ERROR") + spark + } + + /** + * This func will write a sample CarbonData file containing the following schema: + * c1: String, c2: String, c3: Int + * Returns table path + */ + def writeSampleCarbonFile(spark: SparkSession, tableName: String, numRows: Int = 1000): String = { + spark.sql(s"DROP TABLE IF EXISTS $tableName") + writeDataframe(spark, tableName, numRows, SaveMode.Overwrite) + s"$storeLocation/default/$tableName" + } + + /** + * This func will append data to the CarbonData file + * Returns table path + */ + def appendSampleCarbonFile( + spark: SparkSession, tableName: String, numRows: Int = 1000): String = { + writeDataframe(spark, tableName, numRows, SaveMode.Append) + s"$storeLocation/default/$tableName" + } + + /** + * create a new dataframe and write to CarbonData file, based on save mode + */ + private def writeDataframe( + spark: SparkSession, tableName: String, numRows: Int, mode: SaveMode): Unit = { + // build the sample DataFrame via the SparkSession implicits (toDF) + import spark.implicits._ + val sc = spark.sparkContext + val df = sc.parallelize(1 to numRows, 2) + .map(x => ("a", "b", x)) + .toDF("c1", "c2", "c3") + + // save dataframe directly to carbon file without tempCSV + df.write + .format("carbondata") + .option("tableName", tableName) + .option("compress", "true") + .option("tempCSV", "false") + .mode(mode) + .save() + } + + def cleanSampleCarbonFile(spark: SparkSession, tableName: String): Unit = { + spark.sql(s"DROP TABLE IF EXISTS $tableName") + } +} +// scalastyle:on println + http://git-wip-us.apache.org/repos/asf/carbondata/blob/26976a81/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
new file mode 100644 index 0000000..ddf8ee5 --- /dev/null +++ b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examplesCI + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.examples._ +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +/** + * Test suite for examples + */ + +class RunExamples extends QueryTest with BeforeAndAfterAll { + + private val spark = sqlContext.sparkSession + + override def beforeAll: Unit = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + } + + override def afterAll { + sql("USE default") + + } + + test("AllDictionaryExample") { + AllDictionaryExample.exampleBody(spark) + } + + test("AlterTableExample") { + AlterTableExample.exampleBody(spark) + } + + 
test("CarbonDataFrameExample") { + CarbonDataFrameExample.exampleBody(spark) + } + + test("CarbonPartitionExample") { + CarbonPartitionExample.exampleBody(spark) + } + + test("CarbonSessionExample") { + CarbonSessionExample.exampleBody(spark) + } + + test("CarbonSortColumnsExample") { + CarbonSortColumnsExample.exampleBody(spark) + } + + test("CaseClassDataFrameAPIExample") { + CaseClassDataFrameAPIExample.exampleBody(spark) + } + + test("DataFrameComplexTypeExample") { + DataFrameComplexTypeExample.exampleBody(spark) + } + + test("DataManagementExample") { + DataManagementExample.exampleBody(spark) + } + + test("DataUpdateDeleteExample") { + DataUpdateDeleteExample.exampleBody(spark) + } + + test("PreAggregateDataMapExample") { + PreAggregateDataMapExample.exampleBody(spark) + } + + test("QuerySegmentExample") { + QuerySegmentExample.exampleBody(spark) + } + + test("StandardPartitionExample") { + StandardPartitionExample.exampleBody(spark) + } + + test("TableLevelCompactionOptionExample") { + TableLevelCompactionOptionExample.exampleBody(spark) + } + + test("TimeSeriesPreAggregateTableExample") { + TimeSeriesPreAggregateTableExample.exampleBody(spark) + } +} \ No newline at end of file
