This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new d11fafe2f [bug] [fix] [streampark-flink-shims-base] Repair multiple 
dialect set statement have no effect error. (#1826)
d11fafe2f is described below

commit d11fafe2f804a6f96d46c54c1b4cd91bd47949c7
Author: 第一片心意 <[email protected]>
AuthorDate: Tue Oct 25 08:51:17 2022 +0800

    [bug] [fix] [streampark-flink-shims-base] Repair multiple dialect set 
statement have no effect error. (#1826)
    
    * Fix the problem that resources of the flink job manager role are not 
automatically released after the flink sql batch task is successfully run in 
yarn per job mode.
    
    * Fixed an issue where hive sql could not be run due to the platform 
changing the flink sql order, which could not load hive dialect correctly.
    
    * Remove the validity check for keys in the set statement, because the 
platform is not yet able to aggregate all keys
---
 .../streampark/flink/core/FlinkSqlExecutor.scala   | 18 ++++++-------
 .../streampark/flink/core/FlinkSqlValidator.scala  | 30 ++++++++++++----------
 .../streampark/flink/core/SqlCommandParser.scala   |  2 +-
 .../flink/submit/impl/YarnPerJobSubmit.scala       |  2 +-
 4 files changed, 27 insertions(+), 25 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index d65d5c46b..b810ea636 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -16,19 +16,18 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.core.SqlCommand._
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.{ConfigOption, Configuration, 
ExecutionOptions}
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.config.{ExecutionConfigOptions, 
OptimizerConfigOptions, TableConfigOptions}
+import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.flink.core.SqlCommand._
 
 import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.{HashMap => JavaHashMap, Map => JavaMap}
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
 object FlinkSqlExecutor extends Logger {
@@ -79,7 +78,8 @@ object FlinkSqlExecutor extends Logger {
       }
     }
 
-    val insertArray = new ArrayBuffer[String]()
+    var hasInsert = false
+    val statementSet = context.createStatementSet()
     SqlCommandParser.parseSQL(flinkSql).foreach(x => {
       val args = if (x.operands.isEmpty) null else x.operands.head
       val command = x.command.name
@@ -141,7 +141,9 @@ object FlinkSqlExecutor extends Logger {
           logInfo(s"$command: $args")
         case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
           logWarn(s"SQL Client Syntax: ${x.command.name} ")
-        case INSERT => insertArray += x.originSql
+        case INSERT =>
+          statementSet.addInsertSql(x.originSql)
+          hasInsert = true
         case SELECT =>
           logError("StreamPark dose not support 'SELECT' statement now!")
           throw new RuntimeException("StreamPark dose not support 'select' 
statement now!")
@@ -157,9 +159,7 @@ object FlinkSqlExecutor extends Logger {
       }
     })
 
-    if (insertArray.nonEmpty) {
-      val statementSet = context.createStatementSet()
-      insertArray.foreach(statementSet.addInsertSql)
+    if (hasInsert) {
       statementSet.execute() match {
         case t if t != null =>
           Try(t.getJobClient.get.getJobID).getOrElse(null) match {
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index cfd92eb14..ac39c3e1d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
-import org.apache.streampark.common.util.{ExceptionUtils, Logger}
-import org.apache.streampark.flink.core.SqlCommand._
 import org.apache.calcite.config.Lex
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.parser.SqlParser.Config
@@ -26,9 +23,12 @@ import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.table.api.SqlDialect.{DEFAULT, HIVE}
-import org.apache.flink.table.api.{SqlDialect, TableConfig}
 import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.api.{SqlDialect, TableConfig}
 import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
+import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
+import org.apache.streampark.common.util.{ExceptionUtils, Logger}
+import org.apache.streampark.flink.core.SqlCommand._
 
 import scala.util.{Failure, Try}
 
@@ -72,16 +72,16 @@ object FlinkSqlValidator extends Logger {
       lazy val command = call.command
       command match {
         case SET | RESET =>
-          if (!FlinkSqlExecutor.tableConfigOptions.containsKey(args)) {
-            return FlinkSqlValidationResult(
-              success = false,
-              failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
-              lineStart = call.lineStart,
-              lineEnd = call.lineEnd,
-              sql = sql.replaceFirst(";|$", ";"),
-              exception = s"$args is not a valid table/sql config"
-            )
-          }
+          // if (!FlinkSqlExecutor.tableConfigOptions.containsKey(args)) {
+          //   return FlinkSqlValidationResult(
+          //     success = false,
+          //     failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
+          //     lineStart = call.lineStart,
+          //     lineEnd = call.lineEnd,
+          //     sql = sql.replaceFirst(";|$", ";"),
+          //     exception = s"$args is not a valid table/sql config"
+          //   )
+          // }
           if (command == SET && args == 
TableConfigOptions.TABLE_SQL_DIALECT.key()) {
             sqlDialect = call.operands.last
           }
@@ -98,10 +98,12 @@ object FlinkSqlValidator extends Logger {
               case _ =>
                 throw new UnsupportedOperationException(s"unsupported dialect: 
${sqlDialect}")
             }
+
             val parser = calciteClass.getConstructor(Array(classOf[Config]): 
_*).newInstance(sqlParserConfigMap(sqlDialect.toUpperCase()))
             val method = parser.getClass.getDeclaredMethod("parse", 
classOf[String])
             method.setAccessible(true)
             method.invoke(parser, call.originSql)
+
           } match {
             case Failure(e) =>
               val exception = ExceptionUtils.stringifyException(e)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 7274e8823..7df3c8acf 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -84,7 +84,7 @@ object SqlCommandParser extends Logger {
             } else {
               throw new UnsupportedOperationException("flink sql syntax error, 
no executable sql")
             }
-          case r => r.sortWith((a, b) => a.lineStart < b.lineStart)
+          case r => r
         }
     }
   }
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
index 664954030..e72e4a8e0 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
@@ -96,7 +96,7 @@ object YarnPerJobSubmit extends YarnSubmitTrait {
           submitRequest.effectiveAppName,
           classOf[YarnJobClusterEntrypoint].getName,
           jobGraph,
-          false
+          true
         ).getClusterClient
 
       }

Reply via email to