This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 214d23890 [KYUUBI #3514] Support Flink 1.16
214d23890 is described below
commit 214d238902712e3f2b6e49507c2fcf762f80ef9b
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Oct 27 01:54:07 2022 +0800
[KYUUBI #3514] Support Flink 1.16
### _Why are the changes needed?_
Close https://github.com/apache/incubator-kyuubi/issues/3513
https://www.mail-archive.com/devflink.apache.org/msg61009.html
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #3514 from pan3793/flink-1.16.
Closes #3514
184b3397 [Cheng Pan] url
01e09268 [Cheng Pan] ga
2d0978bd [Cheng Pan] Bump Flink 1.16.0 RC2
ffd32845 [Cheng Pan] Support Flink 1.16
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.github/workflows/master.yml | 5 +++++
.../org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala | 2 +-
.../kyuubi/engine/flink/operation/ExecuteStatement.scala | 4 ++--
.../kyuubi/engine/flink/operation/OperationUtils.scala | 13 +++++++++++--
pom.xml | 9 +++++++++
5 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index eca6ef157..724bf36de 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -148,6 +148,7 @@ jobs:
flink:
- '1.14'
- '1.15'
+ - '1.16'
flink-archive: [ "" ]
comment: [ "normal" ]
include:
@@ -155,6 +156,10 @@ jobs:
flink: '1.15'
flink-archive:
'-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.14.5
-Dflink.archive.name=flink-1.14.5-bin-scala_2.12.tgz'
comment: 'verify-on-flink-1.14-binary'
+ - java: 8
+ flink: '1.15'
+ flink-archive:
'-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.0
-Dflink.archive.name=flink-1.16.0-bin-scala_2.12.tgz'
+ comment: 'verify-on-flink-1.16-binary'
steps:
- uses: actions/checkout@v2
- name: Tune Runner VM
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index a3f60192f..e271944a7 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -40,7 +40,7 @@ object FlinkEngineUtils extends Logging {
val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new
Options);
val SUPPORTED_FLINK_VERSIONS: Array[SemanticVersion] =
- Array("1.14", "1.15").map(SemanticVersion.apply)
+ Array("1.14", "1.15", "1.16").map(SemanticVersion.apply)
def checkFlinkVersion(): Unit = {
val flinkVersion = EnvironmentInformation.getVersion
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 182e3e3ef..93d013556 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -84,11 +84,11 @@ class ExecuteStatement(
resultSet = OperationUtils.runSetOperation(setOperation, executor,
sessionId)
case resetOperation: ResetOperation =>
resultSet = OperationUtils.runResetOperation(resetOperation,
executor, sessionId)
- case addJarOperation: AddJarOperation =>
+ case addJarOperation: AddJarOperation if isFlinkVersionAtMost("1.15")
=>
resultSet = OperationUtils.runAddJarOperation(addJarOperation,
executor, sessionId)
case removeJarOperation: RemoveJarOperation =>
resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation,
executor, sessionId)
- case showJarsOperation: ShowJarsOperation =>
+ case showJarsOperation: ShowJarsOperation if
isFlinkVersionAtMost("1.15") =>
resultSet = OperationUtils.runShowJarOperation(showJarsOperation,
executor, sessionId)
case operation: Operation => runOperation(operation)
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
index eb11a51d5..69e0b580c 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
@@ -30,6 +30,7 @@ import org.apache.flink.types.Row
import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil}
import org.apache.kyuubi.engine.flink.result.ResultSetUtil.successResultSet
+import org.apache.kyuubi.reflection.DynMethods
object OperationUtils {
@@ -124,7 +125,11 @@ object OperationUtils {
addJarOperation: AddJarOperation,
executor: Executor,
sessionId: String): ResultSet = {
- executor.addJar(sessionId, addJarOperation.getPath)
+ // Removed by FLINK-27790
+ val addJar = DynMethods.builder("addJar")
+ .impl(executor.getClass, classOf[String], classOf[String])
+ .build(executor)
+ addJar.invoke[Void](sessionId, addJarOperation.getPath)
successResultSet
}
@@ -157,7 +162,11 @@ object OperationUtils {
showJarsOperation: ShowJarsOperation,
executor: Executor,
sessionId: String): ResultSet = {
- val jars = executor.listJars(sessionId)
+ // Removed by FLINK-27790
+ val listJars = DynMethods.builder("listJars")
+ .impl(executor.getClass, classOf[String])
+ .build(executor)
+ val jars = listJars.invoke[util.List[String]](sessionId)
ResultSetUtil.stringListToResultSet(jars.asScala.toList, "jar")
}
}
diff --git a/pom.xml b/pom.xml
index ea354b6e0..de639d049 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2223,6 +2223,7 @@
<flink.module.scala.suffix>_${scala.binary.version}</flink.module.scala.suffix>
</properties>
</profile>
+
<profile>
<id>flink-1.15</id>
<properties>
@@ -2231,6 +2232,14 @@
</properties>
</profile>
+ <profile>
+ <id>flink-1.16</id>
+ <properties>
+ <flink.version>1.16.0</flink.version>
+ <flink.module.scala.suffix></flink.module.scala.suffix>
+ </properties>
+ </profile>
+
<profile>
<id>spark-provided</id>
<properties>