wolfboys commented on code in PR #3537:
URL: 
https://github.com/apache/incubator-streampark/pull/3537#discussion_r1539270341


##########
streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/util/SqlCommandParser.scala:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.spark.core.util
+
+import org.apache.streampark.common.conf.ConfigKeys.PARAM_PREFIX
+import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
+import org.apache.streampark.common.util.Logger
+
+import enumeratum.EnumEntry
+import org.apache.commons.lang3.StringUtils
+
+import java.lang.{Boolean => JavaBool}
+import java.util.Scanner
+import java.util.regex.{Matcher, Pattern}
+
+import scala.annotation.tailrec
+import scala.collection.{immutable, mutable}
+import scala.collection.mutable.ListBuffer
+import scala.util.control.Breaks.{break, breakable}
+
+object SqlCommandParser extends Logger {
+
+  def parseSQL(
+      sql: String,
+      validationCallback: SparkSqlValidationResult => Unit = null): 
List[SqlCommandCall] = {
+    val sqlEmptyError = "verify failed: spark sql cannot be empty."
+    require(StringUtils.isNotBlank(sql), sqlEmptyError)
+    val sqlSegments = SqlSplitter.splitSql(sql)
+    sqlSegments match {
+      case s if s.isEmpty =>
+        if (validationCallback != null) {
+          validationCallback(
+            SparkSqlValidationResult(
+              success = false,
+              failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
+              exception = sqlEmptyError))
+          null
+        } else {
+          throw new IllegalArgumentException(sqlEmptyError)
+        }
+      case segments =>
+        val calls = new ListBuffer[SqlCommandCall]
+        for (segment <- segments) {
+          parseLine(segment) match {
+            case Some(x) => calls += x
+            case _ =>
+              if (validationCallback != null) {
+                validationCallback(
+                  SparkSqlValidationResult(
+                    success = false,
+                    failedType = FlinkSqlValidationFailedType.UNSUPPORTED_SQL,
+                    lineStart = segment.start,
+                    lineEnd = segment.end,
+                    exception = s"unsupported sql",
+                    sql = segment.sql
+                  ))
+              } else {
+                throw new UnsupportedOperationException(s"unsupported sql: 
${segment.sql}")
+              }
+          }
+        }
+
+        calls.toList match {
+          case c if c.isEmpty =>
+            if (validationCallback != null) {
+              validationCallback(
+                SparkSqlValidationResult(
+                  success = false,
+                  failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
+                  exception = "spark sql syntax error, no executable sql"))
+              null
+            } else {
+              throw new UnsupportedOperationException("spark sql syntax error, 
no executable sql")
+            }
+          case r => r
+        }
+    }
+  }
+
+  private[this] def parseLine(sqlSegment: SqlSegment): Option[SqlCommandCall] 
= {
+    val sqlCommand = SqlCommand.get(sqlSegment.sql.trim)
+    if (sqlCommand == null) None
+    else {
+      val matcher = sqlCommand.matcher
+      val groups = new Array[String](matcher.groupCount)
+      for (i <- groups.indices) {
+        groups(i) = matcher.group(i + 1)
+      }
+      sqlCommand
+        .converter(groups)
+        .map(
+          x => SqlCommandCall(sqlSegment.start, sqlSegment.end, sqlCommand, x, 
sqlSegment.sql.trim))
+    }
+  }
+
+}
+
object Converters {
  /** Converter for commands that take no operands: always yields an empty operand array. */
  val NO_OPERANDS: Array[String] => Option[Array[String]] =
    (ignored: Array[String]) => Some(Array.empty[String])
}
+
/**
 * One recognized SQL statement kind.
 *
 * @param name      human-readable command name (e.g. "create table")
 * @param regex     case-insensitive, DOTALL pattern the statement must fully match
 * @param converter maps the regex capture groups to the command's operands;
 *                  defaults to passing through the first group
 */
sealed abstract class SqlCommand(
    val name: String,
    private val regex: String,
    val converter: Array[String] => Option[Array[String]] = (x: Array[String]) =>
      Some(Array[String](x.head)))
  extends EnumEntry {

  // Compile the pattern once per command instead of on every matches() call.
  // None when the regex is blank, so matches() can short-circuit to false.
  private lazy val pattern: Option[Pattern] =
    if (StringUtils.isBlank(regex)) None
    else Some(Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL))

  // Matcher of the most recent matches() call; read back by parseLine to pull
  // capture groups. NOTE(review): shared mutable state on an enum singleton —
  // not thread-safe; confirm parsing is single-threaded.
  var matcher: Matcher = _

  /**
   * Returns true when `input` fully matches this command's pattern.
   * Side effect: stores the matcher in [[matcher]] for later group extraction.
   */
  def matches(input: String): Boolean = pattern match {
    case None => false
    case Some(p) =>
      matcher = p.matcher(input)
      matcher.matches()
  }
}
+
+object SqlCommand extends enumeratum.Enum[SqlCommand] {
+
+  def get(stmt: String): SqlCommand = {
+    var cmd: SqlCommand = null
+    breakable {
+      this.values.foreach(
+        x => {
+          if (x.matches(stmt)) {
+            cmd = x
+            break()
+          }
+        })
+    }
+    cmd
+  }
+
+  val values: immutable.IndexedSeq[SqlCommand] = findValues
+
+  // ---- SELECT 
Statements--------------------------------------------------------------------------------------------------------------------------------
+  case object SELECT extends SqlCommand("select", "(SELECT\\s+.+)")
+
+  // ----CREATE 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /**
+   * <pre> CREATE [TEMPORARY] TABLE [IF NOT EXISTS] 
[catalog_name.][db_name.]table_name ( {
+   * <physical_column_definition> | <metadata_column_definition> | 
<computed_column_definition> }[ ,
+   * ...n] [ <watermark_definition> ] [ <table_constraint> ][ , ...n] ) 
[COMMENT table_comment]
+   * [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] 
WITH (key1=val1,
+   * key2=val2, ...) [ LIKE source_table [( <like_options> )] ] </pre>
+   */
+  case object CREATE_TABLE
+    extends SqlCommand("create table", 
"(CREATE\\s+(TEMPORARY\\s+|)TABLE\\s+.+)")
+
+  /** <pre> CREATE CATALOG catalog_name WITH (key1=val1, key2=val2, ...) 
</pre> */
+  case object CREATE_CATALOG extends SqlCommand("create catalog", 
"(CREATE\\s+CATALOG\\s+.+)")
+
+  /**
+   * <pre> CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name<br> [COMMENT 
database_comment]<br>
+   * WITH (key1=val1, key2=val2, ...)<br> </pre>
+   */
+  case object CREATE_DATABASE extends SqlCommand("create database", 
"(CREATE\\s+DATABASE\\s+.+)")
+
+  /**
+   * <pre> CREATE [TEMPORARY] VIEW [IF NOT EXISTS] 
[catalog_name.][db_name.]view_name [( columnName
+   * [, columnName ]* )] [COMMENT view_comment] AS query_expression </pre>
+   */
+  case object CREATE_VIEW
+    extends SqlCommand(
+      "create view",
+      
"(CREATE\\s+(TEMPORARY\\s+|)VIEW\\s+(IF\\s+NOT\\s+EXISTS\\s+|)(\\S+)\\s+AS\\s+SELECT\\s+.+)")
+
+  /**
+   * <pre> CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS]
+   * [catalog_name.][db_name.]function_name AS identifier [LANGUAGE 
JAVA|SCALA|PYTHON] </pre>
+   */
+  case object CREATE_FUNCTION
+    extends SqlCommand(
+      "create function",
+      
"(CREATE\\s+(TEMPORARY\\s+|TEMPORARY\\s+SYSTEM\\s+|)FUNCTION\\s+(IF\\s+NOT\\s+EXISTS\\s+|)(\\S+)\\s+AS\\s+.*)")
+
+  // ----DROP 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /**
+   * <pre> DROP statements are used to remove a catalog with the given catalog 
name or to remove a
+   * registered table/view/function from the current or specified Catalog.
+   *
+   * Spark SQL supports the following DROP statements for now: * DROP CATALOG 
* DROP TABLE * DROP
+   * DATABASE * DROP VIEW * DROP FUNCTION </pre>
+   */
+
+  /** <strong>DROP CATALOG [IF EXISTS] catalog_name</strong> */
+  case object DROP_CATALOG extends SqlCommand("drop catalog", 
"(DROP\\s+CATALOG\\s+.+)")
+
+  /** <strong>DROP [TEMPORARY] TABLE [IF EXISTS] 
[catalog_name.][db_name.]table_name</strong> */
+  case object DROP_TABLE extends SqlCommand("drop table", 
"(DROP\\s+(TEMPORARY\\s+|)TABLE\\s+.+)")
+
+  /** <strong>DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | 
CASCADE) ]</strong> */
+  case object DROP_DATABASE extends SqlCommand("drop database", 
"(DROP\\s+DATABASE\\s+.+)")
+
+  /** <strong>DROP [TEMPORARY] VIEW  [IF EXISTS] 
[catalog_name.][db_name.]view_name</strong> */
+  case object DROP_VIEW extends SqlCommand("drop view", 
"(DROP\\s+(TEMPORARY\\s+|)VIEW\\s+.+)")
+
+  /**
+   * <strong>DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS]
+   * [catalog_name.][db_name.]function_name</strong>
+   */
+  case object DROP_FUNCTION
+    extends SqlCommand(
+      "drop function",
+      "(DROP\\s+(TEMPORARY\\s+|TEMPORARY\\s+SYSTEM\\s+|)FUNCTION\\s+.+)")
+
+  // ----ALTER 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /**
+   * <strong>ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO 
new_table_name</strong>
+   *
+   * <strong>ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, 
key2=val2,
+   * ...)</strong>
+   */
+  case object ALTER_TABLE extends SqlCommand("alter table", 
"(ALTER\\s+TABLE\\s+.+)")
+
+  /**
+   * <strong>ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO 
new_view_name</strong>
+   *
+   * <strong>ALTER VIEW [catalog_name.][db_name.]view_name AS 
new_query_expression</strong>
+   */
+  case object ALTER_VIEW extends SqlCommand("alter view", 
"(ALTER\\s+VIEW\\s+.+)")
+
+  /** <strong>ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, 
...)</strong> */
+  case object ALTER_DATABASE extends SqlCommand("alter database", 
"(ALTER\\s+DATABASE\\s+.+)")
+
+  /**
+   * <strong> ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS]
+   * [catalog_name.][db_name.]function_name AS identifier [LANGUAGE 
JAVA|SCALA|PYTHON] </strong>
+   */
+  case object ALTER_FUNCTION
+    extends SqlCommand(
+      "alter function",
+      "(ALTER\\s+(TEMPORARY\\s+|TEMPORARY\\s+SYSTEM\\s+|)FUNCTION\\s+.+)")
+
+  // ---- INSERT 
Statement--------------------------------------------------------------------------------------------------------------------------------
+
+  /**
+   * INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name 
[PARTITION part_spec]
+   * [column_list] select_statement INSERT { INTO | OVERWRITE } 
[catalog_name.][db_name.]table_name
+   * VALUES values_row [, values_row ...]
+   */
+  case object INSERT extends SqlCommand("insert", 
"(INSERT\\s+(INTO|OVERWRITE)\\s+.+)")
+
+  // ---- DESCRIBE 
Statement--------------------------------------------------------------------------------------------------------------------------------
+
+  /** { DESCRIBE | DESC } [catalog_name.][db_name.]table_name */
+  case object DESC extends SqlCommand("desc", "(DESC\\s+.+)")
+
+  /** { DESCRIBE | DESC } [catalog_name.][db_name.]table_name */
+  case object DESCRIBE extends SqlCommand("describe", "(DESCRIBE\\s+.+)")
+
+  // ---- EXPLAIN 
Statement--------------------------------------------------------------------------------------------------------------------------------
+
+  case object EXPLAIN extends SqlCommand("explain", "(EXPLAIN\\s+.+)")
+
+  // ---- USE 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /** USE CATALOG catalog_name */
+  case object USE_CATALOG extends SqlCommand("use catalog", 
"(USE\\s+CATALOG\\s+.+)")
+
+  /** USE MODULES module_name1[, module_name2, ...] */
+  case object USE_MODULES extends SqlCommand("use modules", 
"(USE\\s+MODULES\\s+.+)")
+
+  /** USE [catalog_name.]database_name */
+  case object USE_DATABASE extends SqlCommand("use database", 
"(USE\\s+(?!(CATALOG|MODULES)).+)")
+
+  // ----SHOW 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /** SHOW CATALOGS */
+  case object SHOW_CATALOGS extends SqlCommand("show catalogs", 
"(SHOW\\s+CATALOGS\\s*)")
+
+  /** SHOW CURRENT CATALOG */
+  case object SHOW_CURRENT_CATALOG
+    extends SqlCommand("show current catalog", 
"(SHOW\\s+CURRENT\\s+CATALOG\\s*)")
+
+  /** SHOW DATABASES */
+  case object SHOW_DATABASES extends SqlCommand("show databases", 
"(SHOW\\s+DATABASES\\s*)")
+
+  /** SHOW CURRENT DATABASE */
+  case object SHOW_CURRENT_DATABASE
+    extends SqlCommand("show current database", 
"(SHOW\\s+CURRENT\\s+DATABASE\\s*)")
+
+  case object SHOW_TABLES extends SqlCommand("show tables", 
"(SHOW\\s+TABLES.*)")
+
+  case object SHOW_CREATE_TABLE
+    extends SqlCommand("show create table", "(SHOW\\s+CREATE\\s+TABLE\\s+.+)")
+
+  case object SHOW_COLUMNS extends SqlCommand("show columns", 
"(SHOW\\s+COLUMNS\\s+.+)")
+
+  /** SHOW VIEWS */
+  case object SHOW_VIEWS extends SqlCommand("show views", 
"(SHOW\\s+VIEWS\\s*)")
+
+  /** SHOW CREATE VIEW */
+  case object SHOW_CREATE_VIEW
+    extends SqlCommand("show create view", "(SHOW\\s+CREATE\\s+VIEW\\s+.+)")
+
+  /** SHOW [USER] FUNCTIONS */
+  case object SHOW_FUNCTIONS
+    extends SqlCommand("show functions", "(SHOW\\s+(USER\\s+|)FUNCTIONS\\s*)")
+
+  /** SHOW [FULL] MODULES */
+  case object SHOW_MODULES extends SqlCommand("show modules", 
"(SHOW\\s+(FULL\\s+|)MODULES\\s*)")
+
+  // ----LOAD 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /** LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)] */
+  case object LOAD_MODULE extends SqlCommand("load module", 
"(LOAD\\s+MODULE\\s+.+)")
+
+  // ----UNLOAD 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /** UNLOAD MODULE module_name */
+  case object UNLOAD_MODULE extends SqlCommand("unload module", 
"(UNLOAD\\s+MODULE\\s+.+)")
+
+  // ----SET 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /** SET ('key' = 'value') */
+  // Converter contract: groups = [whole "key = value" clause, key, value].
+  // NOTE(review): the `a.head == null` branch (bare "SET" with no key/value) still
+  // calls cleanUp(a.head) on a known-null value — looks like a bug; verify against
+  // cleanUp's null handling and the intended bare-SET semantics.
+  case object SET
+    extends SqlCommand(
+      "set",
+      "SET(\\s+(\\S+)\\s*=(.*))?",
+      {
+        case a if a.length < 3 => None
+        case a if a.head == null => Some(Array[String](cleanUp(a.head)))
+        case a => Some(Array[String](cleanUp(a(1)), cleanUp(a(2))))
+      })
+
+  // ----RESET 
Statements--------------------------------------------------------------------------------------------------------------------------------
+
+  /** RESET ('key') */
+  case object RESET extends SqlCommand("reset", "RESET\\s+'(.*)'")
+
+  /** RESET */
+  case object RESET_ALL extends SqlCommand("reset all", "RESET", _ => 
Some(Array[String]("ALL")))
+
+  // ----INSERT SET 
Statements--------------------------------------------------------------------------------------------------------------------------------
+  /*
+   * <pre>
+   * SQL Client executes each INSERT INTO statement as a single Spark job. However,
+   * this is sometimes not optimal because some part of the pipeline can be 
reused.
+   * SQL Client supports STATEMENT SET syntax to execute a set of SQL 
statements.
+   * This is an equivalent feature with StatementSet in Table API.
+   * The STATEMENT SET syntax encloses one or more INSERT INTO statements.
+   * All statements in a STATEMENT SET block are holistically optimized and 
executed as a single Spark job.
+   * Joint optimization and execution allows for reusing common intermediate 
results and can therefore significantly
+   * improve the efficiency of executing multiple queries.
+   * </pre>
+   */
+  /** This is SQL Client's syntax, don't use in our platform. */
+  @Deprecated
+  case object BEGIN_STATEMENT_SET

Review Comment:
   I see that much of this syntax comes from Flink SQL — does Spark SQL support the same syntax?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to