This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new b66b8d1a1 [KYUUBI #6130] Stop engine immediately after close session 
for `CONNECTION` level FlinkSQLEngine
b66b8d1a1 is described below

commit b66b8d1a15d59add0104bcbe0e8fb4a060352bea
Author: wforget <[email protected]>
AuthorDate: Fri Mar 8 20:40:05 2024 +0800

    [KYUUBI #6130] Stop engine immediately after close session for `CONNECTION` 
level FlinkSQLEngine
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6130
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Stop engine immediately after close session for `CONNECTION` level 
FlinkSQLEngine
    
    ## Types of changes :bookmark:
    
    - [X] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6132 from wForget/KYUUBI-6130.
    
    Closes #6130
    
    033c0fff5 [wforget] assert exception
    c0ce68e35 [wforget] debug
    07e0320af [wforget] add test
    a3c4ae319 [wforget] [KYUUBI-6130] Stop engine immediately after close 
session for `CONNECTION` level FlinkSQLEngine
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 9c18a7e38d3ea650d8ae3f57cd8c9600b82bea36)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../flink/session/FlinkSQLSessionManager.scala     | 26 ++++++++++++---
 .../operation/FlinkEngineInitializeSuite.scala     | 38 ++++++++++++++++++++--
 2 files changed, 57 insertions(+), 7 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index b7cd46217..5bc58b172 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -25,7 +25,10 @@ import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
 import org.apache.flink.table.gateway.service.context.DefaultContext
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.FlinkSQLEngine
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
 import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
 import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
@@ -35,6 +38,8 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
 
   override protected def isServer: Boolean = false
 
+  private lazy val shareLevel = 
ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
+
   val operationManager = new FlinkSQLOperationManager()
   val sessionManager = new FlinkSessionManager(engineContext)
 
@@ -77,10 +82,23 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
-    val fSession = super.getSessionOption(sessionHandle)
-    fSession.foreach(s =>
-      
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
-    super.closeSession(sessionHandle)
+    try {
+      val fSession = super.getSessionOption(sessionHandle)
+      fSession.foreach(s =>
+        
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
+      super.closeSession(sessionHandle)
+    } catch {
+      case t: Throwable =>
+        warn(s"Error closing session $sessionHandle", t)
+    }
+    if (shareLevel == ShareLevel.CONNECTION) {
+      info("Session stopped due to shared level is Connection.")
+      stopSession()
+    }
+  }
+
+  private def stopSession(): Unit = {
+    FlinkSQLEngine.currentEngine.foreach(_.stop())
   }
 
   override def stop(): Unit = synchronized {
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
index c98d07cc4..f852ad4db 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
@@ -19,14 +19,18 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.util.UUID
 
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
 import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, 
WithFlinkSQLEngineLocal}
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, 
HA_NAMESPACE}
 import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
 
-class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
+trait FlinkEngineInitializeSuite extends HiveJDBCTestHelper
   with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
 
   protected def jdbcUrl: String = getFlinkEngineServiceUrl
@@ -51,7 +55,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
       HA_NAMESPACE.key -> namespace,
       HA_ENGINE_REF_ID.key -> engineRefId,
       ENGINE_TYPE.key -> "FLINK_SQL",
-      ENGINE_SHARE_LEVEL.key -> shareLevel,
+      ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
       OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
       ENGINE_FLINK_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
       ENGINE_SESSION_FLINK_INITIALIZE_SQL.key -> 
ENGINE_SESSION_INITIALIZE_SQL_VALUE,
@@ -62,7 +66,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
 
   def namespace: String = "/kyuubi/flink-local-engine-test"
 
-  def shareLevel: String = ShareLevel.USER.toString
+  def shareLevel: ShareLevel
 
   def engineType: String = "flink"
 
@@ -100,5 +104,33 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
       assert(dropResult.next())
       assert(dropResult.getString(1) === "OK")
     }
+    // check engine alive status after close session with connection level 
engine
+    if (shareLevel == ShareLevel.CONNECTION) {
+      eventually(Timeout(10.seconds)) {
+        assert(!engineProcess.isAlive)
+      }
+      val e = intercept[Exception] {
+        withJdbcStatement() { statement =>
+          statement.executeQuery("select 1")
+        }
+      }
+      assert(e.getMessage() == "Time out retrieving Flink engine service url.")
+    }
+    // check engine alive status after close session with user level engine
+    if (shareLevel == ShareLevel.USER) {
+      assert(engineProcess.isAlive)
+      withJdbcStatement() { statement =>
+        val resultSet = statement.executeQuery("select 1")
+        assert(resultSet.next())
+      }
+    }
   }
 }
+
+class FlinkConnectionLevelEngineInitializeSuite extends 
FlinkEngineInitializeSuite {
+  def shareLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class FlinkUserLevelEngineInitializeSuite extends FlinkEngineInitializeSuite {
+  def shareLevel: ShareLevel = ShareLevel.USER
+}

Reply via email to