This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new a1846d9fe0 [KYUUBI #7163][SPARK] Check whether engine context stopped in engine terminating checker
a1846d9fe0 is described below

commit a1846d9fe05b3010ea9c01943a5720e05ec37284
Author: Wang, Fei <fwan...@ebay.com>
AuthorDate: Thu Aug 7 01:27:55 2025 -0700

    [KYUUBI #7163][SPARK] Check whether engine context stopped in engine terminating checker
    
    ### Why are the changes needed?
    
    To close #7163, this PR makes the engine terminating checker also check whether the engine context has been stopped.
    1. The SparkContext was stopped due to an OOM in `spark-listener-group-shared`, and `tryOrStopSparkContext` was called.
    
    ```
    25/08/03 19:08:06 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    25/08/03 19:08:06 INFO OperationAuditLogger: operation=a7f134b9-373b-402d-a82b-2d42df568807 opType=ExecuteStatement state=INITIALIZED   user=b_hrvst    session=6a90d01c-7627-4ae6-a506-7ba826355489
    ...
    25/08/03 19:08:23 INFO SparkSQLSessionManager: Opening session for b_hrvst10.147.254.115
    25/08/03 19:08:23 ERROR SparkTBinaryFrontendService: Error opening session:
    org.apache.kyuubi.KyuubiSQLException: Cannot call methods on a stopped SparkContext.
    This stopped SparkContext was created at:
    org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:951)
    org.apache.kyuubi.engine.spark.SparkSQLEngine$.createSpark(SparkSQLEngine.scala:337)
    org.apache.kyuubi.engine.spark.SparkSQLEngine$.main(SparkSQLEngine.scala:415)
    org.apache.kyuubi.engine.spark.SparkSQLEngine.main(SparkSQLEngine.scala)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:498)
    org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
    The currently active SparkContext was created at:
    org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:951)
    org.apache.kyuubi.engine.spark.SparkSQLEngine$.createSpark(SparkSQLEngine.scala:337)
    org.apache.kyuubi.engine.spark.SparkSQLEngine$.main(SparkSQLEngine.scala:415)
    org.apache.kyuubi.engine.spark.SparkSQLEngine.main(SparkSQLEngine.scala)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:498)
    org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
    
        at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
        at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:73)
    ```
    
    2. The Kyuubi engine did not stop until about 12 hours later, and then only because its ZooKeeper connection was lost:
    ```
    25/08/04 07:13:25 ERROR ZookeeperDiscoveryClient: Zookeeper client connection state changed to: LOST, but failed to reconnect in 3 seconds. Give up retry and stop gracefully .
    25/08/04 07:13:25 INFO ClientCnxn: Session establishment complete on server zeus-slc-zk-3.vip.hadoop.ebay.com/10.147.141.240:2181, sessionid = 0x3939e22c983032e, negotiated timeout = 40000
    25/08/04 07:13:25 INFO ConnectionStateManager: State change: RECONNECTED
    25/08/04 07:13:25 INFO ZookeeperDiscoveryClient: Zookeeper client connection state changed to: RECONNECTED
    25/08/04 07:13:25 INFO SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping.
    25/08/04 07:13:25 INFO SparkTBinaryFrontendService: Service: [EngineServiceDiscovery] is stopping.
    25/08/04 07:13:25 WARN EngineServiceDiscovery: The Zookeeper ensemble is LOST
    25/08/04 07:13:25 INFO EngineServiceDiscovery: Service[EngineServiceDiscovery] is stopped.
    25/08/04 07:13:25 INFO SparkTBinaryFrontendService: Service[SparkTBinaryFrontend] is stopped.
    25/08/04 07:13:25 INFO SparkTBinaryFrontendService: SparkTBinaryFrontend has stopped
    25/08/04 07:13:25 INFO SparkSQLEngine: Service: [SparkSQLBackendService] is stopping.
    25/08/04 07:13:25 INFO SparkSQLBackendService: Service: [SparkSQLSessionManager] is stopping.
    25/08/04 07:13:25 INFO SparkSQLSessionManager: Service: [SparkSQLOperationManager] is stopping.
    25/08/04 07:13:45 INFO SparkSQLOperationManager: Service[SparkSQLOperationManager] is stopped.
    25/08/04 07:13:45 INFO SparkSQLSessionManager: Service[SparkSQLSessionManager] is stopped.
    ```
    
    3. It seems the shutdown hook does not work in this case:
    
https://github.com/apache/kyuubi/blob/9a0c49e79135cd90368986176591a80d29634231/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala#L375-L376
    
    4. `SparkSQLEngineListener` also did not receive the `ApplicationEnd` message, perhaps because `spark-listener-group-shared` itself hit the OOM. I do not have a jstack for it, so I cannot check whether that thread was still alive:
    
https://github.com/apache/kyuubi/blob/9a0c49e79135cd90368986176591a80d29634231/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala#L55-L63
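    
    The check added by this patch can be modeled as a pure decision function. This is a minimal, self-contained Scala sketch for illustration only, not Kyuubi code: `Decision`, `decide`, `TerminatingCheckerSketch`, and the parameter names are hypothetical. In the actual patch the last input comes from `isEngineContextStopped`, which defaults to `false` in `SessionManager` and is overridden in `SparkSQLSessionManager` to return `spark.sparkContext.isStopped`.
    
    ```scala
    // Hypothetical model of the branch logic in the patched `checkTask.run()` of SessionManager.
    sealed trait Decision
    case object NoAction extends Decision           // checker leaves the engine alone
    case object StopIdle extends Decision           // "Idled for more than ... ms, terminating"
    case object StopContextStopped extends Decision // "Engine's SparkContext is stopped, terminating"
    
    object TerminatingCheckerSketch {
      def decide(
          shutdown: Boolean,
          nowMs: Long,
          latestLogoutTime: Long,
          idleTimeout: Long,
          activeUserSessions: Int,
          engineContextStopped: Boolean): Decision = {
        if (shutdown) {
          NoAction // already shutting down, nothing to do
        } else if (nowMs - latestLogoutTime > idleTimeout && activeUserSessions <= 0) {
          StopIdle // original idle-timeout condition, still checked first
        } else if (engineContextStopped) {
          StopContextStopped // new branch added by this patch
        } else {
          NoAction
        }
      }
    
      def main(args: Array[String]): Unit = {
        // Sessions are still open, so the idle branch cannot fire; the new branch still does.
        println(decide(shutdown = false, nowMs = 100L, latestLogoutTime = 0L,
          idleTimeout = 10L, activeUserSessions = 2, engineContextStopped = true))
      }
    }
    ```
    
    Note the ordering: the context check sits in the `else if` branch, so an engine that is both idle and stopped still takes the idle path, while an engine that still has open sessions but a stopped SparkContext, which the idle branch never catches, is now terminated as well.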
    
    ### How was this patch tested?
    
    Existing GA (GitHub Actions) tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7167 from turboFei/check_spark_stopped.
    
    Closes #7163
    
    835cb3dec [Wang, Fei] SparkContext
    cd542decb [Wang, Fei] Revert "no hard code"
    cf9e40ef6 [Wang, Fei] no hard code
    ca551c23d [Wang, Fei] check engine context stopped
    
    Authored-by: Wang, Fei <fwan...@ebay.com>
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
    (cherry picked from commit b31663f569d69a22dec41e8e8aa0dc60eaca86d9)
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
---
 .../engine/spark/session/SparkSQLSessionManager.scala     |  4 ++++
 .../scala/org/apache/kyuubi/session/SessionManager.scala  | 15 +++++++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 7144188a4d..e75ef3246e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -222,4 +222,8 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       opHandle: OperationHandle): Path = {
     new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
   }
+
+  override private[kyuubi] def isEngineContextStopped = {
+    spark.sparkContext.isStopped
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 7751b7298e..217cfc707c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -345,10 +345,15 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
     if (idleTimeout > 0) {
       val checkTask = new Runnable {
         override def run(): Unit = {
-          if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout &&
-            getActiveUserSessionCount <= 0) {
-            info(s"Idled for more than $idleTimeout ms, terminating")
-            stop()
+          if (!shutdown) {
+            if (System.currentTimeMillis() - latestLogoutTime > idleTimeout &&
+              getActiveUserSessionCount <= 0) {
+              info(s"Idled for more than $idleTimeout ms, terminating")
+              stop()
+            } else if (isEngineContextStopped) {
+              error(s"Engine's SparkContext is stopped, terminating")
+              stop()
+            }
           }
         }
       }
@@ -360,4 +365,6 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
         TimeUnit.MILLISECONDS)
     }
   }
+
+  private[kyuubi] def isEngineContextStopped: Boolean = false
 }