[
https://issues.apache.org/jira/browse/KYLIN-5271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xiaoxiang Yu resolved KYLIN-5271.
---------------------------------
Resolution: Fixed
> Query memory leaks
> ------------------
>
> Key: KYLIN-5271
> URL: https://issues.apache.org/jira/browse/KYLIN-5271
> Project: Kylin
> Issue Type: Bug
> Components: Query Engine
> Affects Versions: v4.0.1
> Reporter: Liu Zhao
> Priority: Major
> Fix For: v4.1.0
>
>
> The query thread clones a SparkSession and stores it in a ThreadLocal.
> However, if an exception occurs during the Calcite-to-SparkPlan conversion,
> the SparkSession in the ThreadLocal is never removed. More importantly, if
> Spark restarts later, the SparkSession left in the ThreadLocal becomes
> unusable, and queries on this thread fail with the exception: Caused by:
> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
> This stopped SparkContext was created at:
> org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)
> java.lang.Thread.run(Thread.java:748)
> // put SparkSession into ThreadLocal
> {code:java}
> object SparderContextFacade extends Logging {
> final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession,
> UdfManager]] =
> new InternalThreadLocal[Pair[SparkSession, UdfManager]]()
> def current(): Pair[SparkSession, UdfManager] = {
> if (CURRENT_SPARKSESSION.get() == null) {
> val spark = SparderContext.getOriginalSparkSession.cloneSession()
> CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
> UdfManager.createWithoutBuildInFunc(spark)))
> }
> CURRENT_SPARKSESSION.get()
> }
> def remove(): Unit = {
> CURRENT_SPARKSESSION.remove()
> }
> }
> {code}
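> // remove SparkSession from ThreadLocal (done only at the end of getResult,
> // with no try/finally, so it is skipped when an exception is thrown earlier)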
> // org.apache.kylin.query.runtime.plans.ResultPlan
> {code:java}
> def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
> : Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
> val result: Either[Enumerable[Array[Any]], Enumerable[Any]] =
> resultType match {
> case ResultType.NORMAL =>
> if (SparderContext.needCompute()) {
> Left(ResultPlan.collectEnumerable(df, rowType))
> } else {
> Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
> }
> case ResultType.SCALA =>
> if (SparderContext.needCompute()) {
> Right(ResultPlan.collectScalarEnumerable(df, rowType))
> } else {
> Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
> }
> }
> SparderContext.cleanQueryInfo()
> SparderContext.closeThreadSparkSession()
> result
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)