show partition function
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c3bfc4ad Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c3bfc4ad Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c3bfc4ad Branch: refs/heads/datamap Commit: c3bfc4ad87dfc66582b31b54ead2109a8e760bdb Parents: c2b39b2 Author: mayun <[email protected]> Authored: Sun Jun 25 12:12:06 2017 +0800 Committer: Venkata Ramana G <[email protected]> Committed: Thu Jun 29 13:15:51 2017 +0530 ---------------------------------------------------------------------- .../examples/CarbonPartitionExample.scala | 147 +++++++++++++ .../examples/CarbonPartitionExample.scala | 49 ++++- .../partition/TestShowPartitions.scala | 216 +++++++++++++++++++ .../carbondata/spark/util/CommonUtil.scala | 47 ++++ .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 1 + .../spark/sql/CarbonCatalystOperators.scala | 9 +- .../org/apache/spark/sql/CarbonSqlParser.scala | 10 +- .../execution/command/carbonTableSchema.scala | 28 +++ .../spark/sql/hive/CarbonStrategies.scala | 16 ++ .../sql/execution/command/DDLStrategy.scala | 8 + .../execution/command/carbonTableSchema.scala | 28 +++ 11 files changed, 553 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala new file mode 100644 index 0000000..2f55189 --- /dev/null +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import scala.collection.mutable.LinkedHashMap + +import org.apache.spark.sql.AnalysisException + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.examples.util.ExampleUtils + +object CarbonPartitionExample { + + def main(args: Array[String]) { + val cc = ExampleUtils.createCarbonContext("CarbonPartitionExample") + val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv" + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + + // none partition table + cc.sql("DROP TABLE IF EXISTS t0") + cc.sql(""" + | CREATE TABLE IF NOT EXISTS t0 + | ( + | vin String, + | logdate Timestamp, + | phonenumber Int, + | country String, + | area String + | ) + | STORED BY 'carbondata' + """.stripMargin) + + // range partition + cc.sql("DROP TABLE IF EXISTS t1") + cc.sql(""" + | CREATE TABLE IF NOT EXISTS t1( + | vin STRING, + | phonenumber INT, + | country STRING, + | area STRING + | ) + | PARTITIONED BY (logdate TIMESTAMP) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01,2015/01/01,2016/01/01') + """.stripMargin) + + // hash partition + cc.sql(""" + | CREATE TABLE IF NOT EXISTS t3( + | logdate Timestamp, + | phonenumber Int, + | country String, + | area String + | ) + | PARTITIONED BY (vin String) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5') + """.stripMargin) + + // list partition + cc.sql("DROP TABLE IF EXISTS t5") + cc.sql(""" + | CREATE TABLE IF NOT EXISTS t5( + | vin String, + | logdate Timestamp, + | phonenumber Int, + | area String + | ) + | PARTITIONED BY (country string) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ') + """.stripMargin) + + cc.sql(s"DROP TABLE IF EXISTS partitionDB.t9") + cc.sql(s"DROP DATABASE IF EXISTS partitionDB") + cc.sql(s"CREATE DATABASE partitionDB") + cc.sql(s""" + | CREATE TABLE IF NOT EXISTS partitionDB.t9( + | logdate Timestamp, + | phonenumber Int, + | country String, + | area String + | ) + | PARTITIONED BY (vin String) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5') + """.stripMargin) + // hive partition table + cc.sql("DROP TABLE IF EXISTS t7") + cc.sql(""" + | create table t7(id int, name string) partitioned by (city string) + | row format delimited fields terminated by ',' + """.stripMargin) + cc.sql("alter table t7 add partition (city = 'Hangzhou')") + // hive partition table + cc.sql(s"DROP TABLE IF EXISTS hiveDB.t7") + cc.sql(s"CREATE DATABASE IF NOT EXISTS hiveDB") + cc.sql(""" + | create table hiveDB.t7(id int, name string) partitioned by (city string) + | row format delimited fields terminated by ',' + """.stripMargin) + cc.sql("alter table hiveDB.t7 add partition (city = 'Shanghai')") + // show partitions + try { + cc.sql("SHOW PARTITIONS t0").show() + } catch { + case ex: AnalysisException => print(ex.getMessage()) + } + cc.sql("SHOW PARTITIONS t1").show() + cc.sql("SHOW PARTITIONS t3").show() + cc.sql("SHOW PARTITIONS t5").show() + cc.sql("SHOW PARTITIONS t7").show() + cc.sql("use hiveDB").show() + cc.sql("SHOW PARTITIONS t7").show() + cc.sql("use default").show() + cc.sql("SHOW PARTITIONS partitionDB.t9").show() + + cc.sql("DROP TABLE IF EXISTS t0") + cc.sql("DROP TABLE IF EXISTS t1") + cc.sql("DROP TABLE IF EXISTS t3") + cc.sql("DROP TABLE IF EXISTS t5") + cc.sql("DROP TABLE IF EXISTS t7") + cc.sql(s"DROP TABLE IF EXISTS hiveDb.t7") + cc.sql(s"DROP TABLE IF EXISTS partitionDB.t9") + cc.sql(s"DROP DATABASE IF EXISTS partitionDB") + + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala index 8a0479f..4cdde42 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala @@ -19,6 +19,8 @@ package org.apache.carbondata.examples import java.io.File +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -50,7 +52,6 @@ object CarbonPartitionExample { // none partition table spark.sql("DROP TABLE IF EXISTS t0") - spark.sql(""" | CREATE TABLE IF NOT EXISTS t0 | ( @@ -65,7 +66,6 @@ object CarbonPartitionExample { // range partition spark.sql("DROP TABLE IF EXISTS t1") - spark.sql(""" | CREATE TABLE IF NOT EXISTS t1 | ( @@ -82,7 +82,6 @@ object CarbonPartitionExample { // hash partition spark.sql("DROP TABLE IF EXISTS t3") - spark.sql(""" | CREATE TABLE IF NOT EXISTS t3 | ( @@ -98,7 +97,6 @@ object CarbonPartitionExample { // list partition spark.sql("DROP TABLE IF EXISTS t5") - spark.sql(""" | CREATE TABLE IF NOT EXISTS t5 | ( @@ -113,14 +111,57 @@ object CarbonPartitionExample { | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ') """.stripMargin) + // hive partition table + spark.sql("DROP TABLE IF EXISTS t7") + spark.sql(""" + | create table t7(id int, name string) partitioned by (city string) + | row format delimited fields terminated by ',' + """.stripMargin) + spark.sql("alter table t7 add partition (city = 'Hangzhou')") + + // not default db partition table + try { + spark.sql(s"DROP TABLE IF EXISTS partitionDB.t9") + } catch { + case ex: NoSuchDatabaseException => print(ex.getMessage()) + } + spark.sql(s"DROP DATABASE IF EXISTS partitionDB") + spark.sql(s"CREATE DATABASE partitionDB") + spark.sql(s""" + | CREATE TABLE IF NOT EXISTS partitionDB.t9( + | logdate Timestamp, + | phonenumber Int, + | country String, + | area String + | ) + | PARTITIONED BY (vin String) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5') + """.stripMargin) + // show tables spark.sql("SHOW TABLES").show() + // show partitions + try { + spark.sql("""SHOW PARTITIONS t0""").show() + } catch { + case ex: AnalysisException => print(ex.getMessage()) + } + spark.sql("""SHOW PARTITIONS t1""").show() + spark.sql("""SHOW PARTITIONS t3""").show() + spark.sql("""SHOW PARTITIONS t5""").show() + spark.sql("""SHOW PARTITIONS t7""").show() + spark.sql("""SHOW PARTITIONS partitionDB.t9""").show() + // drop table spark.sql("DROP TABLE IF EXISTS t0") spark.sql("DROP TABLE IF EXISTS t1") spark.sql("DROP TABLE IF EXISTS t3") spark.sql("DROP TABLE IF EXISTS t5") + spark.sql("DROP TABLE IF EXISTS t7") + spark.sql("DROP TABLE IF EXISTS partitionDB.t9") + spark.sql(s"DROP DATABASE IF EXISTS partitionDB") spark.close() http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala new file mode 100644 index 0000000..7b53964 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.partition + +import java.sql.Timestamp + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestShowPartition extends QueryTest with BeforeAndAfterAll { + override def beforeAll = { + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + + sql("drop table if exists notPartitionTable") + sql(""" + | CREATE TABLE notPartitionTable + | ( + | vin String, + | logdate Timestamp, + | phonenumber Int, + | country String, + | area String + | ) + | STORED BY 'carbondata' + """.stripMargin) + + sql("drop table if exists hashTable") + sql( + """ + | CREATE TABLE hashTable (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3') + """.stripMargin) + + sql("drop table if exists rangeTable") + sql( + """ + | CREATE TABLE rangeTable (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='01-01-2010, 01-01-2015') + """.stripMargin) + + sql("drop table if exists listTable") + sql( + """ + | CREATE TABLE listTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (workgroupcategory int) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='0, 1, (2, 3)') + """.stripMargin) + + sql(s"CREATE DATABASE if not exists partitionDB") + sql("drop table if exists partitionDB.hashTable") + sql("drop table if exists partitionDB.rangeTable") + sql("drop table if exists partitionDB.listTable") + sql( + """ + | CREATE TABLE partitionDB.hashTable (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3') + """.stripMargin) + sql( + """ + | CREATE TABLE partitionDB.rangeTable (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='01-01-2010, 01-01-2015') + """.stripMargin) + sql( + """ + | CREATE TABLE partitionDB.listTable (empno int, empname String, designation String, + | doj Timestamp,workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (workgroupcategory int) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='0, 1, (2, 3)') + """.stripMargin) + + sql("DROP TABLE IF EXISTS hiveTable") + sql(""" + | create table hiveTable(id int, name string) partitioned by (city string) + | row format delimited fields terminated by ',' + """.stripMargin) + sql("alter table hiveTable add partition (city = 'Hangzhou')") + + sql(s"CREATE DATABASE if not exists hiveDB") + sql("DROP TABLE IF EXISTS hiveDB.hiveTable") + sql(""" + | create table hiveDB.hiveTable(id int, name string) partitioned by (city string) + | row format delimited fields terminated by ',' + """.stripMargin) + sql("alter table hiveDB.hiveTable add partition (city = 'Shanghai')") + } + + test("show partition table: exception when show not partition table") { + val errorMessage = + intercept[AnalysisException] { sql("show partitions notPartitionTable").show() } + assert(errorMessage.getMessage.contains( + "SHOW PARTITIONS is not allowed on a table that is not partitioned: notpartitiontable")) + } + + test("show partition table: hash table") { + // EqualTo + checkAnswer(sql("show partitions hashTable"), Seq(Row("empno=HASH_NUMBER(3)"))) + + } + + test("show partition table: range partition") { + // EqualTo + checkAnswer(sql("show partitions rangeTable"), Seq(Row("doj=default"), + Row("doj<01-01-2010"), Row("01-01-2010<=doj<01-01-2015"))) + } + + test("show partition table: list partition") { + // EqualTo + checkAnswer(sql("show partitions listTable"), Seq(Row("workgroupcategory=default"), + Row("workgroupcategory=0"), Row("workgroupcategory=1"), Row("workgroupcategory=2, 3"))) + + } + test("show partition table: not default db") { + // EqualTo + checkAnswer(sql("show partitions partitionDB.hashTable"), Seq(Row("empno=HASH_NUMBER(3)"))) + // EqualTo + checkAnswer(sql("show partitions partitionDB.rangeTable"), Seq(Row("doj=default"), + Row("doj<01-01-2010"), Row("01-01-2010<=doj<01-01-2015"))) + // EqualTo + checkAnswer(sql("show partitions partitionDB.listTable"), Seq(Row("workgroupcategory=default"), + Row("workgroupcategory=0"), Row("workgroupcategory=1"), Row("workgroupcategory=2, 3"))) + + } + + test("show partition table: hive partition table") { + // EqualTo + checkAnswer(sql("show partitions hiveTable"), Seq(Row("city=Hangzhou"))) + sql("use hiveDB").show() + checkAnswer(sql("show partitions hiveTable"), Seq(Row("city=Shanghai"))) + sql("use default").show() + } + + override def afterAll = { + sql("drop table if exists notPartitionTable") + sql("drop table if exists hashTable") + sql("drop table if exists listTable") + sql("drop table if exists rangeTable") + sql("drop table if exists hiveTable") + try { + sql("drop table if exists partitionDB.hashTable") + + } catch { + case ex: NoSuchDatabaseException => print(ex.getMessage()) + } + try { + sql("drop table if exists partitionDB.rangeTable") + } catch { + case ex: NoSuchDatabaseException => print(ex.getMessage()) + } + try { + sql("drop table if exists partitionDB.listTable") + } catch { + case ex: NoSuchDatabaseException => print(ex.getMessage()) + } + try { + sql("drop table if exists hiveDB.hiveTable") + } catch { + case ex: NoSuchDatabaseException => print(ex.getMessage()) + } + sql("DROP DATABASE if exists partitionDB") + sql("DROP DATABASE if exists hiveDB") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index d3b6f8d..ac2e311 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -27,12 +27,20 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.SparkContext +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} +import org.apache.spark.sql.Row +import org.apache.spark.sql.RowFactory +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.types.StringType import org.apache.spark.util.FileUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.DataType +import org.apache.carbondata.core.metadata.schema.partition.PartitionType +import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.processing.csvload.CSVInputFormat @@ -544,4 +552,43 @@ object CommonUtil { } } + def getPartitionInfo(columnName: String, partitionType: PartitionType, + partitionInfo: PartitionInfo): Seq[Row] = { + var result = Seq.newBuilder[Row] + partitionType match { + case PartitionType.RANGE => + result.+=(RowFactory.create(columnName + "=default")) + var rangeInfo = partitionInfo.getRangeInfo + var size = rangeInfo.size() - 1 + for (index <- 0 to size) { + if (index == 0) { + result.+=(RowFactory.create(columnName + "<" + rangeInfo.get(index))) + } else { + result.+=(RowFactory.create(rangeInfo.get(index - 1) + "<=" + + columnName + "<" + rangeInfo.get(index))) + } + } + case PartitionType.RANGE_INTERVAL => + result.+=(RowFactory.create(columnName + "=")) + case PartitionType.LIST => + result.+=(RowFactory.create(columnName + "=default")) + var listInfo = partitionInfo.getListInfo + listInfo.asScala.foreach { + f => + result.+=(RowFactory.create(columnName + "=" + + f.toArray().mkString(", "))) + } + case PartitionType.HASH => + var hashNumber = partitionInfo.getNumPartitions + result.+=(RowFactory.create(columnName + "=HASH_NUMBER(" + hashNumber.toString() + ")")) + case others => + result.+=(RowFactory.create(columnName + "=")) + } + result.result() + } + + def partitionInfoOutput: Seq[Attribute] = Seq( + AttributeReference("partition", StringType, nullable = false, + new MetadataBuilder().putString("comment", "partitions info").build())() + ) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 383d308..c565c31 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -112,6 +112,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT") protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA") protected val PARTITIONER = carbonKeyWord("PARTITIONER") + protected val PARTITIONS = carbonKeyWord("PARTITIONS") protected val QUOTECHAR = carbonKeyWord("QUOTECHAR") protected val RELATION = carbonKeyWord("RELATION") protected val SCHEMA = carbonKeyWord("SCHEMA") http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index c1a0dc2..024c54b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -20,12 +20,13 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _} +import org.apache.spark.sql.catalyst.plans.logical.{ UnaryNode, _ } import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.types._ import org.apache.carbondata.spark.CarbonAliasDecoderRelation +import org.apache.carbondata.spark.util.CommonUtil /** * Top command @@ -137,6 +138,12 @@ case class DeleteRecords( override def output: Seq[AttributeReference] = Seq.empty } +case class ShowPartitions( + table: TableIdentifier) extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq.empty + override def output: Seq[Attribute] = CommonUtil.partitionInfoOutput +} + /** * A logical plan representing insertion into Hive table. * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala index f12e54b..a664104 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala @@ -61,7 +61,8 @@ class CarbonSqlParser() extends CarbonDDLSqlParser { protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase | loadManagement | describeTable | - showLoads | alterTable | updateTable | deleteRecords | useDatabase | createTable + showPartitions | showLoads | alterTable | updateTable | deleteRecords | useDatabase | + createTable protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew @@ -487,6 +488,13 @@ class CarbonSqlParser() extends CarbonDDLSqlParser { } UpdateTable(relation, columns, selectStmt, where) } + protected lazy val showPartitions: Parser[LogicalPlan] = + (SHOW ~> PARTITIONS ~> table) <~ opt(";") ^^ { + case table => + val tableName = getTableName(table.tableIdentifier) + val alias = table.alias.getOrElse("") + ShowPartitions(table.tableIdentifier) + } private def splitQuery(query: String): (String, String) = { val stack = scala.collection.mutable.Stack[Char]() http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index ba22c3c..3477abb 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -73,6 +73,34 @@ object Checker { } /** + * Command for show table partitions Command + * + * @param tableIdentifier + */ +private[sql] case class ShowCarbonPartitionsCommand( + tableIdentifier: TableIdentifier) extends RunnableCommand { + val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) + override val output = CommonUtil.partitionInfoOutput + override def run(sqlContext: SQLContext): Seq[Row] = { + val relation = CarbonEnv.get.carbonMetastore + .lookupRelation1(tableIdentifier)(sqlContext). + asInstanceOf[CarbonRelation] + val carbonTable = relation.tableMeta.carbonTable + var tableName = carbonTable.getFactTableName + var partitionInfo = carbonTable.getPartitionInfo( + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName) + if (partitionInfo == null) { + throw new AnalysisException( + s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName") + } + var partitionType = partitionInfo.getPartitionType + var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName + LOGGER.info("partition column name:" + columnName) + CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) + } +} + +/** * Command for the compaction in alter table command * * @param alterTableModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index f0cd33b..aba39f7 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -316,6 +316,22 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } else { ExecutedCommand(HiveNativeCommand(sql)) :: Nil } + case ShowPartitions(t) => + val isCarbonTable = CarbonEnv.get.carbonMetastore + .tableExists(t)(sqlContext) + if (isCarbonTable) { + ExecutedCommand(ShowCarbonPartitionsCommand(t)) :: Nil + } else { + var tableName = t.table + var database = t.database + var sql: String = null + if (database.isEmpty) { + sql = s"show partitions $tableName" + } else { + sql = s"show partitions $database.$tableName" + } + ExecutedCommand(HiveNativeCommand(sql)) :: Nil + } case _ => Nil } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 7d0215f..6087736 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -115,6 +115,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil + case ShowPartitionsCommand(t, cols) => + val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(t)(sparkSession) + if (isCarbonTable) { + ExecutedCommandExec(ShowCarbonPartitionsCommand(t)) :: Nil + } else { + ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil + } case set@SetCommand(kv) => ExecutedCommandExec(CarbonSetCommand(set)) :: Nil case reset@ResetCommand => http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index f9f556d..8fe4bd7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -73,6 +73,34 @@ object Checker { } /** + * Command for show table partitions Command + * + * @param tableIdentifier + */ +private[sql] case class ShowCarbonPartitionsCommand( + tableIdentifier: TableIdentifier) extends RunnableCommand { + val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) + override val output = CommonUtil.partitionInfoOutput + override def run(sparkSession: SparkSession): Seq[Row] = { + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(tableIdentifier)(sparkSession). + asInstanceOf[CarbonRelation] + val carbonTable = relation.tableMeta.carbonTable + var tableName = carbonTable.getFactTableName + var partitionInfo = carbonTable.getPartitionInfo( + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName) + if (partitionInfo == null) { + throw new AnalysisException( + s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName") + } + var partitionType = partitionInfo.getPartitionType + var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName + LOGGER.info("partition column name:" + columnName) + CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) + } +} + +/** * Command for the compaction in alter table command * * @param alterTableModel
