Repository: carbondata Updated Branches: refs/heads/master a89587e70 -> 577a8b0d5
[CARBONDATA-1924][PARTITION] Restrict streaming on Partitioned table and support PARTITION syntax to the LOAD TABLE command. This closes #1699 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/577a8b0d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/577a8b0d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/577a8b0d Branch: refs/heads/master Commit: 577a8b0d59d760a35626779fe45dd4d91c6b3328 Parents: a89587e Author: ravipesala <[email protected]> Authored: Tue Dec 19 20:01:30 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Dec 21 18:46:50 2017 +0530 ---------------------------------------------------------------------- .../StandardPartitionTableLoadingTestCase.scala | 57 ++++++++++++++++++++ .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 6 +++ .../management/CarbonLoadDataCommand.scala | 10 +++- .../sql/parser/CarbonSpark2SqlParser.scala | 22 +++++--- .../spark/sql/parser/CarbonSparkSqlParser.scala | 3 ++ 5 files changed, 89 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 11e95d8..b7010e5 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.spark.testsuite.standardpartition import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -200,6 +201,59 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte validateDataFiles("default_singlepasspartitionone", "0", 1) } + test("data loading for partition table for one static partition column with load syntax") { + sql( + """ + | CREATE TABLE loadstaticpartitionone (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitionone PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql("select distinct empno from loadstaticpartitionone"), Seq(Row(1))) + } + + test("overwrite partition table for one static partition column with load syntax") { + sql( + """ + | CREATE TABLE loadstaticpartitiononeoverwrite (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val rows = sql("select count(*) from loadstaticpartitiononeoverwrite").collect() + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql("select count(*) from loadstaticpartitiononeoverwrite"), rows) + } + + test("Restrict streaming on partitioned table") { + intercept[AnalysisException] { + sql( + """ + | CREATE TABLE streamingpartitionedtable (empname String, designation String, doj + | Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('streaming'='true') + """.stripMargin) + } + } + override def afterAll = { dropTable @@ -215,6 +269,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql("drop table if exists insertpartitionthree") sql("drop table if exists staticpartitionone") sql("drop table if exists singlepasspartitionone") + sql("drop table if exists loadstaticpartitionone") + sql("drop table if exists loadstaticpartitiononeoverwrite") + sql("drop table if exists streamingpartitionedtable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 6e9b36c..129e6b3 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -978,6 +978,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { case _ => ("", "") } + protected lazy val partitions: Parser[(String, String)] = + (ident <~ "=") ~ stringLit ^^ { + case opt ~ optvalue => (opt.trim, optvalue) + case _ => ("", "") + } + protected lazy val valueOptions: Parser[(Int, Int)] = (numericLit <~ ",") ~ numericLit ^^ { case opt ~ optvalue => (opt.toInt, optvalue.toInt) http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 7492951..f96c0a7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -558,8 +558,14 @@ case class CarbonLoadDataCommand( } // Only select the required columns - Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get), - LogicalRDD(attributes, rdd)(sparkSession)) + val output = if (partition.nonEmpty) { + relation.output.map{ attr => + attributes.find(_.name.equalsIgnoreCase(attr.name)).get + }.filter(attr => partition.get(attr.name).isEmpty) + } else { + relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) + } + Project(output, LogicalRDD(attributes, rdd)(sparkSession)) } val convertRelation = relation match { case l: LogicalRelation => http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index dad0e3e..5d00a0c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -351,8 +351,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { protected lazy val loadDataNew: Parser[LogicalPlan] = LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~ (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~ + (PARTITION ~>"("~> repsep(partitions, ",") <~ ")").? ~ (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ { - case filePath ~ isOverwrite ~ table ~ optionsList => + case filePath ~ isOverwrite ~ table ~ partitions ~ optionsList => val (databaseNameOp, tableName) = table match { case databaseName ~ tableName => (databaseName, tableName.toLowerCase()) } @@ -360,13 +361,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { validateOptions(optionsList) } val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap + val partitionSpec = partitions.getOrElse(List.empty[(String, String)]).toMap CarbonLoadDataCommand( - convertDbNameToLowerCase(databaseNameOp), - tableName, - filePath, - Seq(), - optionsMap, - isOverwrite.isDefined) + databaseNameOp = convertDbNameToLowerCase(databaseNameOp), + tableName = tableName, + factPathFromUser = filePath, + dimFilesPath = Seq(), + options = optionsMap, + isOverwriteTable = isOverwrite.isDefined, + inputSqlString = null, + dataFrame = None, + updateModel = None, + tableInfoOp = None, + internalOptions = Map.empty, + partition = partitionSpec.map { case (key, value) => (key, Some(value))}) } protected lazy val deleteLoadsByID: Parser[LogicalPlan] = http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 3597208..211e0ef 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -233,6 +233,9 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, case _ => // ignore this case } + if (partitionFields.nonEmpty && options.isStreaming) { + operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns) + } // validate tblProperties val bucketFields = parser.getBucketFields(tableProperties, fields, options) // prepare table model of the collected tokens
