This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 214d23890 [KYUUBI #3514] Support Flink 1.16
214d23890 is described below

commit 214d238902712e3f2b6e49507c2fcf762f80ef9b
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Oct 27 01:54:07 2022 +0800

    [KYUUBI #3514] Support Flink 1.16
    
    ### _Why are the changes needed?_
    
    Close https://github.com/apache/incubator-kyuubi/issues/3513
    
    https://www.mail-archive.com/dev@flink.apache.org/msg61009.html
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3514 from pan3793/flink-1.16.
    
    Closes #3514
    
    184b3397 [Cheng Pan] url
    01e09268 [Cheng Pan] ga
    2d0978bd [Cheng Pan] Bump Flink 1.16.0 RC2
    ffd32845 [Cheng Pan] Support Flink 1.16
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .github/workflows/master.yml                                |  5 +++++
 .../org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala   |  2 +-
 .../kyuubi/engine/flink/operation/ExecuteStatement.scala    |  4 ++--
 .../kyuubi/engine/flink/operation/OperationUtils.scala      | 13 +++++++++++--
 pom.xml                                                     |  9 +++++++++
 5 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index eca6ef157..724bf36de 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -148,6 +148,7 @@ jobs:
         flink:
           - '1.14'
           - '1.15'
+          - '1.16'
         flink-archive: [ "" ]
         comment: [ "normal" ]
         include:
@@ -155,6 +156,10 @@ jobs:
             flink: '1.15'
             flink-archive: 
'-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.14.5 
-Dflink.archive.name=flink-1.14.5-bin-scala_2.12.tgz'
             comment: 'verify-on-flink-1.14-binary'
+          - java: 8
+            flink: '1.15'
+            flink-archive: 
'-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.0 
-Dflink.archive.name=flink-1.16.0-bin-scala_2.12.tgz'
+            comment: 'verify-on-flink-1.16-binary'
     steps:
       - uses: actions/checkout@v2
       - name: Tune Runner VM
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index a3f60192f..e271944a7 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -40,7 +40,7 @@ object FlinkEngineUtils extends Logging {
   val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new 
Options);
 
   val SUPPORTED_FLINK_VERSIONS: Array[SemanticVersion] =
-    Array("1.14", "1.15").map(SemanticVersion.apply)
+    Array("1.14", "1.15", "1.16").map(SemanticVersion.apply)
 
   def checkFlinkVersion(): Unit = {
     val flinkVersion = EnvironmentInformation.getVersion
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 182e3e3ef..93d013556 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -84,11 +84,11 @@ class ExecuteStatement(
           resultSet = OperationUtils.runSetOperation(setOperation, executor, 
sessionId)
         case resetOperation: ResetOperation =>
           resultSet = OperationUtils.runResetOperation(resetOperation, 
executor, sessionId)
-        case addJarOperation: AddJarOperation =>
+        case addJarOperation: AddJarOperation if isFlinkVersionAtMost("1.15") 
=>
           resultSet = OperationUtils.runAddJarOperation(addJarOperation, 
executor, sessionId)
         case removeJarOperation: RemoveJarOperation =>
           resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, 
executor, sessionId)
-        case showJarsOperation: ShowJarsOperation =>
+        case showJarsOperation: ShowJarsOperation if 
isFlinkVersionAtMost("1.15") =>
           resultSet = OperationUtils.runShowJarOperation(showJarsOperation, 
executor, sessionId)
         case operation: Operation => runOperation(operation)
       }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
index eb11a51d5..69e0b580c 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
@@ -30,6 +30,7 @@ import org.apache.flink.types.Row
 
 import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil}
 import org.apache.kyuubi.engine.flink.result.ResultSetUtil.successResultSet
+import org.apache.kyuubi.reflection.DynMethods
 
 object OperationUtils {
 
@@ -124,7 +125,11 @@ object OperationUtils {
       addJarOperation: AddJarOperation,
       executor: Executor,
       sessionId: String): ResultSet = {
-    executor.addJar(sessionId, addJarOperation.getPath)
+    // Removed by FLINK-27790
+    val addJar = DynMethods.builder("addJar")
+      .impl(executor.getClass, classOf[String], classOf[String])
+      .build(executor)
+    addJar.invoke[Void](sessionId, addJarOperation.getPath)
     successResultSet
   }
 
@@ -157,7 +162,11 @@ object OperationUtils {
       showJarsOperation: ShowJarsOperation,
       executor: Executor,
       sessionId: String): ResultSet = {
-    val jars = executor.listJars(sessionId)
+    // Removed by FLINK-27790
+    val listJars = DynMethods.builder("listJars")
+      .impl(executor.getClass, classOf[String])
+      .build(executor)
+    val jars = listJars.invoke[util.List[String]](sessionId)
     ResultSetUtil.stringListToResultSet(jars.asScala.toList, "jar")
   }
 }
diff --git a/pom.xml b/pom.xml
index ea354b6e0..de639d049 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2223,6 +2223,7 @@
                 
<flink.module.scala.suffix>_${scala.binary.version}</flink.module.scala.suffix>
             </properties>
         </profile>
+
         <profile>
             <id>flink-1.15</id>
             <properties>
@@ -2231,6 +2232,14 @@
             </properties>
         </profile>
 
+        <profile>
+            <id>flink-1.16</id>
+            <properties>
+                <flink.version>1.16.0</flink.version>
+                <flink.module.scala.suffix></flink.module.scala.suffix>
+            </properties>
+        </profile>
+
         <profile>
             <id>spark-provided</id>
             <properties>

Reply via email to