Repository: carbondata

Updated Branches:
  refs/heads/master d9c3b4837 -> 26d2f1c83
Modify compare test: fix style and change table

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/327b307f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/327b307f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/327b307f

Branch: refs/heads/master
Commit: 327b307fdddc7b0fffe5b86049d1a2d08dfb182a
Parents: d9c3b48
Author: jackylk <jacky.li...@huawei.com>
Authored: Mon Jul 3 21:54:39 2017 +0800
Committer: chenliang613 <chenliang...@apache.org>
Committed: Wed Jul 5 21:34:56 2017 +0800

----------------------------------------------------------------------
 .../carbondata/examples/CompareTest.scala | 103 ++++++++++++-------
 1 file changed, 67 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/327b307f/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index ee53c31..ffc4b22 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -41,6 +41,7 @@ case class Query(sqlText: String, queryType: String, desc: String)
 object CompareTest {
 
   def parquetTableName: String = "comparetest_parquet"
+  def orcTableName: String = "comparetest_orc"
   def carbonTableName(version: String): String = s"comparetest_carbonV$version"
 
   // Table schema:
@@ -63,7 +64,7 @@ object CompareTest {
   // +-------------+-----------+-------------+-------------+------------+
   // | m4          | double    | NA          | measure     | no         |
   // +-------------+-----------+-------------+-------------+------------+
-  // | m5          | double    | NA          | measure     | no         |
+  // | m5          | decimal   | NA          | measure     | no         |
   // +-------------+-----------+-------------+-------------+------------+
   private def generateDataFrame(spark: SparkSession): DataFrame = {
     val r = new Random()
@@ -71,10 +72,11 @@
       .parallelize(1 to 10 * 1000 * 1000, 4)
       .map { x =>
         ("city" + x % 8, "country" + x % 1103, "planet" + x % 10007, "IDENTIFIER" + x.toString,
-          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, x.toDouble / 11)
+          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+          BigDecimal.valueOf(x.toDouble / 11))
       }.map { x =>
-          Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
-    }
+        Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+      }
 
     val schema = StructType(
       Seq(
@@ -86,7 +88,7 @@
         StructField("m2", IntegerType, nullable = false),
         StructField("m3", LongType, nullable = false),
         StructField("m4", DoubleType, nullable = false),
-        StructField("m5", DoubleType, nullable = false)
+        StructField("m5", DecimalType(30, 10), nullable = false)
       )
     )
 
@@ -142,12 +144,12 @@
     // == FULL SCAN GROUP BY AGGREGATE ==
     // ===========================================================================
     Query(
-      "select country, sum(m1) from $table group by country",
+      "select country, sum(m1) as metric from $table group by country order by metric",
       "aggregate",
       "group by on big data, on medium card column, medium result set,"
     ),
     Query(
-      "select city, sum(m1) from $table group by city",
+      "select city, sum(m1) as metric from $table group by city order by metric",
       "aggregate",
       "group by on big data, on low card column, small result set,"
     ),
@@ -170,17 +172,20 @@
    // == FILTER SCAN GROUP BY AGGREGATION ==
    // ===========================================================================
     Query(
-      "select country, sum(m1) from $table where city='city8' group by country ",
+      "select country, sum(m1) as metric from $table where city='city8' group by country " +
+      "order by metric",
       "filter scan and aggregate",
       "group by on large data, small result set"
     ),
     Query(
-      "select id, sum(m1) from $table where planet='planet10' group by id",
+      "select id, sum(m1) as metric from $table where planet='planet10' group by id " +
+      "order by metric",
       "filter scan and aggregate",
       "group by on medium data, large result set"
     ),
     Query(
-      "select city, sum(m1) from $table where country='country12' group by city ",
+      "select city, sum(m1) as metric from $table where country='country12' group by city " +
+      "order by metric",
       "filter scan and aggregate",
       "group by on medium data, small result set"
     ),
@@ -244,25 +249,35 @@
     )
   )
 
-  private def loadParquetTable(spark: SparkSession, input: DataFrame): Double = time {
+  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
+    : Double = time {
     // partitioned by last 1 digit of id column
     val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
     dfWithPartition.write
       .partitionBy("partitionCol")
       .mode(SaveMode.Overwrite)
-      .parquet(parquetTableName)
+      .parquet(table)
+    spark.read.parquet(table).registerTempTable(table)
+  }
+
+  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
+    // unlike the parquet table, the ORC table is written without partitioning
+    input.write
+      .mode(SaveMode.Overwrite)
+      .orc(table)
+    spark.read.orc(table).registerTempTable(table)
   }
 
-  private def loadCarbonTable(spark: SparkSession, input: DataFrame, version: String): Double = {
+  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
-      version
+      "3"
     )
-    spark.sql(s"drop table if exists ${carbonTableName(version)}")
+    spark.sql(s"drop table if exists $tableName")
     time {
       input.write
         .format("carbondata")
-        .option("tableName", carbonTableName(version))
+        .option("tableName", tableName)
         .option("tempCSV", "false")
         .option("single_pass", "true")
         .option("dictionary_exclude", "id") // id is high cardinality column
@@ -273,18 +288,23 @@
   }
 
   // load data into parquet, carbonV2, carbonV3
-  private def prepareTable(spark: SparkSession): Unit = {
+  private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
     val df = generateDataFrame(spark).cache
     println(s"loading ${df.count} records, schema: ${df.schema}")
-    val loadParquetTime = loadParquetTable(spark, df)
-    val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
-    println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
+    val table1Time = if (table1.endsWith("parquet")) {
+      loadParquetTable(spark, df, table1)
+    } else if (table1.endsWith("orc")) {
+      loadOrcTable(spark, df, table1)
+    } else {
+      sys.error("invalid table: " + table1)
+    }
+    val table2Time = loadCarbonTable(spark, df, table2)
+    println(s"load completed, time: $table1Time, $table2Time")
     df.unpersist()
-    spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
   }
 
   // Run all queries for the specified table
-  private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Int)] = {
+  private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = {
     println(s"start running queries for $tableName...")
     var result: Array[Row] = null
     queries.zipWithIndex.map { case (query, index) =>
@@ -294,37 +314,46 @@
         result = spark.sql(sqlText).collect()
       }
       println(s"=> $rt sec")
-      (rt, result.length)
+      (rt, result)
+    }
+  }
+
+  private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row],
+      table2: String, result2: Array[Row]): Unit = {
+    if (!result1.sameElements(result2)) {
+      val num = index + 1
+      println(s"$table1 result for query $num: ")
+      println(s"""${result1.mkString(",")}""")
+      println(s"$table2 result for query $num: ")
+      println(s"""${result2.mkString(",")}""")
+      sys.error(s"result not matching for query $num (${queries(index).desc})")
     }
   }
 
   // run testcases and print comparison result
-  private def runTest(spark: SparkSession): Unit = {
+  private def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
     val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     val date = new Date
     val timestamp = date.getTime
 
     // run queries on parquet and carbon
-    val parquetResult: Array[(Double, Int)] = runQueries(spark, parquetTableName)
+    val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1)
     // do GC and sleep for some time before running next table
     System.gc()
     Thread.sleep(1000)
     System.gc()
     Thread.sleep(1000)
-    val carbonResult: Array[(Double, Int)] = runQueries(spark, carbonTableName("3"))
+    val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2)
 
     // check result by comparing output from parquet and carbon
-    parquetResult.zipWithIndex.foreach { case (result, index) =>
-      if (result._2 != carbonResult(index)._2) {
-        sys.error(s"result not matching for query ${index + 1} (${queries(index).desc}): " +
-          s"${result._2} and ${carbonResult(index)._2}")
-      }
+    table1Result.zipWithIndex.foreach { case (result, index) =>
+      printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2)
     }
 
     // print all response time in JSON format, so that it can be analyzed later
     queries.zipWithIndex.foreach { case (query, index) =>
       println("{" +
           s""""query":"${index + 1}", """ +
-          s""""parquetTime":${parquetResult(index)._1}, """ +
-          s""""carbonTime":${carbonResult(index)._1}, """ +
-          s""""fetched":${parquetResult(index)._2}, """ +
+          s""""$table1 time":${table1Result(index)._1}, """ +
+          s""""$table2 time":${table2Result(index)._1}, """ +
+          s""""fetched":${table1Result(index)._2.length}, """ +
          s""""type":"${query.queryType}", """ +
          s""""desc":"${query.desc}",  """ +
          s""""date": "${formatter.format(date)}" """ +
@@ -351,8 +380,10 @@
       .getOrCreateCarbonSession(storeLocation)
 
     spark.sparkContext.setLogLevel("warn")
 
-    prepareTable(spark)
-    runTest(spark)
+    val table1 = parquetTableName
+    val table2 = carbonTableName("3")
+    prepareTable(spark, table1, table2)
+    runTest(spark, table1, table2)
     spark.close()
   }
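
Why every rewritten query gains an "order by metric" clause: the old test compared only row counts (result.length), while the new printErrorIfNotMatch compares the collected rows themselves with Array.sameElements, which is order-sensitive, so each aggregate query must impose a deterministic ordering before the two engines' outputs can be checked element by element. The sketch below illustrates that pattern without a Spark dependency. It is a minimal sketch, not the file's actual code: CompareSketch, the tuple stand-ins for Row, and this body of the time helper are assumptions for illustration; only time's block-taking, seconds-returning shape is implied by the ": Double = time { ... }" signatures in the diff.

object CompareSketch {
  // Assumed shape of the `time` helper the diff relies on:
  // run a block and return the elapsed wall-clock time in seconds.
  def time(code: => Unit): Double = {
    val start = System.currentTimeMillis()
    code
    (System.currentTimeMillis() - start).toDouble / 1000
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for two engines' collected results (Array[Row] in the real test).
    // Same rows, different order:
    val table1Result = Array(("country1", 42L), ("country2", 7L))
    val table2Result = Array(("country2", 7L), ("country1", 42L))

    val rt = time {
      // sameElements compares position by position, so equal result sets in
      // different orders fail the check; that is what "order by metric" prevents.
      if (!table1Result.sameElements(table2Result)) {
        println("result not matching (same rows, different order)")
      }
    }
    println(s"=> $rt sec")
  }
}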