Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178246812
--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+/**
+ * Test concurrent query performance of CarbonData
+ *
+ * This benchmark prints out the following information:
+ * 1. Environment information
+ * 2. Parameter information
+ * 3. Concurrent query performance result using the Parquet format
+ * 4. Concurrent query performance result using the CarbonData format
+ *
+ * By default this benchmark runs in local mode.
+ * Users can set 'runInLocal' to false to run it on a cluster,
+ * and can adjust the variables below, for example:
+ *
+ * spark-submit \
+ --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
+ --master yarn \
+ --deploy-mode client \
+ --driver-memory 16g \
--executor-cores 4 \
+ --executor-memory 24g \
+ --num-executors 3 \
+ concurrencyTest.jar \
+ totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile
+ * See the initParameters method of this benchmark for details of these parameters.
+ */
+object ConcurrentQueryBenchmark {
+
+ // number of records to generate
+ var totalNum = 1 * 1000 * 1000
+ // size of the thread pool
+ var threadNum = 16
+ // number of Spark SQL query tasks
+ var taskNum = 100
+ // whether the query result is empty; if true, the result set is expected to be empty
+ var resultIsEmpty = true
+ // path where task details are stored
+ var path: String = "/tmp/carbondata"
+ // whether to run in local mode or on a cluster
+ var runInLocal = true
+ // whether to generate a new data file
+ var generateFile = true
+ // whether to delete the generated file
+ var deleteFile = true
+
+ val cardinalityId = 100 * 1000 * 1000
+ val cardinalityCity = 6
+
+ def parquetTableName: String = "Num" + totalNum + "_" + "comparetest_parquet"
+
+ def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc"
+
+ def carbonTableName(version: String): String =
+ "Num" + totalNum + "_" + s"comparetest_carbonV$version"
+
+ // Table schema:
+ // +-------------+-----------+-------------+-------------+------------+
+ // | Column name | Data type | Cardinality | Column type | Dictionary |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | id | string | 100,000,000 | dimension | no |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | city | string | 6 | dimension | yes |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | country | string | 6 | dimension | yes |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | planet | string | 10,007 | dimension | yes |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m1 | short | NA | measure | no |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m2 | int | NA | measure | no |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m3 | big int | NA | measure | no |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m4 | double | NA | measure | no |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m5 | decimal | NA | measure | no |
+ // +-------------+-----------+-------------+-------------+------------+
+ /**
+ * generate a DataFrame with the above table schema
+ *
+ * @param spark SparkSession
+ * @return DataFrame of test data
+ */
+ private def generateDataFrame(spark: SparkSession): DataFrame = {
+ val rdd = spark.sparkContext
+ .parallelize(1 to totalNum, 4)
+ .map { x =>
+ ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007,
+ (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+ BigDecimal.valueOf(x.toDouble / 11))
+ }.map { x =>
+ Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+ }
+
+ val schema = StructType(
+ Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("city", StringType, nullable = false),
+ StructField("country", StringType, nullable = false),
+ StructField("planet", StringType, nullable = false),
+ StructField("m1", ShortType, nullable = false),
+ StructField("m2", IntegerType, nullable = false),
+ StructField("m3", LongType, nullable = false),
+ StructField("m4", DoubleType, nullable = false),
+ StructField("m5", DecimalType(30, 10), nullable = false)
+ )
+ )
+
+ val df = spark.createDataFrame(rdd, schema)
+ println(s"Start generate ${df.count} records, schema: ${df.schema}")
+ df
+ }
+
+ // performance test queries, designed to exercise various data access types
+ val r = new Random()
+ lazy val tmpId = r.nextInt(cardinalityId) % totalNum
+ lazy val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
+ // different query SQL
+ lazy val queries: Array[Query] = Array(
+ Query(
+ "select * from $table" + s" where id = '$tmpId' ",
+ "filter scan",
+ "filter on high card dimension"
+ )
+ , Query(
+ "select id from $table" + s" where id = '$tmpId' ",
+ "filter scan",
+ "filter on high card dimension"
+ ),
+ Query(
+ "select city from $table" + s" where id = '$tmpId' ",
+ "filter scan",
+ "filter on high card dimension"
+ ),
+ Query(
+ "select * from $table" + s" where city = '$tmpCity' limit 100",
+ "filter scan",
+ "filter on low card dimension, medium result set, fetch all columns"
+ ),
+
+ Query(
+ "select city from $table" + s" where city = '$tmpCity' limit 100",
+ "filter scan",
+ "filter on low card dimension"
+ ),
+
+ Query(
+ "select id from $table" + s" where city = '$tmpCity' limit 100",
+ "filter scan",
+ "filter on low card dimension"
+ ),
+
+ Query(
+ "select country, sum(m1) from $table group by country",
+ "aggregate",
+ "group by on big data, on medium card column, medium result set,"
+ ),
+
+ Query(
+ "select country, sum(m1) from $table" +
+ s" where id = '$tmpId' group by country",
+ "aggregate",
+ "group by on big data, on medium card column, medium result set,"
+ ),
+
+ Query(
+ "select t1.country, sum(t1.m1) from $table t1 join $table t2"
+ + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
+ "aggregate",
+ "group by on big data, on medium card column, medium result set,"
+ ),
+
+ Query(
+ "select t2.country, sum(t2.m1) " +
+ "from $table t1 join $table t2 join $table t3 " +
+ "join $table t4 join $table t5 join $table t6 join $table t7 " +
+ s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
+ s"and t1.id=t5.id and t1.id=t6.id and " +
+ s"t1.id=t7.id " +
+ s" where t2.id = '$tmpId' " +
+ s" group by t2.country",
+ "aggregate",
+ "group by on big data, on medium card column, medium result set,"
+ )
+ )
+
+ /**
+ * generate parquet format table
+ *
+ * @param spark SparkSession
+ * @param input DataFrame
+ * @param table table name
+ * @return the time of generating parquet format table
+ */
+ private def generateParquetTable(spark: SparkSession, input: DataFrame, table: String)
+ : Double = time {
+ // partitioned by last 1 digit of id column
+ val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
+ dfWithPartition.write
+ .partitionBy("partitionCol")
+ .mode(SaveMode.Overwrite)
+ .parquet(table)
+ }
+
+ /**
+ * generate ORC format table
+ *
+ * @param spark SparkSession
+ * @param input DataFrame
+ * @param table table name
+ * @return the time of generating ORC format table
+ */
+ private def generateOrcTable(spark: SparkSession, input: DataFrame, table: String): Double =
+ time {
+ // the ORC table is written without partitioning
+ input.write
+ .mode(SaveMode.Overwrite)
+ .orc(table)
+ }
+
+ /**
+ * generate carbon format table
+ *
+ * @param spark SparkSession
+ * @param input DataFrame
+ * @param tableName table name
+ * @return the time of generating carbon format table
+ */
+ private def generateCarbonTable(spark: SparkSession, input: DataFrame, tableName: String)
+ : Double = {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ "3"
+ )
+ spark.sql(s"drop table if exists $tableName")
+ time {
+ input.write
+ .format("carbondata")
+ .option("tableName", tableName)
+ .option("tempCSV", "false")
+ .option("single_pass", "true")
+ .option("dictionary_exclude", "id") // id is high cardinality
column
+ .option("table_blocksize", "32")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+ }
+
+ /**
+ * load data into the source format table (parquet or ORC) and the CarbonData table
+ *
+ * @param spark SparkSession
+ * @param table1 table1 name
+ * @param table2 table2 name
+ */
+ def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
+ val df = if (generateFile) {
+ generateDataFrame(spark).cache
+ } else {
+ null
+ }
+
+ val table1Time = time {
+ if (table1.endsWith("parquet")) {
+ if (generateFile) {
+ generateParquetTable(spark, df, table1)
+ }
+ spark.read.parquet(table1).createOrReplaceTempView(table1)
+ } else if (table1.endsWith("orc")) {
+ if (generateFile) {
+ generateOrcTable(spark, df, table1)
+ }
+ // register the temp view even when the file is not regenerated, matching the parquet branch
+ spark.read.orc(table1).createOrReplaceTempView(table1)
+ } else {
+ sys.error("invalid table: " + table1)
+ }
+ }
+ println(s"$table1 completed, time: $table1Time sec")
+
+ val table2Time: Double = if (generateFile) {
+ generateCarbonTable(spark, df, table2)
+ } else {
+ 0.0
+ }
+ println(s"$table2 completed, time: $table2Time sec")
+ if (null != df) {
+ df.unpersist()
+ }
+ }
+
+ /**
+ * Run all queries for the specified table
+ *
+ * @param spark SparkSession
+ * @param tableName table name
+ */
+ private def runQueries(spark: SparkSession, tableName: String): Unit = {
+ println()
+ println(s"Start running queries for $tableName...")
+ println(
+ "Min: min time" +
+ "\tMax: max time" +
+ "\t90%: 90% time" +
+ "\t99%: 99% time" +
+ "\tAvg: average time" +
+ "\tCount: number of result" +
+ "\tQuery X: running different query sql" +
+ "\tResult: show it when ResultIsEmpty is false" +
+ "\tTotal execute time: total runtime")
+ queries.zipWithIndex.map { case (query, index) =>
+ val sqlText = query.sqlText.replace("$table", tableName)
+
+ val executorService = Executors.newFixedThreadPool(threadNum)
+ val tasks = new java.util.ArrayList[Callable[Results]]()
+ val tasksStartTime = System.nanoTime()
+ for (num <- 1 to taskNum) {
+ tasks.add(new QueryTask(spark, sqlText))
+ }
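+ // invokeAll blocks until all submitted query tasks have completed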
+ val results = executorService.invokeAll(tasks)
+
+ executorService.shutdown()
+ executorService.awaitTermination(600, TimeUnit.SECONDS)
+
+ val tasksEndTime = System.nanoTime()
+ val sql = s"Query ${index + 1}: $sqlText "
+ printResults(results, sql, tasksStartTime)
+ val taskTime = (tasksEndTime - tasksStartTime).toDouble / (1000 * 1000 * 1000)
+ println("Total execute time: " + taskTime.formatted("%.3f") + " s")
+
+ val timeString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
+ writeResults(spark, results, sql, tasksStartTime,
+ path + s"/${tableName}_query${index + 1}_$timeString")
+ }
+ }
+
+ /**
+ * save the result for subsequent analysis
+ *
+ * @param spark SparkSession
+ * @param results Results
+ * @param sql query sql
+ * @param start tasks start time
+ * @param filePath write file path
+ */
+ def writeResults(
+ spark: SparkSession,
+ results: java.util.List[Future[Results]],
+ sql: String = "",
+ start: Long,
+ filePath: String): Unit = {
+ val timeArray = new Array[(Double, Double, Double)](results.size())
+ for (i <- 0 until results.size()) {
+ timeArray(i) =
+ ((results.get(i).get().startTime - start) / (1000.0 * 1000),
+ (results.get(i).get().endTime - start) / (1000.0 * 1000),
+ (results.get(i).get().endTime - results.get(i).get().startTime) / (1000.0 * 1000))
+ }
+ val timeArraySorted = timeArray.sortBy(x => x._1)
+ val timeArrayString = timeArraySorted.map { e =>
+ e._1.formatted("%.3f") + ",\t" + e._2.formatted("%.3f") + ",\t" + e._3.formatted("%.3f")
+ }
+ val saveArray = Array(sql, "startTime, endTime, runtime, measured in milliseconds",
+ s"${timeArrayString.length}")
+ .union(timeArrayString)
+ val rdd = spark.sparkContext.parallelize(saveArray, 1)
+ rdd.saveAsTextFile(filePath)
+ }
+
+ /**
+ * print out results
+ *
+ * @param results Results
+ * @param sql query sql
+ * @param tasksStartTime tasks start time
+ */
+ def printResults(results: util.List[Future[Results]], sql: String = "", tasksStartTime: Long) {
+ val timeArray = new Array[Double](results.size())
+ val sqlResult = results.get(0).get().sqlResult
+ for (i <- 0 until results.size()) {
+ results.get(i).get()
+ }
+ for (i <- 0 until results.size()) {
+ timeArray(i) = results.get(i).get().time
+ }
+ val sortTimeArray = timeArray.sorted
+
+ // index of the time by which 90 percent of the queries finished
+ val time90 = ((sortTimeArray.length) * 0.9).toInt - 1
+ // index of the time by which 99 percent of the queries finished
+ val time99 = ((sortTimeArray.length) * 0.99).toInt - 1
+ print(
+ "Min: " + sortTimeArray.head.formatted("%.3f") + " s," +
+ "\tMax: " + sortTimeArray.last.formatted("%.3f") + " s," +
+ "\t90%: " + sortTimeArray(time90).formatted("%.3f") + " s," +
+ "\t99%: " + sortTimeArray(time99).formatted("%.3f") + " s," +
+ "\tAvg: " + (timeArray.sum / timeArray.length).formatted("%.3f") +
" s," +
+ "\t\tCount: " + results.get(0).get.count +
+ "\t\t\t\t" + sql +
+ "\t" + sqlResult.mkString(",") + "\t")
+ }
+
+ /**
+ * save result after finishing each task/thread
+ *
+ * @param time time taken by each task to execute the query SQL, in millis
+ * @param sqlResult query sql result
+ * @param count result count
+ * @param startTime task start time, from System.nanoTime
+ * @param endTime task end time, from System.nanoTime
+ */
+ case class Results(
+ time: Double,
+ sqlResult: Array[Row],
+ count: Int,
+ startTime: Long,
+ endTime: Long)
+
+
+ class QueryTask(spark: SparkSession, query: String)
+ extends Callable[Results] with Serializable {
+ override def call(): Results = {
+ var result: Array[Row] = null
+ val startTime = System.nanoTime()
+ val rt = time {
+ result = spark.sql(query).collect()
--- End diff ---
It is better not to collect the result, since collecting it to the driver impacts GC a lot.
I think doing a `count` is better.
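
For illustration, a minimal sketch of what `QueryTask.call` could look like built around `count` instead of `collect`, assuming the `Results` case class and the `time` helper keep the signatures shown in this diff; the empty `Array[Row]` placeholder is an assumption here, not something this PR contains:

```scala
  class QueryTask(spark: SparkSession, query: String)
    extends Callable[Results] with Serializable {
    override def call(): Results = {
      var rowCount = 0L
      val startTime = System.nanoTime()
      val rt = time {
        // count() runs on the executors, so no rows are pulled into driver
        // memory and GC pressure on the driver stays low
        rowCount = spark.sql(query).count()
      }
      val endTime = System.nanoTime()
      // nothing is collected, so sqlResult stays empty (assumed placeholder)
      Results(rt, Array.empty[Row], rowCount.toInt, startTime, endTime)
    }
  }
```

`count()` still scans the same data, so the per-query timing stays comparable while the driver no longer has to hold every result set produced by the concurrent tasks.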
---