This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new c97f4fcd4 [Bug] support flinksql set syntax (#1926)
c97f4fcd4 is described below

commit c97f4fcd48fd93a0f61325e0927ffcda9df1ebfb
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 30 19:23:19 2022 +0800

    [Bug] support flinksql set syntax (#1926)
---
 .../streampark/flink/core/FlinkSqlExecutor.scala   | 45 ++--------------------
 .../streampark/flink/core/FlinkSqlValidator.scala  | 10 -----
 2 files changed, 3 insertions(+), 52 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index b810ea636..143487b45 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -17,56 +17,21 @@
 package org.apache.streampark.flink.core
 
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{ConfigOption, Configuration, 
ExecutionOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.config.{ExecutionConfigOptions, 
OptimizerConfigOptions, TableConfigOptions}
 import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.core.SqlCommand._
 
 import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import java.util.{HashMap => JavaHashMap, Map => JavaMap}
 import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 
 object FlinkSqlExecutor extends Logger {
 
   private[this] val lock = new ReentrantReadWriteLock().writeLock
 
-  /**
-   * all the available sql config options.
-   * see: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config
-   */
-  lazy val tableConfigOptions: JavaMap[String, ConfigOption[_]] = {
-    def extractConfig(clazz: Class[_]): JavaMap[String, ConfigOption[_]] = {
-      val configOptions = new JavaHashMap[String, ConfigOption[_]]
-      clazz.getDeclaredFields.foreach(field => {
-        if (field.getType.isAssignableFrom(classOf[ConfigOption[_]])) {
-          Try {
-            val configOption = 
field.get(classOf[ConfigOption[_]]).asInstanceOf[ConfigOption[_]]
-            configOptions.put(configOption.key, configOption)
-          } match {
-            case Success(_) =>
-            case Failure(e) => logError("Fail to get ConfigOption", e)
-          }
-        }
-      })
-      configOptions
-    }
-
-    val configOptions = new JavaHashMap[String, ConfigOption[_]]
-    val configList = List(
-      //classOf[PythonOptions],
-      classOf[ExecutionOptions],
-      classOf[ExecutionConfigOptions],
-      classOf[OptimizerConfigOptions],
-      classOf[TableConfigOptions]
-    )
-    configList.foreach(x => configOptions.putAll(extractConfig(x)))
-    configOptions
-  }
-
   private[streampark] def executeSql(sql: String, parameter: ParameterTool, 
context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit 
= {
     val flinkSql: String = if (sql == null || sql.isEmpty) 
parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
     require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink 
sql cannot be empty")
@@ -118,15 +83,11 @@ object FlinkSqlExecutor extends Logger {
           val tableResult = context.executeSql(x.originSql)
           val r = tableResult.collect().next().getField(0).toString
           callback(r)
-
         // For specific statement, such as: SET/RESET/INSERT/SELECT
         case SET =>
-          if (!tableConfigOptions.containsKey(args)) {
-            throw new IllegalArgumentException(s"$args is not a valid 
table/sql config, please check link: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config";)
-          }
           val operand = x.operands(1)
-          context.getConfig.getConfiguration.setString(args, operand)
           logInfo(s"$command: $args --> $operand")
+          context.getConfig.getConfiguration.setString(args, operand)
         case RESET | RESET_ALL =>
           val confDataField = 
classOf[Configuration].getDeclaredField("confData")
           confDataField.setAccessible(true)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index ac39c3e1d..760586358 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -72,16 +72,6 @@ object FlinkSqlValidator extends Logger {
       lazy val command = call.command
       command match {
         case SET | RESET =>
-          // if (!FlinkSqlExecutor.tableConfigOptions.containsKey(args)) {
-          //   return FlinkSqlValidationResult(
-          //     success = false,
-          //     failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
-          //     lineStart = call.lineStart,
-          //     lineEnd = call.lineEnd,
-          //     sql = sql.replaceFirst(";|$", ";"),
-          //     exception = s"$args is not a valid table/sql config"
-          //   )
-          // }
           if (command == SET && args == 
TableConfigOptions.TABLE_SQL_DIALECT.key()) {
             sqlDialect = call.operands.last
           }

Reply via email to