[ 
https://issues.apache.org/jira/browse/KYLIN-5271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoxiang Yu resolved KYLIN-5271.
---------------------------------
    Resolution: Fixed

> Query memory leaks
> ------------------
>
>                 Key: KYLIN-5271
>                 URL: https://issues.apache.org/jira/browse/KYLIN-5271
>             Project: Kylin
>          Issue Type: Bug
>          Components: Query Engine
>    Affects Versions: v4.0.1
>            Reporter: Liu Zhao
>            Priority: Major
>             Fix For: v4.1.0
>
>
> The query thread clones a SparkSession and stores it in a ThreadLocal.
> However, if an exception occurs during the Calcite-to-SparkPlan conversion,
> the SparkSession is never removed from the ThreadLocal, because the cleanup
> call (SparderContext.closeThreadSparkSession() at the end of
> ResultPlan.getResult, shown below) is not in a finally block and is skipped.
> More importantly, if Spark restarts later, the stale SparkSession left in the
> ThreadLocal becomes unusable, and any query on this thread fails with the
> exception: Caused by:
> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
> This stopped SparkContext was created at:
> org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)
> java.lang.Thread.run(Thread.java:748)
> // put SparkSession into ThreadLocal
> {code:java}
> object SparderContextFacade extends Logging {
>   final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, 
> UdfManager]] =
>     new InternalThreadLocal[Pair[SparkSession, UdfManager]]()
>   def current(): Pair[SparkSession, UdfManager] = {
>     if (CURRENT_SPARKSESSION.get() == null) {
>       val spark = SparderContext.getOriginalSparkSession.cloneSession()
>       CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
>         UdfManager.createWithoutBuildInFunc(spark)))
>     }
>     CURRENT_SPARKSESSION.get()
>   }
>   def remove(): Unit = {
>     CURRENT_SPARKSESSION.remove()
>   }
> }
> {code}
> // remove SparkSession from ThreadLocal
> // org.apache.kylin.query.runtime.plans.ResultPlan
> {code:java}
>     def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
>   : Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
>     val result: Either[Enumerable[Array[Any]], Enumerable[Any]] =
>       resultType match {
>         case ResultType.NORMAL =>
>           if (SparderContext.needCompute()) {
>             Left(ResultPlan.collectEnumerable(df, rowType))
>           } else {
>             Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
>           }
>         case ResultType.SCALA =>
>           if (SparderContext.needCompute()) {
>             Right(ResultPlan.collectScalarEnumerable(df, rowType))
>           } else {
>             Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
>           }
>       }
>     SparderContext.cleanQueryInfo()
>     SparderContext.closeThreadSparkSession()
>     result
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to