This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e56afe1 [KYUUBI #6134] Support Flink 1.19
78e56afe1 is described below

commit 78e56afe122364ac1a0e0f70dc4bc78ebb9855a3
Author: wforget <[email protected]>
AuthorDate: Mon Mar 11 17:12:33 2024 +0800

    [KYUUBI #6134] Support Flink 1.19
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6134
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Please include a summary of the change and which issue is fixed. Please 
also include relevant motivation and context. List any dependencies that are 
required for this change.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [ ] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6141 from wForget/KYUUBI-6134.
    
    Closes #6134
    
    c8ee7f09e [wforget] remove flink-1.19.0-rc2 resource and flink 1.19 ga
    efb3d8aaf [wforget] fix
    845b27fba [wforget] fix
    08b8b1d23 [wforget] dev
    c9f69175f [wforget] fix
    b93744d44 [wforget] [KYUUBI #6134] Support Flink 1.19
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/flink/FlinkEngineUtils.scala     | 28 ++++++++++++++--------
 .../flink/operation/FlinkOperationSuite.scala      |  2 ++
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 0a4188fda..d9c842759 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -48,7 +48,8 @@ object FlinkEngineUtils extends Logging {
 
   val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new 
Options)
 
-  private def SUPPORTED_FLINK_VERSIONS = Set("1.16", "1.17", 
"1.18").map(SemanticVersion.apply)
+  private def SUPPORTED_FLINK_VERSIONS =
+    Set("1.16", "1.17", "1.18", "1.19").map(SemanticVersion.apply)
 
   val FLINK_RUNTIME_VERSION: SemanticVersion = 
SemanticVersion(EnvironmentInformation.getVersion)
 
@@ -111,7 +112,22 @@ object FlinkEngineUtils extends Logging {
     val libDirs: JList[URL] = Option(checkUrls(line, 
CliOptionsParser.OPTION_LIBRARY))
       .getOrElse(JCollections.emptyList())
     val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
-    if (FLINK_RUNTIME_VERSION === "1.16") {
+    if (FLINK_RUNTIME_VERSION >= "1.19") {
+      invokeAs[DefaultContext](
+        classOf[DefaultContext],
+        "load",
+        (classOf[Configuration], flinkConf),
+        (classOf[JList[URL]], dependencies),
+        (classOf[Boolean], JBoolean.TRUE))
+    } else if (FLINK_RUNTIME_VERSION >= "1.17") {
+      invokeAs[DefaultContext](
+        classOf[DefaultContext],
+        "load",
+        (classOf[Configuration], flinkConf),
+        (classOf[JList[URL]], dependencies),
+        (classOf[Boolean], JBoolean.TRUE),
+        (classOf[Boolean], JBoolean.FALSE))
+    } else if (FLINK_RUNTIME_VERSION === "1.16") {
       val commandLines: JList[CustomCommandLine] =
         Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
       DynConstructors.builder()
@@ -122,14 +138,6 @@ object FlinkEngineUtils extends Logging {
         .build()
         .newInstance(flinkConf, commandLines)
         .asInstanceOf[DefaultContext]
-    } else if (FLINK_RUNTIME_VERSION >= "1.17") {
-      invokeAs[DefaultContext](
-        classOf[DefaultContext],
-        "load",
-        (classOf[Configuration], flinkConf),
-        (classOf[JList[URL]], dependencies),
-        (classOf[Boolean], JBoolean.TRUE),
-        (classOf[Boolean], JBoolean.FALSE))
     } else {
       throw new KyuubiException(
         s"Flink version ${EnvironmentInformation.getVersion} are not supported 
currently.")
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 59d5fde34..0ca828482 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -993,6 +993,8 @@ abstract class FlinkOperationSuite extends 
HiveJDBCTestHelper with WithFlinkTest
         statement.getConnection.setSchema("db_a")
         val changedSchema = statement.getConnection.getSchema
         assert(changedSchema == "db_a")
+        // reset database to default
+        statement.getConnection.setSchema("default_database")
         assert(statement.execute("drop database db_a"))
       }
     }

Reply via email to