This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 902815d54 [KYUUBI #5507][FLINK] Support Initialize SQL in Flink Engine
902815d54 is described below
commit 902815d5404caa71c0e3c1b5c04df6a31d11798c
Author: Kang Wang <[email protected]>
AuthorDate: Thu Nov 2 18:19:22 2023 +0800
[KYUUBI #5507][FLINK] Support Initialize SQL in Flink Engine
### _Why are the changes needed?_
We should support initialize SQL to init the session context in the Flink SQL
engine (e.g. setting up catalogs).
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5518 from hadoopkandy/KYUUBI-5507.
Closes #5507
b1720ecde [Cheng Pan] Update
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
078273824 [Paul Lin] [KYUUBI-5507] Improve codestyle
13035f3e9 [Paul Lin] Update
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
2ce803181 [Paul Lin] [KYUUBI-5507] Improve tests
a29ac3860 [Paul Lin] [KYUUBI-5507] Run engine initial SQL at Engine start
b864af54f [wangkang] Merge branch 'apache:master' into KYUUBI-5507
b37d62d44 [kandy01.wang] [KYUUBI #5507] [FLINK] Support Initialize SQL in
Flink Engine
Lead-authored-by: Kang Wang <[email protected]>
Co-authored-by: Paul Lin <[email protected]>
Co-authored-by: kandy01.wang <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: wangkang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 26 ++++--
.../engine/flink/session/FlinkSessionImpl.scala | 10 ++
.../engine/flink/WithFlinkSQLEngineLocal.scala | 3 +-
.../operation/FlinkEngineInitializeSuite.scala | 104 +++++++++++++++++++++
4 files changed, 131 insertions(+), 12 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 7e4f31f8a..bf5bdc782 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -30,6 +30,7 @@ import
org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser,
FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME,
KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch,
currentEngine}
import org.apache.kyuubi.service.Serverable
@@ -100,9 +101,7 @@ object FlinkSQLEngine extends Logging {
startEngine(engineContext)
info("Flink engine started")
- if ("yarn-application".equalsIgnoreCase(executionTarget)) {
- bootstrapFlinkApplicationExecutor()
- }
+ bootstrap(executionTarget)
// blocking main thread
countDownLatch.await()
@@ -127,15 +126,22 @@ object FlinkSQLEngine extends Logging {
}
}
- private def bootstrapFlinkApplicationExecutor() = {
- // trigger an execution to initiate EmbeddedExecutor with the default
flink conf
+ private def bootstrap(executionTarget: String) = {
val flinkConf = new Configuration()
- flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
- debug(s"Running bootstrap Flink SQL in application mode with flink conf:
$flinkConf.")
val tableEnv = TableEnvironment.create(flinkConf)
- val res = tableEnv.executeSql("select 'kyuubi'")
- res.await()
- info("Bootstrap Flink SQL finished.")
+
+ if ("yarn-application".equalsIgnoreCase(executionTarget)) {
+ // trigger an execution to initiate EmbeddedExecutor with the default
flink conf
+ flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
+ debug(s"Running bootstrap Flink SQL in application mode with flink conf:
$flinkConf.")
+ tableEnv.executeSql("select 'kyuubi'").await()
+ }
+
+ kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt =>
+ tableEnv.executeSql(stmt).await()
+ }
+
+ info("Bootstrap SQL finished.")
}
private def setDeploymentConf(executionTarget: String, flinkConf:
Configuration): Unit = {
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index b8d1f8569..5f8f0b8c0 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -28,6 +28,7 @@ import
org.apache.flink.table.gateway.service.session.{Session => FSession}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue,
TProtocolVersion}
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.udf.KDFRegistry
@@ -64,6 +65,15 @@ class FlinkSessionImpl(
override def open(): Unit = {
val executor =
fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
+ sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
+ try {
+ executor.executeStatement(OperationHandle.create, sql)
+ } catch {
+ case NonFatal(e) =>
+ throw KyuubiSQLException(s"execute
${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e)
+ }
+ }
+
val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition {
case (k, _) =>
Array(USE_CATALOG, USE_DATABASE).contains(k)
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index 92c1bcd83..ccaefb496 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with
WithFlinkTestResources
private var zkServer: EmbeddedZookeeper = _
- protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+ protected val conf: KyuubiConf = new KyuubiConf(false)
protected def engineRefId: String
@@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with
WithFlinkTestResources
}
}
withKyuubiConf.foreach { case (k, v) =>
- System.setProperty(k, v)
conf.set(k, v)
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
new file mode 100644
index 000000000..db174e501
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine,
WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID,
HA_NAMESPACE}
+import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
+
+class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
+ with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
+
+ protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+ protected val ENGINE_INITIALIZE_SQL_VALUE: String =
+ "show databases;"
+
+ protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String =
+ """create catalog cat_b with ('type'='generic_in_memory');
+ |create table blackhole(i int) with ('connector'='blackhole');
+ |create table datagen(i int) with (
+ |'connector'='datagen',
+ |'fields.i.kind'='sequence',
+ |'fields.i.start'='1',
+ |'fields.i.end'='10')""".stripMargin
+
+ override def withKyuubiConf: Map[String, String] = {
+ Map(
+ "flink.execution.target" -> "remote",
+ "flink.high-availability.cluster-id" -> "flink-mini-cluster",
+ "flink.app.name" -> "kyuubi_connection_flink_kandy",
+ HA_NAMESPACE.key -> namespace,
+ HA_ENGINE_REF_ID.key -> engineRefId,
+ ENGINE_TYPE.key -> "FLINK_SQL",
+ ENGINE_SHARE_LEVEL.key -> shareLevel,
+ OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
+ ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
+ ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
+ KYUUBI_SESSION_USER_KEY -> "kandy")
+ }
+
+ override protected def engineRefId: String = UUID.randomUUID().toString
+
+ def namespace: String = "/kyuubi/flink-local-engine-test"
+
+ def shareLevel: String = ShareLevel.USER.toString
+
+ def engineType: String = "flink"
+
+ test("execute statement - kyuubi engine initialize") {
+ withJdbcStatement() { statement =>
+ var resultSet = statement.executeQuery("show catalogs")
+ val expectedCatalogs = Set("default_catalog", "cat_b")
+ var actualCatalogs = Set[String]()
+ while (resultSet.next()) {
+ actualCatalogs += resultSet.getString(1)
+ }
+ assert(expectedCatalogs.subsetOf(actualCatalogs))
+
+ resultSet = statement.executeQuery("show databases")
+ assert(resultSet.next())
+ assert(resultSet.getString(1) === "default_database")
+ assert(!resultSet.next())
+
+ val expectedTables = Set("blackhole", "datagen")
+ resultSet = statement.executeQuery("show tables")
+ while (resultSet.next()) {
+ assert(expectedTables.contains(resultSet.getString(1)))
+ }
+ assert(!resultSet.next())
+
+ var dropResult = statement.executeQuery("drop catalog cat_b")
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+
+ dropResult = statement.executeQuery("drop table blackhole")
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+
+ dropResult = statement.executeQuery("drop table datagen")
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+ }
+ }
+}