Repository: carbondata
Updated Branches:
  refs/heads/master 910f26171 -> 3fb406618


[CARBONDATA-2237] Removing parser's thread local objects after parsing of carbon query

This closes #2040
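
Background on the fix: the parser in CarbonSpark2SqlParser is built on Scala's
parser combinators (phrase, lexical.Scanner, Success in the diff below), and that
library has kept per-thread state, such as its record of the most recent parse
failure, in scala.util.DynamicVariable instances, which are backed by
InheritableThreadLocal. On long-lived Spark driver threads that state keeps
referencing the last parsed input after parsing has finished, which is the memory
issue this commit addresses by removing those thread-local entries once a query
has been parsed. The sketch below is illustrative only (DynamicVariableLeakSketch
and lastResult are hypothetical names, not CarbonData code); it shows how a value
stored through a DynamicVariable stays attached to the thread until its
thread-local entry is cleared:

import scala.util.DynamicVariable

// Hypothetical illustration: a DynamicVariable stores its per-thread value in an
// InheritableThreadLocal, so whatever a parser last put there remains reachable
// from the thread after parsing completes.
object DynamicVariableLeakSketch {
  // stand-in for parser state such as the combinators' last-failure record
  val lastResult = new DynamicVariable[Option[String]](None)

  def main(args: Array[String]): Unit = {
    lastResult.withValue(Some("parsed plan")) {
      println(lastResult.value)  // Some(parsed plan), visible during the "parse"
    }
    // withValue restores the previous value on exit, but direct assignment
    // (and some of the library's own bookkeeping) leaves data on the thread
    lastResult.value = Some("still referenced after parsing")
    println(lastResult.value)    // Some(still referenced after parsing)
  }
}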


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fb40661
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fb40661
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fb40661

Branch: refs/heads/master
Commit: 3fb406618b3cd68be680ae217f33478d87b74eb8
Parents: 910f261
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Wed Mar 7 16:58:30 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Thu Mar 8 10:42:12 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/util/CarbonScalaUtil.scala | 42 ++++++++++++++++++++
 .../sql/parser/CarbonSpark2SqlParser.scala      | 21 +++++-----
 2 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fb40661/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 33263d6..773ea16 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.util
 
 import java.{lang, util}
+import java.lang.ref.Reference
 import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -585,4 +586,45 @@ object CarbonScalaUtil {
     String.valueOf(Math.pow(10, 5).toInt + taskId) +
     String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
   }
+
+  /**
+   * Use reflection to clean the parser objects which are set in thread local to avoid memory issue
+   */
+  def cleanParserThreadLocals(): Unit = {
+    try {
+      // Get a reference to the thread locals table of the current thread
+      val thread = Thread.currentThread
+      val threadLocalsField = classOf[Thread].getDeclaredField("inheritableThreadLocals")
+      threadLocalsField.setAccessible(true)
+      val threadLocalTable = threadLocalsField.get(thread)
+      // Get a reference to the array holding the thread local variables inside the
+      // ThreadLocalMap of the current thread
+      val threadLocalMapClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap")
+      val tableField = threadLocalMapClass.getDeclaredField("table")
+      tableField.setAccessible(true)
+      val table = tableField.get(threadLocalTable)
+      // The key to the ThreadLocalMap is a WeakReference object. The referent field of this object
+      // is a reference to the actual ThreadLocal variable
+      val referentField = classOf[Reference[Thread]].getDeclaredField("referent")
+      referentField.setAccessible(true)
+      var i = 0
+      while (i < lang.reflect.Array.getLength(table)) {
+        // Each entry in the table array of ThreadLocalMap is an Entry object
+        val entry = lang.reflect.Array.get(table, i)
+        if (entry != null) {
+          // Get a reference to the thread local object and remove it from the table
+          val threadLocal = referentField.get(entry).asInstanceOf[ThreadLocal[_]]
+          if (threadLocal != null &&
+              threadLocal.getClass.getName.startsWith("scala.util.DynamicVariable")) {
+            threadLocal.remove()
+          }
+        }
+        i += 1
+      }
+      table
+    } catch {
+      case e: Exception =>
+        // ignore it
+    }
+  }
 }
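
The new helper walks the current thread's inheritableThreadLocals map through
reflection and calls remove() on every entry whose ThreadLocal belongs to
scala.util.DynamicVariable (its anonymous InheritableThreadLocal subclass carries
that prefix in its class name, which is what the startsWith check above matches).
Because it reaches into JDK-internal fields, the whole body sits in a try/catch
that silently ignores failures, for example on JVMs that restrict such reflective
access. A rough way to observe the effect is sketched below;
CleanParserThreadLocalsSketch and state are hypothetical names, and the sketch
assumes CarbonScalaUtil is on the classpath. Setting a value through a
DynamicVariable creates the thread-local entry, the cleanup removes it, and the
next read falls back to the variable's initial value:

import scala.util.DynamicVariable

import org.apache.carbondata.spark.util.CarbonScalaUtil

// Hypothetical verification sketch, not part of the commit.
object CleanParserThreadLocalsSketch {
  val state = new DynamicVariable[String]("initial")

  def main(args: Array[String]): Unit = {
    state.value = "left over from parsing"
    println(state.value)                       // left over from parsing
    CarbonScalaUtil.cleanParserThreadLocals()  // drops DynamicVariable entries
    println(state.value)                       // initial (entry was removed)
  }
}

In the parser change below, the cleanup is invoked on both the success and the
failure branch of the match, so the thread-local state is cleared whether or not
the query parses.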

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fb40661/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 86790ba..3896ce9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.CarbonReflectionUtils
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 
 /**
  * TODO remove the duplicate code and add the common methods to common class.
@@ -52,16 +52,19 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       // Initialize the Keywords.
       initLexical
       phrase(start)(new lexical.Scanner(input)) match {
-        case Success(plan, _) => plan match {
-          case x: CarbonLoadDataCommand =>
-            x.inputSqlString = input
-            x
-          case x: CarbonAlterTableCompactionCommand =>
-            x.alterTableModel.alterSql = input
-            x
-          case logicalPlan => logicalPlan
+        case Success(plan, _) =>
+          CarbonScalaUtil.cleanParserThreadLocals()
+          plan match {
+            case x: CarbonLoadDataCommand =>
+              x.inputSqlString = input
+              x
+            case x: CarbonAlterTableCompactionCommand =>
+              x.alterTableModel.alterSql = input
+              x
+            case logicalPlan => logicalPlan
         }
         case failureOrError =>
+          CarbonScalaUtil.cleanParserThreadLocals()
           CarbonException.analysisException(failureOrError.toString)
       }
     }
