Repository: carbondata Updated Branches: refs/heads/master 110f9b21b -> fa7077bc4
[CARBONDATA-1382] Add more test cases for bucket feature: 1. Add more test cases for the bucket feature. 2. Optimize the data load method used by some test cases. This closes #1260 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa7077bc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa7077bc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa7077bc Branch: refs/heads/master Commit: fa7077bc4ac2e5fdffd24f6a59ffa686e5765922 Parents: 110f9b2 Author: chenliang613 <[email protected]> Authored: Wed Aug 16 10:38:06 2017 +0800 Committer: Ravindra Pesala <[email protected]> Committed: Sun Aug 20 21:35:58 2017 +0530 ---------------------------------------------------------------------- .../src/test/resources/testdatafileslist.txt | 3 +- .../sdv/generated/BucketingTestCase.scala | 189 +++++++++++++++++++ .../cluster/sdv/suite/SDVSuites.scala | 1 + .../sql/test/ResourceRegisterAndCopier.scala | 4 +- .../bucketing/TableBucketingTestCase.scala | 88 +++------ .../vectorreader/VectorReaderTestCase.scala | 13 +- 6 files changed, 221 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa7077bc/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt b/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt index faec837..924756e 100644 --- a/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt +++ b/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt @@ -227,4 +227,5 @@ Data/badrecords_3.csv Data/badrecords_4.csv Data/badrecords_5.csv Data/emptyLoad.csv -Data/splchar.csv \ No newline at end of file +Data/splchar.csv +source.csv \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa7077bc/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala new file mode 100644 index 0000000..78f8945 --- /dev/null +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.cluster.sdv.generated + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.spark.sql.common.util._ +import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.scalatest.BeforeAndAfterAll + +class BucketingTestCase extends QueryTest with BeforeAndAfterAll { + + var threshold: Int = _ + var timeformat = CarbonProperties.getInstance() + .getProperty("carbon.timestamp.format", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + + override def beforeAll { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt + sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1") + sql("DROP TABLE IF EXISTS bucket_table") + } + + test("test exception if bucketcolumns be measure column") { + try { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='ID')") + assert(false) + } + catch { + case _ => assert(true) + } + } + + test("test exception if bucketcolumns be complex data type column") { + try { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (Id int, number double, name string, " + + "gamePoint array<double>, mac struct<num:double>) STORED BY 'carbondata' TBLPROPERTIES" + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='gamePoint')") + assert(false) + } + catch { + case _ => assert(true) + } + } + + test("test Int column as bucketcolumns through dictionary_include") { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE 
bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('DICTIONARY_INCLUDE'='ID','BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='ID')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") + val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_bucket_table") + if (table != null && table.getBucketingInfo("bucket_table") != null) { + assert(true) + } else { + assert(false, "Bucketing info does not exist") + } + } + + test("test multi columns as bucketcolumns") { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name,phonetype')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") + val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_bucket_table") + if (table != null && table.getBucketingInfo("bucket_table") != null) { + assert(true) + } else { + assert(false, "Bucketing info does not exist") + } + } + + test("test multi columns as bucketcolumns with bucket join") { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country,name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") + + val plan = sql( + """ + |select t1.*, t2.* + |from bucket_table t1, bucket_table t2 + |where t1.country = t2.country and t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not 
exist on bucket column join") + } + + test("test non bucket column join") { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") + + val plan = sql( + """ + |select t1.*, t2.* + |from bucket_table t1, bucket_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(shuffleExists, "shuffle should exist on non-bucket column join") + } + + test("test bucketcolumns through multi data loading plus compaction") { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + val numOfLoad = 10 + for (j <- 0 until numOfLoad) { + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") + } + sql("ALTER TABLE bucket_table COMPACT 'MAJOR'") + + val plan = sql( + """ + |select t1.*, t2.* + |from bucket_table t1, bucket_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } + + test("drop non-bucket column, test bucket column join") { + sql("DROP TABLE IF EXISTS bucket_table") + sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + sql(s"LOAD 
DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") + + sql("ALTER TABLE bucket_table DROP COLUMNS (ID,country)") + + val plan = sql( + """ + |select t1.*, t2.* + |from bucket_table t1, bucket_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } + + override def afterAll { + sql("DROP TABLE IF EXISTS bucket_table") + sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeformat) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa7077bc/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala index b9908ea..6bf71d0 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala @@ -113,6 +113,7 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll { val suites = new AlterTableTestCase :: new BatchSortLoad2TestCase :: + new BucketingTestCase :: new InvertedindexTestCase :: new OffheapQuery1TestCase :: new OffheapQuery2TestCase :: http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa7077bc/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala index 87a60c5..b99884d 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala @@ -59,7 +59,9 @@ object ResourceRegisterAndCopier { val rsFile = FileFactory.getCarbonFile(hdfsDataPath, fileType) if (!rsFile.exists()) { val target = resourcePath + "/" + file - new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/"))).mkdirs() + if (file.lastIndexOf("/") > -1) { + new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/"))).mkdirs() + } downloadFile(link, file, target) // copy it copyLocalFile(hdfsDataPath, target) http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa7077bc/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala index 42d3e36..790f3f9 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala @@ -18,7 +18,6 @@ package org.apache.spark.carbondata.bucketing import org.apache.spark.sql.common.util.Spark2QueryTest -import org.apache.spark.sql.execution.command.LoadTable import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.scalatest.BeforeAndAfterAll @@ -38,7 +37,6 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { 
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1") - sql("DROP TABLE IF EXISTS t3") sql("DROP TABLE IF EXISTS t4") sql("DROP TABLE IF EXISTS t5") sql("DROP TABLE IF EXISTS t6") @@ -49,16 +47,10 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test create table with buckets") { - sql( - """ - CREATE TABLE t4 - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4") - """) - LoadTable(Some("default"), "t4", s"$resourcesPath/source.csv", Nil, - Map(), false).run(sqlContext.sparkSession) + sql("CREATE TABLE t4 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t4") val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4") if (table != null && table.getBucketingInfo("t4") != null) { assert(true) @@ -69,16 +61,10 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { test("test create table with buckets unsafe") { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true") - sql( - """ - CREATE TABLE t10 - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t10") - """) - LoadTable(Some("default"), "t10", s"$resourcesPath/source.csv", Nil, - Map(("use_kettle", "false")), false).run(sqlContext.sparkSession) + sql("CREATE TABLE t10 (ID Int, date Timestamp, 
country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t10") CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false") val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t10") if (table != null && table.getBucketingInfo("t10") != null) { @@ -106,17 +92,9 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test create table with no bucket join of carbon tables") { - sql( - """ - CREATE TABLE t5 - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("tableName"="t5") - """) - LoadTable(Some("default"), "t5", s"$resourcesPath/source.csv", Nil, - Map(), false).run(sqlContext.sparkSession) - + sql("CREATE TABLE t5 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata'") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t5") val plan = sql( """ |select t1.*, t2.* @@ -131,17 +109,10 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test create table with bucket join of carbon tables") { - sql( - """ - CREATE TABLE t6 - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t6") - """) - LoadTable(Some("default"), "t6", s"$resourcesPath/source.csv", Nil, - Map(), false).run(sqlContext.sparkSession) - + sql("CREATE TABLE t6 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 
'BUCKETCOLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6") val plan = sql( """ |select t1.*, t2.* @@ -156,16 +127,10 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test create table with bucket join of carbon table and parquet table") { - sql( - """ - CREATE TABLE t7 - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7") - """) - LoadTable(Some("default"), "t7", s"$resourcesPath/source.csv", Nil, - Map(), false).run(sqlContext.sparkSession) + sql("CREATE TABLE t7 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t7") sql("DROP TABLE IF EXISTS bucketed_parquet_table") sql("select * from t7").write @@ -187,16 +152,10 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test create table with bucket join of carbon table and non bucket parquet table") { - sql( - """ - CREATE TABLE t8 - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8") - """) - LoadTable(Some("default"), "t8", s"$resourcesPath/source.csv", Nil, - Map(), false).run(sqlContext.sparkSession) + sql("CREATE TABLE t8 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " + + "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t8") sql("DROP TABLE IF EXISTS parquet_table") sql("select * from t8").write @@ 
-231,7 +190,6 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { } override def afterAll { - sql("DROP TABLE IF EXISTS t3") sql("DROP TABLE IF EXISTS t4") sql("DROP TABLE IF EXISTS t5") sql("DROP TABLE IF EXISTS t6") http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa7077bc/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala index cafd520..db62eb5 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala @@ -35,16 +35,9 @@ class VectorReaderTestCase extends Spark2QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") - sql( - """ - CREATE TABLE vectorreader - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - USING org.apache.spark.sql.CarbonSource - OPTIONS("tableName"="vectorreader") - """) - LoadTable(Some("default"), "vectorreader", s"$resourcesPath/source.csv", Nil, - Map(), false).run(sqlContext.sparkSession) + sql("CREATE TABLE vectorreader (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED BY 'carbondata'") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE vectorreader") } test("test vector reader") {
