Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2328#discussion_r194665653 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonAlterTableFinishStreaming(dbName, table) } + /** + * The syntax of CREATE STREAM SOURCE + * CREATE STREAM SOURCE [dbName.]tableName (schema list) + * [TBLPROPERTIES('KEY'='VALUE')] + */ + protected lazy val createStreamSource: Parser[LogicalPlan] = + CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~ + ("(" ~> repsep(anyFieldDef, ",") <~ ")") ~ + (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ { + case dbName ~ tableName ~ fields ~ map => + val tblProperties = map.getOrElse(List[(String, String)]()).toMap[String, String] + CarbonCreateStreamSourceCommand(dbName, tableName, fields, tblProperties) + } + + /** + * The syntax of CREATE STREAM + * CREATE STREAM ON TABLE [dbName.]tableName + * [STMPROPERTIES('KEY'='VALUE')] + * AS SELECT COUNT(COL1) FROM tableName + */ + protected lazy val createStream: Parser[LogicalPlan] = + CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~ + (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ + (AS ~> restInput) <~ opt(";") ^^ { + case dbName ~ tableName ~ options ~ query => + val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String] + CarbonCreateStreamCommand(dbName, tableName, optionMap, query) + } + + /** + * The syntax of KILL STREAM + * KILL STREAM ON TABLE [dbName].tableName + */ + protected lazy val killStream: Parser[LogicalPlan] = --- End diff -- If the stream is dropped, user need to trigger CREATE STREAM again
---