This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 902815d54 [KYUUBI #5507][FLINK] Support Initialize SQL in Flink Engine
902815d54 is described below

commit 902815d5404caa71c0e3c1b5c04df6a31d11798c
Author: Kang Wang <[email protected]>
AuthorDate: Thu Nov 2 18:19:22 2023 +0800

    [KYUUBI #5507][FLINK] Support Initialize SQL in Flink Engine
    
    ### _Why are the changes needed?_
    We should support initialize SQL to init the session context in Flink SQL
(e.g. setting up catalogs).
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5518 from hadoopkandy/KYUUBI-5507.
    
    Closes #5507
    
    b1720ecde [Cheng Pan] Update 
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
    078273824 [Paul Lin] [KYUUBI-5507] Improve codestyle
    13035f3e9 [Paul Lin] Update 
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
    2ce803181 [Paul Lin] [KYUUBI-5507] Improve tests
    a29ac3860 [Paul Lin] [KYUUBI-5507] Run engine initial SQL at Engine start
    b864af54f [wangkang] Merge branch 'apache:master' into KYUUBI-5507
    b37d62d44 [kandy01.wang] [KYUUBI #5507] [FLINK] Support Initialize SQL in 
Flink Engine
    
    Lead-authored-by: Kang Wang <[email protected]>
    Co-authored-by: Paul Lin <[email protected]>
    Co-authored-by: kandy01.wang <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: wangkang <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  26 ++++--
 .../engine/flink/session/FlinkSessionImpl.scala    |  10 ++
 .../engine/flink/WithFlinkSQLEngineLocal.scala     |   3 +-
 .../operation/FlinkEngineInitializeSuite.scala     | 104 +++++++++++++++++++++
 4 files changed, 131 insertions(+), 12 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 7e4f31f8a..bf5bdc782 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.gateway.service.context.DefaultContext
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, 
FLINK_ENGINE_SHUTDOWN_PRIORITY}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, 
KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, 
currentEngine}
 import org.apache.kyuubi.service.Serverable
@@ -100,9 +101,7 @@ object FlinkSQLEngine extends Logging {
       startEngine(engineContext)
       info("Flink engine started")
 
-      if ("yarn-application".equalsIgnoreCase(executionTarget)) {
-        bootstrapFlinkApplicationExecutor()
-      }
+      bootstrap(executionTarget)
 
       // blocking main thread
       countDownLatch.await()
@@ -127,15 +126,22 @@ object FlinkSQLEngine extends Logging {
     }
   }
 
-  private def bootstrapFlinkApplicationExecutor() = {
-    // trigger an execution to initiate EmbeddedExecutor with the default 
flink conf
+  private def bootstrap(executionTarget: String) = {
     val flinkConf = new Configuration()
-    flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
-    debug(s"Running bootstrap Flink SQL in application mode with flink conf: 
$flinkConf.")
     val tableEnv = TableEnvironment.create(flinkConf)
-    val res = tableEnv.executeSql("select 'kyuubi'")
-    res.await()
-    info("Bootstrap Flink SQL finished.")
+
+    if ("yarn-application".equalsIgnoreCase(executionTarget)) {
+      // trigger an execution to initiate EmbeddedExecutor with the default 
flink conf
+      flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
+      debug(s"Running bootstrap Flink SQL in application mode with flink conf: 
$flinkConf.")
+      tableEnv.executeSql("select 'kyuubi'").await()
+    }
+
+    kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt =>
+      tableEnv.executeSql(stmt).await()
+    }
+
+    info("Bootstrap SQL finished.")
   }
 
   private def setDeploymentConf(executionTarget: String, flinkConf: 
Configuration): Unit = {
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index b8d1f8569..5f8f0b8c0 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.gateway.service.session.{Session => FSession}
 import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, 
TProtocolVersion}
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.udf.KDFRegistry
@@ -64,6 +65,15 @@ class FlinkSessionImpl(
   override def open(): Unit = {
     val executor = 
fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
 
+    sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
+      try {
+        executor.executeStatement(OperationHandle.create, sql)
+      } catch {
+        case NonFatal(e) =>
+          throw KyuubiSQLException(s"execute 
${ENGINE_SESSION_INITIALIZE_SQL.key}  $sql ", e)
+      }
+    }
+
     val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { 
case (k, _) =>
       Array(USE_CATALOG, USE_DATABASE).contains(k)
     }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index 92c1bcd83..ccaefb496 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with 
WithFlinkTestResources
 
   private var zkServer: EmbeddedZookeeper = _
 
-  protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+  protected val conf: KyuubiConf = new KyuubiConf(false)
 
   protected def engineRefId: String
 
@@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with 
WithFlinkTestResources
       }
     }
     withKyuubiConf.foreach { case (k, v) =>
-      System.setProperty(k, v)
       conf.set(k, v)
     }
 
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
new file mode 100644
index 000000000..db174e501
--- /dev/null
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, 
WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, 
HA_NAMESPACE}
+import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
+
+class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
+  with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
+
+  protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+  protected val ENGINE_INITIALIZE_SQL_VALUE: String =
+    "show databases;"
+
+  protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String =
+    """create catalog cat_b with ('type'='generic_in_memory');
+      |create table blackhole(i int) with ('connector'='blackhole');
+      |create table datagen(i int) with (
+      |'connector'='datagen',
+      |'fields.i.kind'='sequence',
+      |'fields.i.start'='1',
+      |'fields.i.end'='10')""".stripMargin
+
+  override def withKyuubiConf: Map[String, String] = {
+    Map(
+      "flink.execution.target" -> "remote",
+      "flink.high-availability.cluster-id" -> "flink-mini-cluster",
+      "flink.app.name" -> "kyuubi_connection_flink_kandy",
+      HA_NAMESPACE.key -> namespace,
+      HA_ENGINE_REF_ID.key -> engineRefId,
+      ENGINE_TYPE.key -> "FLINK_SQL",
+      ENGINE_SHARE_LEVEL.key -> shareLevel,
+      OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
+      ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
+      ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
+      KYUUBI_SESSION_USER_KEY -> "kandy")
+  }
+
+  override protected def engineRefId: String = UUID.randomUUID().toString
+
+  def namespace: String = "/kyuubi/flink-local-engine-test"
+
+  def shareLevel: String = ShareLevel.USER.toString
+
+  def engineType: String = "flink"
+
+  test("execute statement - kyuubi engine initialize") {
+    withJdbcStatement() { statement =>
+      var resultSet = statement.executeQuery("show catalogs")
+      val expectedCatalogs = Set("default_catalog", "cat_b")
+      var actualCatalogs = Set[String]()
+      while (resultSet.next()) {
+        actualCatalogs += resultSet.getString(1)
+      }
+      assert(expectedCatalogs.subsetOf(actualCatalogs))
+
+      resultSet = statement.executeQuery("show databases")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "default_database")
+      assert(!resultSet.next())
+
+      val expectedTables = Set("blackhole", "datagen")
+      resultSet = statement.executeQuery("show tables")
+      while (resultSet.next()) {
+        assert(expectedTables.contains(resultSet.getString(1)))
+      }
+      assert(!resultSet.next())
+
+      var dropResult = statement.executeQuery("drop catalog cat_b")
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+
+      dropResult = statement.executeQuery("drop table blackhole")
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+
+      dropResult = statement.executeQuery("drop table datagen")
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+    }
+  }
+}

Reply via email to