Repository: carbondata
Updated Branches:
  refs/heads/master fca960e37 -> 52f8d7111
[CARBONDATA-1899] Optimize CarbonData concurrency test case

This closes #1713


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/52f8d711
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/52f8d711
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/52f8d711

Branch: refs/heads/master
Commit: 52f8d7111c0d29ced3974fb9e8a63b52fb57d5a4
Parents: fca960e
Author: xubo245 <[email protected]>
Authored: Fri Dec 22 16:55:01 2017 +0800
Committer: Jacky Li <[email protected]>
Committed: Mon Apr 2 14:47:06 2018 +0800

----------------------------------------------------------------------
 .../benchmark/ConcurrentQueryBenchmark.scala    | 573 +++++++++++++++++++
 .../carbondata/benchmark/DataGenerator.scala    |  83 +++
 .../org/apache/carbondata/benchmark/Query.scala |  27 +
 .../benchmark/SimpleQueryBenchmark.scala        | 341 +++++++++++
 .../carbondata/examples/CompareTest.scala       | 397 -------------
 .../carbondata/examples/ConcurrencyTest.scala   | 358 ------------
 6 files changed, 1024 insertions(+), 755 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
new file mode 100644
index 0000000..7da8c29
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.carbondata.benchmark + +import java.io.File +import java.text.SimpleDateFormat +import java.util +import java.util.Date +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} + +// scalastyle:off println +/** + * Test concurrent query performance of CarbonData + * + * This benchmark will print out some information: + * 1.Environment information + * 2.Parameters information + * 3.concurrent query performance result using parquet format + * 4.concurrent query performance result using CarbonData format + * + * This benchmark default run in local model, + * user can change 'runInLocal' to false if want to run in cluster, + * user can change variables like: + * + * spark-submit \ + * --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \ + * --master yarn \ + * --deploy-mode client \ + * --driver-memory 16g \ + * --executor-cores 4g \ + * --executor-memory 24g \ + * --num-executors 3 \ + * concurrencyTest.jar \ + * totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile + * details in initParameters method of this benchmark + */ +object ConcurrentQueryBenchmark { + + // generate number of data + var totalNum = 1 * 10 * 1000 + // the number of thread pool + var threadNum = 16 + // task number of spark sql query + var taskNum = 100 + // whether is result empty, if true then result is empty + var resultIsEmpty = true + // the store path of task details + var path: String = "/tmp/carbondata" + // whether run in local or cluster + var runInLocal = true + // whether generate new file + var generateFile = true + // whether delete file + var deleteFile = true + + val cardinalityId = 100 * 1000 * 1000 + val cardinalityCity = 6 + + def parquetTableName: String = "Num" + totalNum + "_" + "comparetest_parquet" + + def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc" + + def carbonTableName(version: String): String = + "Num" + totalNum + "_" + s"comparetest_carbonV$version" + + // performance test queries, they are designed to test various data access type + val r = new Random() + lazy val tmpId = r.nextInt(cardinalityId) % totalNum + lazy val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + // different query SQL + lazy val queries: Array[Query] = Array( + Query( + "select * from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ) + , Query( + "select id from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + Query( + "select city from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + Query( + "select * from $table" + s" where city = '$tmpCity' limit 100", + "filter scan", + "filter on low card dimension, medium result set, fetch all columns" + ), + + Query( + "select city from $table" + s" where city = '$tmpCity' limit 100", + "filter scan", + "filter on low card dimension" + ), + + Query( + "select id from $table" + s" where city = '$tmpCity' limit 100", + "filter scan", + "filter on low card dimension" + ), + + Query( + "select country, sum(m1) from $table group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select 
country, sum(m1) from $table" + + s" where id = '$tmpId' group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t1.country, sum(t1.m1) from $table t1 join $table t2" + + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t2.country, sum(t2.m1) " + + "from $table t1 join $table t2 join $table t3 " + + "join $table t4 join $table t5 join $table t6 join $table t7 " + + s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + + s"and t1.id=t5.id and t1.id=t6.id and " + + s"t1.id=t7.id " + + s" where t2.id = '$tmpId' " + + s" group by t2.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + ) + + /** + * generate parquet format table + * + * @param spark SparkSession + * @param input DataFrame + * @param table table name + * @return the time of generating parquet format table + */ + private def generateParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + } + + /** + * generate ORC format table + * + * @param spark SparkSession + * @param input DataFrame + * @param table table name + * @return the time of generating ORC format table + */ + private def generateOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = + time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + } + + /** + * generate carbon format table + * + * @param spark SparkSession + * @param input DataFrame + * @param tableName table name + * @return the time of generating carbon format table + */ + private def generateCarbonTable(spark: SparkSession, input: DataFrame, tableName: String) + : Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "id") // id is high cardinality column + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + /** + * load data into parquet, carbonV2, carbonV3 + * + * @param spark SparkSession + * @param table1 table1 name + * @param table2 table2 name + */ + def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = if (generateFile) { + DataGenerator.generateDataFrame(spark, totalNum).cache + } else { + null + } + + val table1Time = time { + if (table1.endsWith("parquet")) { + if (generateFile) { + generateParquetTable(spark, df, table1) + } + spark.read.parquet(table1).createOrReplaceTempView(table1) + } else if (table1.endsWith("orc")) { + if (generateFile) { + generateOrcTable(spark, df, table1) + spark.read.orc(table1).createOrReplaceTempView(table1) + } + } else { + sys.error("invalid table: " + table1) + } + } + println(s"$table1 completed, time: $table1Time sec") + + val table2Time: Double = if (generateFile) { + generateCarbonTable(spark, df, table2) + } else { + 0.0 + } + println(s"$table2 completed, time: $table2Time sec") + if (null != df) { + df.unpersist() + } + } + + /** + * Run all 
queries for the specified table + * + * @param spark SparkSession + * @param tableName table name + */ + private def runQueries(spark: SparkSession, tableName: String): Unit = { + println() + println(s"Start running queries for $tableName...") + println( + "Min: min time" + + "\tMax: max time" + + "\t90%: 90% time" + + "\t99%: 99% time" + + "\tAvg: average time" + + "\tCount: number of result" + + "\tQuery X: running different query sql" + + "\tResult: show it when ResultIsEmpty is false" + + "\tTotal execute time: total runtime") + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + + val executorService = Executors.newFixedThreadPool(threadNum) + val tasks = new java.util.ArrayList[Callable[Results]]() + val tasksStartTime = System.nanoTime() + for (num <- 1 to taskNum) { + tasks.add(new QueryTask(spark, sqlText)) + } + val results = executorService.invokeAll(tasks) + + executorService.shutdown() + executorService.awaitTermination(600, TimeUnit.SECONDS) + + val tasksEndTime = System.nanoTime() + val sql = s"Query ${index + 1}: $sqlText " + printResults(results, sql, tasksStartTime) + val taskTime = (tasksEndTime - tasksStartTime).toDouble / (1000 * 1000 * 1000) + println("Total execute time: " + taskTime.formatted("%.3f") + " s") + + val timeString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()) + writeResults(spark, results, sql, tasksStartTime, + path + s"/${tableName}_query${index + 1}_$timeString") + } + } + + /** + * save the result for subsequent analysis + * + * @param spark SparkSession + * @param results Results + * @param sql query sql + * @param start tasks start time + * @param filePath write file path + */ + def writeResults( + spark: SparkSession, + results: java.util.List[Future[Results]], + sql: String = "", + start: Long, + filePath: String): Unit = { + val timeArray = new Array[(Double, Double, Double)](results.size()) + for (i <- 0 until results.size()) { + timeArray(i) = + ((results.get(i).get().startTime - start) / (1000.0 * 1000), + (results.get(i).get().endTime - start) / (1000.0 * 1000), + (results.get(i).get().endTime - results.get(i).get().startTime) / (1000.0 * 1000)) + } + val timeArraySorted = timeArray.sortBy(x => x._1) + val timeArrayString = timeArraySorted.map { e => + e._1.formatted("%.3f") + ",\t" + e._2.formatted("%.3f") + ",\t" + e._3.formatted("%.3f") + } + val saveArray = Array(sql, "startTime, endTime, runtime, measure time by the microsecond", + s"${timeArrayString.length}") + .union(timeArrayString) + val rdd = spark.sparkContext.parallelize(saveArray, 1) + rdd.saveAsTextFile(filePath) + } + + /** + * print out results + * + * @param results Results + * @param sql query sql + * @param tasksStartTime tasks start time + */ + def printResults(results: util.List[Future[Results]], sql: String = "", tasksStartTime: Long) { + val timeArray = new Array[Double](results.size()) + val sqlResult = results.get(0).get().sqlResult + for (i <- 0 until results.size()) { + results.get(i).get() + } + for (i <- 0 until results.size()) { + timeArray(i) = results.get(i).get().time + } + val sortTimeArray = timeArray.sorted + + // the time of 90 percent sql are finished + val time90 = ((sortTimeArray.length) * 0.9).toInt - 1 + // the time of 99 percent sql are finished + val time99 = ((sortTimeArray.length) * 0.99).toInt - 1 + print( + "Min: " + sortTimeArray.head.formatted("%.3f") + " s," + + "\tMax: " + sortTimeArray.last.formatted("%.3f") + " s," + + "\t90%: " + 
sortTimeArray(time90).formatted("%.3f") + " s," + + "\t99%: " + sortTimeArray(time99).formatted("%.3f") + " s," + + "\tAvg: " + (timeArray.sum / timeArray.length).formatted("%.3f") + " s," + + "\t\tCount: " + results.get(0).get.count + + "\t\t\t\t" + sql + + "\t" + sqlResult.mkString(",") + "\t") + } + + /** + * save result after finishing each task/thread + * + * @param time each task time of executing query sql and with millis time + * @param sqlResult query sql result + * @param count result count + * @param startTime task start time with nano time + * @param endTime task end time with nano time + */ + case class Results( + time: Double, + sqlResult: Array[Row], + count: Int, + startTime: Long, + endTime: Long) + + + class QueryTask(spark: SparkSession, query: String) + extends Callable[Results] with Serializable { + override def call(): Results = { + var result: Array[Row] = null + val startTime = System.nanoTime() + val rt = time { + result = spark.sql(query).collect() + } + val endTime = System.nanoTime() + if (resultIsEmpty) { + Results(rt, Array.empty[Row], count = result.length, startTime, endTime) + } else { + Results(rt, result, count = result.length, startTime, endTime) + } + } + } + + /** + * run testcases and print comparison result + * + * @param spark SparkSession + * @param table1 table1 name + * @param table2 table2 name + */ + def runTest(spark: SparkSession, table1: String, table2: String): Unit = { + // run queries on parquet and carbon + runQueries(spark, table1) + // do GC and sleep for some time before running next table + System.gc() + Thread.sleep(1000) + System.gc() + Thread.sleep(1000) + runQueries(spark, table2) + } + + /** + * the time of running code + * + * @param code the code + * @return the run time + */ + def time(code: => Unit): Double = { + val start = System.currentTimeMillis() + code + // return time in second + (System.currentTimeMillis() - start).toDouble / 1000 + } + + /** + * init parameters + * + * @param arr parameters + */ + def initParameters(arr: Array[String]): Unit = { + if (arr.length > 0) { + totalNum = arr(0).toInt + } + if (arr.length > 1) { + threadNum = arr(1).toInt + } + if (arr.length > 2) { + taskNum = arr(2).toInt + } + if (arr.length > 3) { + resultIsEmpty = if (arr(3).equalsIgnoreCase("true")) { + true + } else if (arr(3).equalsIgnoreCase("false")) { + false + } else { + throw new Exception("error parameter, should be true or false") + } + } + if (arr.length > 4) { + path = arr(4) + } + if (arr.length > 5) { + runInLocal = if (arr(5).equalsIgnoreCase("true")) { + true + } else if (arr(5).equalsIgnoreCase("false")) { + false + } else { + throw new Exception("error parameter, should be true or false") + } + } + if (arr.length > 6) { + generateFile = if (arr(6).equalsIgnoreCase("true")) { + true + } else if (arr(6).equalsIgnoreCase("false")) { + false + } else { + throw new Exception("error parameter, should be true or false") + } + } + if (arr.length > 7) { + deleteFile = if (arr(7).equalsIgnoreCase("true")) { + true + } else if (arr(7).equalsIgnoreCase("false")) { + false + } else { + throw new Exception("error parameter, should be true or false") + } + } + } + + /** + * main method of this benchmark + * + * @param args parameters + */ + def main(args: Array[String]): Unit = { + CarbonProperties.getInstance() + .addProperty("carbon.enable.vector.reader", "true") + .addProperty("enable.unsafe.sort", "true") + .addProperty("carbon.blockletgroup.size.in.mb", "32") + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, 
"true") + import org.apache.spark.sql.CarbonSession._ + + // 1. initParameters + initParameters(args) + val table1 = parquetTableName + val table2 = carbonTableName("3") + val parameters = "totalNum: " + totalNum + + "\tthreadNum: " + threadNum + + "\ttaskNum: " + taskNum + + "\tresultIsEmpty: " + resultIsEmpty + + "\tfile path: " + path + + "\trunInLocal: " + runInLocal + + "\tgenerateFile: " + generateFile + + "\tdeleteFile: " + deleteFile + + val spark = if (runInLocal) { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + SparkSession + .builder() + .appName(parameters) + .master("local[8]") + .enableHiveSupport() + .getOrCreateCarbonSession(storeLocation) + } else { + SparkSession + .builder() + .appName(parameters) + .enableHiveSupport() + .getOrCreateCarbonSession() + } + spark.sparkContext.setLogLevel("ERROR") + + println("\nEnvironment information:") + val env = Array( + "spark.master", + "spark.driver.cores", + "spark.driver.memory", + "spark.executor.cores", + "spark.executor.memory", + "spark.executor.instances") + env.foreach { each => + println(each + ":\t" + spark.conf.get(each, "default value") + "\t") + } + println("SPARK_VERSION:" + spark.version + "\t") + println("CARBONDATA_VERSION:" + CarbonVersionConstants.CARBONDATA_VERSION + "\t") + println("\nParameters information:") + println(parameters) + + // 2. prepareTable + prepareTable(spark, table1, table2) + + // 3. runTest + runTest(spark, table1, table2) + + if (deleteFile) { + CarbonUtil.deleteFoldersAndFiles(new File(table1)) + spark.sql(s"drop table $table2") + } + spark.close() + } +} + +// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala new file mode 100644 index 0000000..e3e67b1 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.benchmark + +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.types._ + +// scalastyle:off println +object DataGenerator { + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 100,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 10,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + /** + * generate DataFrame with above table schema + * + * @param spark SparkSession + * @return Dataframe of test data + */ + def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + val df = spark.createDataFrame(rdd, schema) + println(s"Start generate ${df.count} records, schema: ${df.schema}") + df + } +} +// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala new file mode 100644 index 0000000..9978bda --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.benchmark + +/** + * A query test case + * + * @param sqlText SQL statement + * @param queryType type of query: scan, filter, aggregate, topN + * @param desc description of the goal of this test case + */ +case class Query(sqlText: String, queryType: String, desc: String) http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala new file mode 100644 index 0000000..880f476 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.benchmark + +import java.io.File +import java.text.SimpleDateFormat +import java.util.Date + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} + + + +// scalastyle:off println +object SimpleQueryBenchmark { + + def parquetTableName: String = "comparetest_parquet" + def orcTableName: String = "comparetest_orc" + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // performance test queries, they are designed to test various data access type + val queries: Array[Query] = Array( + // =========================================================================== + // == FULL SCAN AGGREGATION == + // =========================================================================== + Query( + "select sum(m1) from $table", + "full scan", + "full scan query, 1 aggregate" + ), + Query( + "select sum(m1), sum(m2) from $table", + "full scan", + "full scan query, 2 aggregate" + ), + Query( + "select sum(m1), sum(m2), sum(m3) from $table", + "full scan", + "full scan query, 3 aggregate" + ), + Query( + "select sum(m1), sum(m2), sum(m3), sum(m4) from $table", + "full scan", + "full scan query, 4 aggregate" + ), + Query( + "select sum(m1), sum(m2), sum(m3), sum(m4), avg(m5) from $table", + "full scan", + "full scan query, 5 aggregate" + ), + Query( + "select count(distinct id) from $table", + "full scan", + "full scan and count distinct of high card column" + ), + Query( + "select count(distinct country) from $table", + "full scan", + "full scan and count distinct of medium card column" + ), + Query( + "select count(distinct city) from $table", + "full scan", + "full scan and count distinct of low card column" + ), + // =========================================================================== + // == FULL SCAN GROUP BY AGGREGATE == + // =========================================================================== + Query( + "select country, sum(m1) as metric from $table group by country order by metric", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + Query( + "select city, sum(m1) as metric from $table group by city order by metric", + "aggregate", + "group by on big data, on low card column, small result set," + ), + Query( + "select id, sum(m1) as metric from $table group by id order by metric desc limit 100", + "topN", + "top N on high card column" + ), + Query( + "select country,sum(m1) as metric from $table group by country order by metric desc limit 10", + "topN", + "top N on medium card column" + ), + Query( + "select city,sum(m1) as metric from $table group by city order by metric desc limit 10", + "topN", + "top N on low card column" + ), + // =========================================================================== + // == FILTER SCAN GROUP BY AGGREGATION == + // =========================================================================== + Query( + "select country, sum(m1) as metric from $table where city='city8' group by country " + + "order by metric", + "filter scan and aggregate", + "group by on large data, small result set" + ), + Query( + "select id, sum(m1) as metric from $table where planet='planet10' group by id " + + "order by metric", + "filter scan and aggregate", + "group by on medium data, large result set" + ), + Query( + "select city, sum(m1) as metric from $table where 
country='country12' group by city " + + "order by metric", + "filter scan and aggregate", + "group by on medium data, small result set" + ), + // =========================================================================== + // == FILTER SCAN == + // =========================================================================== + Query( + "select * from $table where city = 'city3' limit 10000", + "filter scan", + "filter on low card dimension, limit, medium result set, fetch all columns" + ), + Query( + "select * from $table where country = 'country9' ", + "filter scan", + "filter on low card dimension, medium result set, fetch all columns" + ), + Query( + "select * from $table where planet = 'planet101' ", + "filter scan", + "filter on medium card dimension, small result set, fetch all columns" + ), + Query( + "select * from $table where id = '408938' ", + "filter scan", + "filter on high card dimension" + ), + Query( + "select * from $table where country='country10000' ", + "filter scan", + "filter on low card dimension, not exist" + ), + Query( + "select * from $table where country='country2' and city ='city8' ", + "filter scan", + "filter on 2 dimensions, small result set, fetch all columns" + ), + Query( + "select * from $table where city='city1' and country='country2' and planet ='planet3' ", + "filter scan", + "filter on 3 dimensions, small result set, fetch all columns" + ), + Query( + "select * from $table where m1 < 3", + "filter scan", + "filter on measure, small result set, fetch all columns" + ), + Query( + "select * from $table where id like '1%' ", + "fuzzy filter scan", + "like filter, big result set" + ), + Query( + "select * from $table where id like '%111'", + "fuzzy filter scan", + "like filter, medium result set" + ), + Query( + "select * from $table where id like 'xyz%' ", + "fuzzy filter scan", + "like filter, full scan but not exist" + ) + ) + + private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + spark.read.parquet(table).createOrReplaceTempView(table) + } + + private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + spark.read.orc(table).createOrReplaceTempView(table) + } + + private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "V3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("single_pass", "true") + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + // load data into parquet, carbonV2, carbonV3 + private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = DataGenerator.generateDataFrame(spark, totalNum = 10 * 10 * 1000).cache + println(s"loading ${df.count} records, schema: ${df.schema}") + val table1Time = if (table1.endsWith("parquet")) { + loadParquetTable(spark, df, table1) + } else if (table1.endsWith("orc")) { + loadOrcTable(spark, df, table1) + } else { + sys.error("invalid table: " + table1) + } + val table2Time = loadCarbonTable(spark, 
df, table2) + println(s"load completed, time: $table1Time, $table2Time") + df.unpersist() + } + + // Run all queries for the specified table + private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = { + println(s"start running queries for $tableName...") + var result: Array[Row] = null + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + print(s"running query ${index + 1}: $sqlText ") + val rt = time { + result = spark.sql(sqlText).collect() + } + println(s"=> $rt sec") + (rt, result) + } + } + + private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row], + table2: String, result2: Array[Row]): Unit = { + // check result size instead of result value, because some test case include + // aggregation on double column which will give different result since carbon + // records are sorted + if (result1.length != result2.length) { + val num = index + 1 + println(s"$table1 result for query $num: ") + println(s"""${result1.mkString(",")}""") + println(s"$table2 result for query $num: ") + println(s"""${result2.mkString(",")}""") + sys.error(s"result not matching for query $num (${queries(index).desc})") + } + } + + // run testcases and print comparison result + private def runTest(spark: SparkSession, table1: String, table2: String): Unit = { + val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + val date = new Date + // run queries on parquet and carbon + val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1) + // do GC and sleep for some time before running next table + System.gc() + Thread.sleep(1000) + System.gc() + Thread.sleep(1000) + val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2) + // check result by comparing output from parquet and carbon + table1Result.zipWithIndex.foreach { case (result, index) => + printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2) + } + // print all response time in JSON format, so that it can be analyzed later + queries.zipWithIndex.foreach { case (query, index) => + println("{" + + s""""query":"${index + 1}", """ + + s""""$table1 time":${table1Result(index)._1}, """ + + s""""$table2 time":${table2Result(index)._1}, """ + + s""""fetched":${table1Result(index)._2.length}, """ + + s""""type":"${query.queryType}", """ + + s""""desc":"${query.desc}", """ + + s""""date": "${formatter.format(date)}" """ + + "}" + ) + } + } + + def main(args: Array[String]): Unit = { + CarbonProperties.getInstance() + .addProperty("carbon.enable.vector.reader", "true") + .addProperty("enable.unsafe.sort", "true") + .addProperty("carbon.blockletgroup.size.in.mb", "32") + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") + import org.apache.spark.sql.CarbonSession._ + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + val spark = SparkSession + .builder() + .master("local") + .enableHiveSupport() + .config("spark.driver.host", "127.0.0.1") + .getOrCreateCarbonSession(storeLocation) + spark.sparkContext.setLogLevel("warn") + + val table1 = parquetTableName + val table2 = carbonTableName("3") + prepareTable(spark, table1, table2) + runTest(spark, table1, table2) + + CarbonUtil.deleteFoldersAndFiles(new File(table1)) + spark.sql(s"drop table if exists $table2") + spark.close() + } + + def time(code: => Unit): Double = { + val start = System.currentTimeMillis() + 
code + // return time in second + (System.currentTimeMillis() - start).toDouble / 1000 + } +} +// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala deleted file mode 100644 index d27b1c4..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples - -import java.io.File -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} -import org.apache.spark.sql.types._ - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} - -/** - * A query test case - * @param sqlText SQL statement - * @param queryType type of query: scan, filter, aggregate, topN - * @param desc description of the goal of this test case - */ -case class Query(sqlText: String, queryType: String, desc: String) - -// scalastyle:off println -object CompareTest { - - def parquetTableName: String = "comparetest_parquet" - def orcTableName: String = "comparetest_orc" - def carbonTableName(version: String): String = s"comparetest_carbonV$version" - - // Table schema: - // +-------------+-----------+-------------+-------------+------------+ - // | Column name | Data type | Cardinality | Column type | Dictionary | - // +-------------+-----------+-------------+-------------+------------+ - // | city | string | 8 | dimension | yes | - // +-------------+-----------+-------------+-------------+------------+ - // | country | string | 1103 | dimension | yes | - // +-------------+-----------+-------------+-------------+------------+ - // | planet | string | 10,007 | dimension | yes | - // +-------------+-----------+-------------+-------------+------------+ - // | id | string | 10,000,000 | dimension | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m1 | short | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m2 | int | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m3 | big int | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m4 | double | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // 
| m5 | decimal | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - private def generateDataFrame(spark: SparkSession): DataFrame = { - val rdd = spark.sparkContext - .parallelize(1 to 10 * 1000 * 1000, 4) - .map { x => - ("city" + x % 8, "country" + x % 1103, "planet" + x % 10007, "IDENTIFIER" + x.toString, - (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, - BigDecimal.valueOf(x.toDouble / 11)) - }.map { x => - Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) - } - - val schema = StructType( - Seq( - StructField("city", StringType, nullable = false), - StructField("country", StringType, nullable = false), - StructField("planet", StringType, nullable = false), - StructField("id", StringType, nullable = false), - StructField("m1", ShortType, nullable = false), - StructField("m2", IntegerType, nullable = false), - StructField("m3", LongType, nullable = false), - StructField("m4", DoubleType, nullable = false), - StructField("m5", DecimalType(30, 10), nullable = false) - ) - ) - - spark.createDataFrame(rdd, schema) - } - - // performance test queries, they are designed to test various data access type - val queries: Array[Query] = Array( - // =========================================================================== - // == FULL SCAN AGGREGATION == - // =========================================================================== - Query( - "select sum(m1) from $table", - "full scan", - "full scan query, 1 aggregate" - ), - Query( - "select sum(m1), sum(m2) from $table", - "full scan", - "full scan query, 2 aggregate" - ), - Query( - "select sum(m1), sum(m2), sum(m3) from $table", - "full scan", - "full scan query, 3 aggregate" - ), - Query( - "select sum(m1), sum(m2), sum(m3), sum(m4) from $table", - "full scan", - "full scan query, 4 aggregate" - ), - Query( - "select sum(m1), sum(m2), sum(m3), sum(m4), avg(m5) from $table", - "full scan", - "full scan query, 5 aggregate" - ), - Query( - "select count(distinct id) from $table", - "full scan", - "full scan and count distinct of high card column" - ), - Query( - "select count(distinct country) from $table", - "full scan", - "full scan and count distinct of medium card column" - ), - Query( - "select count(distinct city) from $table", - "full scan", - "full scan and count distinct of low card column" - ), - // =========================================================================== - // == FULL SCAN GROUP BY AGGREGATE == - // =========================================================================== - Query( - "select country, sum(m1) as metric from $table group by country order by metric", - "aggregate", - "group by on big data, on medium card column, medium result set," - ), - Query( - "select city, sum(m1) as metric from $table group by city order by metric", - "aggregate", - "group by on big data, on low card column, small result set," - ), - Query( - "select id, sum(m1) as metric from $table group by id order by metric desc limit 100", - "topN", - "top N on high card column" - ), - Query( - "select country,sum(m1) as metric from $table group by country order by metric desc limit 10", - "topN", - "top N on medium card column" - ), - Query( - "select city,sum(m1) as metric from $table group by city order by metric desc limit 10", - "topN", - "top N on low card column" - ), - // =========================================================================== - // == FILTER SCAN GROUP BY AGGREGATION == - // 
=========================================================================== - Query( - "select country, sum(m1) as metric from $table where city='city8' group by country " + - "order by metric", - "filter scan and aggregate", - "group by on large data, small result set" - ), - Query( - "select id, sum(m1) as metric from $table where planet='planet10' group by id " + - "order by metric", - "filter scan and aggregate", - "group by on medium data, large result set" - ), - Query( - "select city, sum(m1) as metric from $table where country='country12' group by city " + - "order by metric", - "filter scan and aggregate", - "group by on medium data, small result set" - ), - // =========================================================================== - // == FILTER SCAN == - // =========================================================================== - Query( - "select * from $table where city = 'city3' limit 10000", - "filter scan", - "filter on low card dimension, limit, medium result set, fetch all columns" - ), - Query( - "select * from $table where country = 'country9' ", - "filter scan", - "filter on low card dimension, medium result set, fetch all columns" - ), - Query( - "select * from $table where planet = 'planet101' ", - "filter scan", - "filter on medium card dimension, small result set, fetch all columns" - ), - Query( - "select * from $table where id = '408938' ", - "filter scan", - "filter on high card dimension" - ), - Query( - "select * from $table where country='country10000' ", - "filter scan", - "filter on low card dimension, not exist" - ), - Query( - "select * from $table where country='country2' and city ='city8' ", - "filter scan", - "filter on 2 dimensions, small result set, fetch all columns" - ), - Query( - "select * from $table where city='city1' and country='country2' and planet ='planet3' ", - "filter scan", - "filter on 3 dimensions, small result set, fetch all columns" - ), - Query( - "select * from $table where m1 < 3", - "filter scan", - "filter on measure, small result set, fetch all columns" - ), - Query( - "select * from $table where id like '1%' ", - "fuzzy filter scan", - "like filter, big result set" - ), - Query( - "select * from $table where id like '%111'", - "fuzzy filter scan", - "like filter, medium result set" - ), - Query( - "select * from $table where id like 'xyz%' ", - "fuzzy filter scan", - "like filter, full scan but not exist" - ) - ) - - private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) - : Double = time { - // partitioned by last 1 digit of id column - val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) - dfWithPartition.write - .partitionBy("partitionCol") - .mode(SaveMode.Overwrite) - .parquet(table) - spark.read.parquet(table).createOrReplaceTempView(table) - } - - private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { - // partitioned by last 1 digit of id column - input.write - .mode(SaveMode.Overwrite) - .orc(table) - spark.read.orc(table).createOrReplaceTempView(table) - } - - private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_DATA_FILE_VERSION, - "V3" - ) - spark.sql(s"drop table if exists $tableName") - time { - input.write - .format("carbondata") - .option("tableName", tableName) - .option("single_pass", "true") - .option("table_blocksize", "32") - .mode(SaveMode.Overwrite) - .save() - } - } - - // load data 
into parquet, carbonV2, carbonV3 - private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { - val df = generateDataFrame(spark).cache - println(s"loading ${df.count} records, schema: ${df.schema}") - val table1Time = if (table1.endsWith("parquet")) { - loadParquetTable(spark, df, table1) - } else if (table1.endsWith("orc")) { - loadOrcTable(spark, df, table1) - } else { - sys.error("invalid table: " + table1) - } - val table2Time = loadCarbonTable(spark, df, table2) - println(s"load completed, time: $table1Time, $table2Time") - df.unpersist() - } - - // Run all queries for the specified table - private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = { - println(s"start running queries for $tableName...") - var result: Array[Row] = null - queries.zipWithIndex.map { case (query, index) => - val sqlText = query.sqlText.replace("$table", tableName) - print(s"running query ${index + 1}: $sqlText ") - val rt = time { - result = spark.sql(sqlText).collect() - } - println(s"=> $rt sec") - (rt, result) - } - } - - private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row], - table2: String, result2: Array[Row]): Unit = { - // check result size instead of result value, because some test case include - // aggregation on double column which will give different result since carbon - // records are sorted - if (result1.length != result2.length) { - val num = index + 1 - println(s"$table1 result for query $num: ") - println(s"""${result1.mkString(",")}""") - println(s"$table2 result for query $num: ") - println(s"""${result2.mkString(",")}""") - sys.error(s"result not matching for query $num (${queries(index).desc})") - } - } - - // run testcases and print comparison result - private def runTest(spark: SparkSession, table1: String, table2: String): Unit = { - val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - val date = new Date - // run queries on parquet and carbon - val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1) - // do GC and sleep for some time before running next table - System.gc() - Thread.sleep(1000) - System.gc() - Thread.sleep(1000) - val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2) - // check result by comparing output from parquet and carbon - table1Result.zipWithIndex.foreach { case (result, index) => - printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2) - } - // print all response time in JSON format, so that it can be analyzed later - queries.zipWithIndex.foreach { case (query, index) => - println("{" + - s""""query":"${index + 1}", """ + - s""""$table1 time":${table1Result(index)._1}, """ + - s""""$table2 time":${table2Result(index)._1}, """ + - s""""fetched":${table1Result(index)._2.length}, """ + - s""""type":"${query.queryType}", """ + - s""""desc":"${query.desc}", """ + - s""""date": "${formatter.format(date)}" """ + - "}" - ) - } - } - - def main(args: Array[String]): Unit = { - CarbonProperties.getInstance() - .addProperty("carbon.enable.vector.reader", "true") - .addProperty("enable.unsafe.sort", "true") - .addProperty("carbon.blockletgroup.size.in.mb", "32") - .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") - import org.apache.spark.sql.CarbonSession._ - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - val spark = SparkSession - .builder() - .master("local") - 
.enableHiveSupport() - .config("spark.driver.host", "127.0.0.1") - .getOrCreateCarbonSession(storeLocation) - spark.sparkContext.setLogLevel("warn") - - val table1 = parquetTableName - val table2 = carbonTableName("3") - prepareTable(spark, table1, table2) - runTest(spark, table1, table2) - - CarbonUtil.deleteFoldersAndFiles(new File(table1)) - spark.sql(s"drop table if exists $table2") - spark.close() - } - - def time(code: => Unit): Double = { - val start = System.currentTimeMillis() - code - // return time in second - (System.currentTimeMillis() - start).toDouble / 1000 - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala deleted file mode 100644 index 09921cb..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples - -import java.io.File -import java.util -import java.util.concurrent.{Callable, Executors, Future, TimeUnit} - -import scala.util.Random - -import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} -import org.apache.spark.sql.types._ - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} - -// scalastyle:off println -object ConcurrencyTest { - - var totalNum = 100 * 1000 * 1000 - var ThreadNum = 16 - var TaskNum = 100 - var ResultIsEmpty = true - val cardinalityId = 10000 * 10000 - val cardinalityCity = 6 - - def parquetTableName: String = "comparetest_parquet" - - def orcTableName: String = "comparetest_orc" - - def carbonTableName(version: String): String = s"comparetest_carbonV$version" - - // Table schema: - // +-------------+-----------+-------------+-------------+------------+ - // | id | string | 100,000,000 | dimension | no | - // +-------------+-----------+-------------+-------------+------------+ - // | Column name | Data type | Cardinality | Column type | Dictionary | - // +-------------+-----------+-------------+-------------+------------+ - // | city | string | 6 | dimension | yes | - // +-------------+-----------+-------------+-------------+------------+ - // | country | string | 6 | dimension | yes | - // +-------------+-----------+-------------+-------------+------------+ - // | planet | string | 100,007 | dimension | yes | - // +-------------+-----------+-------------+-------------+------------+ - // | m1 | short | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m2 | int | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m3 | big int | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m4 | double | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - // | m5 | decimal | NA | measure | no | - // +-------------+-----------+-------------+-------------+------------+ - - private def generateDataFrame(spark: SparkSession): DataFrame = { - val rdd = spark.sparkContext - .parallelize(1 to totalNum, 4) - .map { x => - ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, - (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, - BigDecimal.valueOf(x.toDouble / 11)) - }.map { x => - Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) - } - - val schema = StructType( - Seq( - StructField("id", StringType, nullable = false), - StructField("city", StringType, nullable = false), - StructField("country", StringType, nullable = false), - StructField("planet", StringType, nullable = false), - StructField("m1", ShortType, nullable = false), - StructField("m2", IntegerType, nullable = false), - StructField("m3", LongType, nullable = false), - StructField("m4", DoubleType, nullable = false), - StructField("m5", DecimalType(30, 10), nullable = false) - ) - ) - - spark.createDataFrame(rdd, schema) - } - - // performance test queries, they are designed to test various data access type - val r = new Random() - val tmpId = r.nextInt(cardinalityId) % totalNum - val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) - val queries: Array[Query] = Array( - Query( - "select * from $table" + s" where id = '$tmpId' ", - "filter scan", - "filter on high card dimension" - ), - - Query( - "select id from $table" + s" where 
id = '$tmpId' ", - "filter scan", - "filter on high card dimension" - ), - - Query( - "select * from $table" + s" where city = '$tmpCity' ", - "filter scan", - "filter on high card dimension" - ), - - Query( - "select city from $table" + s" where city = '$tmpCity' ", - "filter scan", - "filter on high card dimension" - ), - - Query( - "select country, sum(m1) from $table group by country", - "aggregate", - "group by on big data, on medium card column, medium result set," - ), - - Query( - "select country, sum(m1) from $table" + - s" where id = '$tmpId' group by country", - "aggregate", - "group by on big data, on medium card column, medium result set," - ), - - Query( - "select t1.country, sum(t1.m1) from $table t1 join $table t2" - + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", - "aggregate", - "group by on big data, on medium card column, medium result set," - ) - , - Query( - "select t2.country, sum(t2.m1) " + - "from $table t1 join $table t2 join $table t3 " + - "join $table t4 join $table t5 join $table t6 join $table t7 " + - s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + - s"and t1.id=t5.id and t1.id=t6.id and " + - s"t1.id=t7.id " + - s" where t2.id = '$tmpId' " + - s" group by t2.country", - "aggregate", - "group by on big data, on medium card column, medium result set," - ) - ) - - private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) - : Double = time { - // partitioned by last 1 digit of id column - val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) - dfWithPartition.write - .partitionBy("partitionCol") - .mode(SaveMode.Overwrite) - .parquet(table) - spark.read.parquet(table).createOrReplaceTempView(table) - } - - private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { - // partitioned by last 1 digit of id column - input.write - .mode(SaveMode.Overwrite) - .orc(table) - spark.read.orc(table).createOrReplaceTempView(table) - } - - private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_DATA_FILE_VERSION, - "3" - ) - spark.sql(s"drop table if exists $tableName") - time { - input.write - .format("carbondata") - .option("tableName", tableName) - .option("tempCSV", "false") - .option("single_pass", "true") - .option("dictionary_exclude", "id") // id is high cardinality column - .option("table_blocksize", "32") - .mode(SaveMode.Overwrite) - .save() - } - } - - // load data into parquet, carbonV2, carbonV3 - def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { - val df = generateDataFrame(spark).cache - println(s"generating ${df.count} records, schema: ${df.schema}") - val table1Time = if (table1.endsWith("parquet")) { - loadParquetTable(spark, df, table1) - } else if (table1.endsWith("orc")) { - loadOrcTable(spark, df, table1) - } else { - sys.error("invalid table: " + table1) - } - val table2Time = loadCarbonTable(spark, df, table2) - println(s"load completed, time: $table1Time, $table2Time") - df.unpersist() - } - - // Run all queries for the specified table - private def runQueries(spark: SparkSession, tableName: String): Unit = { - println(s"start running queries for $tableName...") - val start = System.currentTimeMillis() - println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " + - "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false") - 
queries.zipWithIndex.map { case (query, index) => - val sqlText = query.sqlText.replace("$table", tableName) - - val executorService = Executors.newFixedThreadPool(ThreadNum) - val tasks = new java.util.ArrayList[Callable[Results]]() - - for (num <- 1 to TaskNum) { - tasks.add(new QueryTask(spark, sqlText)) - } - val results = executorService.invokeAll(tasks) - - val sql = s"query ${index + 1}: $sqlText " - printResult(results, sql) - executorService.shutdown() - executorService.awaitTermination(600, TimeUnit.SECONDS) - - val taskTime = (System.currentTimeMillis() - start).toDouble / 1000 - println("task time: " + taskTime.formatted("%.3f") + " s") - } - } - - def printResult(results: java.util.List[Future[Results]], sql: String = "") { - val timeArray = new Array[Double](results.size()) - val sqlResult = results.get(0).get().sqlResult - for (i <- 0 until results.size()) { - results.get(i).get() - } - for (i <- 0 until results.size()) { - timeArray(i) = results.get(i).get().time - } - val sortTimeArray = timeArray.sorted - - // the time of 90 percent sql are finished - val time90 = ((sortTimeArray.length) * 0.9).toInt - 1 - // the time of 99 percent sql are finished - val time99 = ((sortTimeArray.length) * 0.99).toInt - 1 - print("90%:" + sortTimeArray(time90).formatted("%.3f") + " s," + - "\t99%:" + sortTimeArray(time99).formatted("%.3f") + " s," + - "\tlast:" + sortTimeArray.last.formatted("%.3f") + " s," + - "\t" + sql + - "\taverage:" + (timeArray.sum / timeArray.length).formatted("%.3f") + " s," + - "\t" + sqlResult.mkString(",") + "\t") - } - - case class Results(time: Double, sqlResult: Array[Row]) - - - class QueryTask(spark: SparkSession, query: String) - extends Callable[Results] with Serializable { - override def call(): Results = { - var result: Array[Row] = null - val rt = time { - result = spark.sql(query).head(1) - } - if (ResultIsEmpty) { - Results(rt, Array.empty[Row]) - } else { - Results(rt, result) - } - } - } - - // run testcases and print comparison result - def runTest(spark: SparkSession, table1: String, table2: String): Unit = { - // run queries on parquet and carbon - runQueries(spark, table1) - // do GC and sleep for some time before running next table - System.gc() - Thread.sleep(1000) - System.gc() - Thread.sleep(1000) - runQueries(spark, table2) - } - - def time(code: => Unit): Double = { - val start = System.currentTimeMillis() - code - // return time in second - (System.currentTimeMillis() - start).toDouble / 1000 - } - - def initParameters(arr: Array[String]): Unit = { - if (arr.length > 0) { - totalNum = arr(0).toInt - } - if (arr.length > 1) { - ThreadNum = arr(1).toInt - } - if (arr.length > 2) { - TaskNum = arr(2).toInt - } - if (arr.length > 3) { - ResultIsEmpty = if (arr(3).equalsIgnoreCase("true")) { - true - } else if (arr(3).equalsIgnoreCase("false")) { - true - } else { - throw new Exception("error parameter, should be true or false") - } - } - } - - def main(args: Array[String]): Unit = { - CarbonProperties.getInstance() - .addProperty("carbon.enable.vector.reader", "true") - .addProperty("enable.unsafe.sort", "true") - .addProperty("carbon.blockletgroup.size.in.mb", "32") - .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") - import org.apache.spark.sql.CarbonSession._ - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - - val spark = SparkSession - .builder() - .master("local[8]") - .enableHiveSupport() - 
.config("spark.driver.host", "127.0.0.1") - .getOrCreateCarbonSession(storeLocation) - spark.sparkContext.setLogLevel("warn") - - initParameters(args) - - val table1 = parquetTableName - val table2 = carbonTableName("3") - prepareTable(spark, table1, table2) - println("totalNum:" + totalNum + "\tThreadNum:" + ThreadNum + - "\tTaskNum:" + TaskNum + "\tResultIsEmpty:" + ResultIsEmpty) - runTest(spark, table1, table2) - - CarbonUtil.deleteFoldersAndFiles(new File(table1)) - spark.sql(s"drop table $table2") - spark.close() - } -} - -// scalastyle:on println
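
For reference, the new ConcurrentQueryBenchmark.initParameters reads up to eight positional arguments, in this order: totalNum, threadNum, taskNum, resultIsEmpty, path (the directory where per-task timing results are saved), runInLocal, generateFile and deleteFile. A submission sketch with illustrative argument values, assuming the assembly jar is named concurrencyTest.jar as in the class comment:

  spark-submit \
    --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
    --master yarn \
    --deploy-mode client \
    concurrencyTest.jar \
    10000000 16 100 true /tmp/carbondata false true true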
