Repository: carbondata Updated Branches: refs/heads/master 5a82232a8 -> 90aeaa65c
[CARBONDATA-2040] Add standard partiton example and optimize partition test case Add standard partiton example and optimize partition test cases. This closes #1817 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/90aeaa65 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/90aeaa65 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/90aeaa65 Branch: refs/heads/master Commit: 90aeaa65c0ef281795fe5618ee82ff5182eb66f4 Parents: 5a82232 Author: chenliang613 <[email protected]> Authored: Wed Jan 17 13:03:56 2018 +0800 Committer: manishgupta88 <[email protected]> Committed: Mon Jan 29 11:30:31 2018 +0530 ---------------------------------------------------------------------- .../examples/StandardPartitionExample.scala | 130 +++++++++++++++++++ ...andardPartitionTableCompactionTestCase.scala | 4 +- .../StandardPartitionTableQueryTestCase.scala | 3 + 3 files changed, 135 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/90aeaa65/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala new file mode 100644 index 0000000..5a8e3f5 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StandardPartitionExample.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+/**
+ * Example of CarbonData standard partition tables: tables partitioned
+ * dynamically by column value, in the same way as Spark partitioned tables.
+ *
+ * Part 1 creates and queries a small partitioned table loaded from CSV.
+ * Part 2 compares a filter query on a partition column against the same
+ * query on an equivalent non-partitioned table.
+ */
+
+object StandardPartitionExample {
+
+  def main(args: Array[String]): Unit = {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/partition_data.csv"
+    val spark = ExampleUtils.createCarbonSession("StandardPartitionExample")
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // 1. simple usage for StandardPartition
+    spark.sql("DROP TABLE IF EXISTS partitiontable0")
+    spark.sql("""
+                | CREATE TABLE partitiontable0
+                | (id Int,
+                | vin String,
+                | phonenumber Long,
+                | area String,
+                | salary Int)
+                | PARTITIONED BY (country String)
+                | STORED BY 'org.apache.carbondata.format'
+              """.stripMargin)
+
+    spark.sql(s"""
+       LOAD DATA LOCAL INPATH '$testData' into table partitiontable0
+       """)
+
+    spark.sql(
+      """
+        | SELECT *
+        | FROM partitiontable0
+      """.stripMargin).show()
+
+    // 2. compare the performance: with partition VS without partition
+
+    // build test data, if set the data is larger than 100M, it will take 10+ mins.
+    import scala.util.Random
+    import spark.implicits._
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10 * 1000 * 1000)
+      .map(x => ("No." + r.nextInt(100000), "country" + x % 8, "city" + x % 50, x % 300))
+      .toDF("ID", "country", "city", "population")
+
+    // Create table without partition
+    df.write.format("carbondata")
+      .option("tableName", "withoutpartition")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite).save()
+
+    // Create table with partition
+    spark.sql("DROP TABLE IF EXISTS withpartition")
+    spark.sql("""
+                | CREATE TABLE withpartition
+                | (ID String,
+                | city String,
+                | population Int)
+                | PARTITIONED BY (country String)
+                | STORED BY 'org.apache.carbondata.format'
+              """.stripMargin)
+
+    // Load with INSERT ... SELECT instead of df.write with SaveMode.Overwrite.
+    // Overwriting an existing table through the carbondata DataFrame writer drops it and
+    // re-creates it from the DataFrame schema, which loses the PARTITIONED BY definition
+    // above. The explicit SELECT list also puts the partition column (country) last, as
+    // dynamic-partition inserts require, instead of relying on the DataFrame's column order.
+    df.createOrReplaceTempView("partition_source")
+    spark.sql(
+      """
+        | INSERT INTO withpartition
+        | SELECT ID, city, population, country
+        | FROM partition_source
+      """.stripMargin)
+
+    // Runs `code` once and returns the elapsed wall-clock time in seconds.
+    def time(code: => Unit): Double = {
+      val start = System.currentTimeMillis()
+      code
+      (System.currentTimeMillis() - start).toDouble / 1000
+    }
+
+    val timeWithoutPartition = time {
+      spark.sql(
+        """
+          | SELECT *
+          | FROM withoutpartition WHERE country='country3'
+        """.stripMargin).count()
+    }
+
+    val timeWithPartition = time {
+      spark.sql(
+        """
+          | SELECT *
+          | FROM withpartition WHERE country='country3'
+        """.stripMargin).count()
+    }
+    // scalastyle:off
+    println("time of without partition:" + timeWithoutPartition.toString)
+    println("time of with partition:" + timeWithPartition.toString)
+    // scalastyle:on
+
+    spark.sql("DROP TABLE IF EXISTS partitiontable0")
+    spark.sql("DROP TABLE IF EXISTS withoutpartition")
+    spark.sql("DROP TABLE IF EXISTS withpartition")
+    spark.catalog.dropTempView("partition_source")
+
+    spark.close()
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90aeaa65/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala index 295922d..22ebd80 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala @@ -189,8 +189,8 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA sql(s"""insert into staticpartitioncompaction PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""") } sql("CLEAN FILES FOR TABLE staticpartitioncompaction").show() - var segments = sql("SHOW SEGMENTS FOR TABLE staticpartitioncompaction") - var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + val segments = sql("SHOW SEGMENTS FOR TABLE staticpartitioncompaction") + val segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } assert(segmentSequenceIds.size==1) assert(segmentSequenceIds.contains("0.1")) CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/90aeaa65/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index db873d1..d1ef94c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -237,6 +237,9 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA | STORED BY 'org.apache.carbondata.format' """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionload") + verifyPartitionInfo(frame, Seq("empname=ravi")) + } test("Creation of partition table should fail if the colname in table schema and partition column is same even if both are case sensitive"){
