Repository: carbondata Updated Branches: refs/heads/master 910f26171 -> 3fb406618
[CARBONDATA-2237] Removing parsers thread local objects after parsing of carbon query This closes #2040 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fb40661 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fb40661 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fb40661 Branch: refs/heads/master Commit: 3fb406618b3cd68be680ae217f33478d87b74eb8 Parents: 910f261 Author: ravipesala <ravi.pes...@gmail.com> Authored: Wed Mar 7 16:58:30 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Thu Mar 8 10:42:12 2018 +0530 ---------------------------------------------------------------------- .../carbondata/spark/util/CarbonScalaUtil.scala | 42 ++++++++++++++++++++ .../sql/parser/CarbonSpark2SqlParser.scala | 21 +++++----- 2 files changed, 54 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fb40661/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 33263d6..773ea16 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -18,6 +18,7 @@ package org.apache.carbondata.spark.util import java.{lang, util} +import java.lang.ref.Reference import java.nio.charset.Charset import java.text.SimpleDateFormat import java.util.Date @@ -585,4 +586,45 @@ object CarbonScalaUtil { String.valueOf(Math.pow(10, 5).toInt + taskId) + String.valueOf(partitionNumber + Math.pow(10, 5).toInt) } + + /** + * Use reflection to clean the parser objects which are set in thread local to avoid memory issue + */ + def cleanParserThreadLocals(): Unit = { + try { + // Get a reference to the thread locals table of the current thread + val thread = Thread.currentThread + val threadLocalsField = classOf[Thread].getDeclaredField("inheritableThreadLocals") + threadLocalsField.setAccessible(true) + val threadLocalTable = threadLocalsField.get(thread) + // Get a reference to the array holding the thread local variables inside the + // ThreadLocalMap of the current thread + val threadLocalMapClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap") + val tableField = threadLocalMapClass.getDeclaredField("table") + tableField.setAccessible(true) + val table = tableField.get(threadLocalTable) + // The key to the ThreadLocalMap is a WeakReference object. The referent field of this object + // is a reference to the actual ThreadLocal variable + val referentField = classOf[Reference[Thread]].getDeclaredField("referent") + referentField.setAccessible(true) + var i = 0 + while (i < lang.reflect.Array.getLength(table)) { + // Each entry in the table array of ThreadLocalMap is an Entry object + val entry = lang.reflect.Array.get(table, i) + if (entry != null) { + // Get a reference to the thread local object and remove it from the table + val threadLocal = referentField.get(entry).asInstanceOf[ThreadLocal[_]] + if (threadLocal != null && + threadLocal.getClass.getName.startsWith("scala.util.DynamicVariable")) { + threadLocal.remove() + } + } + i += 1 + } + table + } catch { + case e: Exception => + // ignore it + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fb40661/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 86790ba..3896ce9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -39,7 +39,7 @@ import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.util.CommonUtil +import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} /** * TODO remove the duplicate code and add the common methods to common class. @@ -52,16 +52,19 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { // Initialize the Keywords. initLexical phrase(start)(new lexical.Scanner(input)) match { - case Success(plan, _) => plan match { - case x: CarbonLoadDataCommand => - x.inputSqlString = input - x - case x: CarbonAlterTableCompactionCommand => - x.alterTableModel.alterSql = input - x - case logicalPlan => logicalPlan + case Success(plan, _) => + CarbonScalaUtil.cleanParserThreadLocals() + plan match { + case x: CarbonLoadDataCommand => + x.inputSqlString = input + x + case x: CarbonAlterTableCompactionCommand => + x.alterTableModel.alterSql = input + x + case logicalPlan => logicalPlan } case failureOrError => + CarbonScalaUtil.cleanParserThreadLocals() CarbonException.analysisException(failureOrError.toString) } }