This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 78e56afe1 [KYUUBI #6134] Support Flink 1.19
78e56afe1 is described below
commit 78e56afe122364ac1a0e0f70dc4bc78ebb9855a3
Author: wforget <[email protected]>
AuthorDate: Mon Mar 11 17:12:33 2024 +0800
[KYUUBI #6134] Support Flink 1.19
# :mag: Description
## Issue References 🔗
This pull request fixes #6134
## Describe Your Solution 🔧
Please include a summary of the change and which issue is fixed. Please
also include relevant motivation and context. List any dependencies that are
required for this change.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6141 from wForget/KYUUBI-6134.
Closes #6134
c8ee7f09e [wforget] remove flink-1.19.0-rc2 resource and flink 1.19 ga
efb3d8aaf [wforget] fix
845b27fba [wforget] fix
08b8b1d23 [wforget] dev
c9f69175f [wforget] fix
b93744d44 [wforget] [KYUUBI #6134] Support Flink 1.19
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/flink/FlinkEngineUtils.scala | 28 ++++++++++++++--------
.../flink/operation/FlinkOperationSuite.scala | 2 ++
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 0a4188fda..d9c842759 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -48,7 +48,8 @@ object FlinkEngineUtils extends Logging {
val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new
Options)
- private def SUPPORTED_FLINK_VERSIONS = Set("1.16", "1.17",
"1.18").map(SemanticVersion.apply)
+ private def SUPPORTED_FLINK_VERSIONS =
+ Set("1.16", "1.17", "1.18", "1.19").map(SemanticVersion.apply)
val FLINK_RUNTIME_VERSION: SemanticVersion =
SemanticVersion(EnvironmentInformation.getVersion)
@@ -111,7 +112,22 @@ object FlinkEngineUtils extends Logging {
val libDirs: JList[URL] = Option(checkUrls(line,
CliOptionsParser.OPTION_LIBRARY))
.getOrElse(JCollections.emptyList())
val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
- if (FLINK_RUNTIME_VERSION === "1.16") {
+ if (FLINK_RUNTIME_VERSION >= "1.19") {
+ invokeAs[DefaultContext](
+ classOf[DefaultContext],
+ "load",
+ (classOf[Configuration], flinkConf),
+ (classOf[JList[URL]], dependencies),
+ (classOf[Boolean], JBoolean.TRUE))
+ } else if (FLINK_RUNTIME_VERSION >= "1.17") {
+ invokeAs[DefaultContext](
+ classOf[DefaultContext],
+ "load",
+ (classOf[Configuration], flinkConf),
+ (classOf[JList[URL]], dependencies),
+ (classOf[Boolean], JBoolean.TRUE),
+ (classOf[Boolean], JBoolean.FALSE))
+ } else if (FLINK_RUNTIME_VERSION === "1.16") {
val commandLines: JList[CustomCommandLine] =
Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
DynConstructors.builder()
@@ -122,14 +138,6 @@ object FlinkEngineUtils extends Logging {
.build()
.newInstance(flinkConf, commandLines)
.asInstanceOf[DefaultContext]
- } else if (FLINK_RUNTIME_VERSION >= "1.17") {
- invokeAs[DefaultContext](
- classOf[DefaultContext],
- "load",
- (classOf[Configuration], flinkConf),
- (classOf[JList[URL]], dependencies),
- (classOf[Boolean], JBoolean.TRUE),
- (classOf[Boolean], JBoolean.FALSE))
} else {
throw new KyuubiException(
s"Flink version ${EnvironmentInformation.getVersion} are not supported
currently.")
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 59d5fde34..0ca828482 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -993,6 +993,8 @@ abstract class FlinkOperationSuite extends
HiveJDBCTestHelper with WithFlinkTest
statement.getConnection.setSchema("db_a")
val changedSchema = statement.getConnection.getSchema
assert(changedSchema == "db_a")
+ // reset database to default
+ statement.getConnection.setSchema("default_database")
assert(statement.execute("drop database db_a"))
}
}