Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189764478
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
---
@@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser
{
CarbonAlterTableFinishStreaming(dbName, table)
}
+ /**
+ * The syntax of CREATE STREAM SOURCE
+ * CREATE STREAM SOURCE [dbName.]tableName (schema list)
+ * [TBLPROPERTIES('KEY'='VALUE')]
+ */
+ protected lazy val createStreamSource: Parser[LogicalPlan] =
+ CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~
+ ("(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+ (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~
opt(";") ^^ {
+ case dbName ~ tableName ~ fields ~ map =>
+ val tblProperties = map.getOrElse(List[(String,
String)]()).toMap[String, String]
+ CarbonCreateStreamSourceCommand(dbName, tableName, fields,
tblProperties)
+ }
+
+ /**
+ * The syntax of CREATE STREAM
+ * CREATE STREAM ON TABLE [dbName.]tableName
+ * [STMPROPERTIES('KEY'='VALUE')]
+ * AS SELECT COUNT(COL1) FROM tableName
+ */
+ protected lazy val createStream: Parser[LogicalPlan] =
+ CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~
+ (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+ (AS ~> restInput) <~ opt(";") ^^ {
+ case dbName ~ tableName ~ options ~ query =>
+ val optionMap = options.getOrElse(List[(String,
String)]()).toMap[String, String]
+ CarbonCreateStreamCommand(dbName, tableName, optionMap, query)
+ }
+
+ /**
+ * The syntax of KILL STREAM
+ * KILL STREAM ON TABLE [dbName.]tableName
+ */
+ protected lazy val killStream: Parser[LogicalPlan] =
--- End diff --
What will happen if I call `Kill Stream`?
Should I (re)`create stream` if I want the stream to start again? If so,
it would be better to change the grammar to `DROP STREAM ON TABLE ...`
---