Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 2c5ecfbfe -> 8b105a1e1


[CARBONDATA-2143] Fixed query memory leak when a task fails during
initialization of the record reader

Problem:
Whenever a query is executed, the record reader is initialized in the
internalCompute method of the CarbonScanRDD class. A task completion listener
is attached to each task only after the record reader has been initialized.
During record reader initialization, the queryResultIterator is initialized and
one blocklet is processed. Processing that blocklet uses available unsafe
memory. Let's say there are 100 columns: 80 columns get space, but no space is
left to store the remaining columns in unsafe memory. This results in a memory
exception, record reader initialization fails, and the query fails.
In this case the unsafe memory allocated for the 80 columns is never freed.
The task completion listener that would release it had not yet been attached
when the failure occurred. That memory stays occupied for as long as the JVM
process runs.

Impact:
This is a memory leak in the system. It can cause failures in queries executed
after one query has failed for the above reason.

Solution:
Attach the task completion listener before initializing the record reader.
Then, even if the query fails immediately after it has started using unsafe
memory, that memory is still freed.

This closes #1948


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b105a1e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b105a1e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b105a1e

Branch: refs/heads/branch-1.3
Commit: 8b105a1e1f6e7e7e3b0bc13d44c1bf93fd821e31
Parents: 2c5ecfb
Author: m00258959 <manish.gu...@huawei.com>
Authored: Wed Feb 7 12:07:33 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Feb 8 22:54:48 2018 +0530

----------------------------------------------------------------------
 .../executor/impl/AbstractQueryExecutor.java    | 14 +++++++-
 .../carbondata/hadoop/AbstractRecordReader.java |  8 +++--
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 38 +++++++++++---------
 3 files changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b105a1e/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6875f35..6490694 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -586,9 +586,17 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
    */
   @Override public void finish() throws QueryExecutionException {
     CarbonUtil.clearBlockCache(queryProperties.dataBlocks);
+    Throwable exceptionOccurred = null;
     if (null != queryIterator) {
-      queryIterator.close();
+      // catch if there is any exception so that it can be rethrown after 
clearing all the resources
+      // else if any exception is thrown from this point executor service will 
not be terminated
+      try {
+        queryIterator.close();
+      } catch (Throwable e) {
+        exceptionOccurred = e;
+      }
     }
+    // clear all the unsafe memory used for the given task ID
     
UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
     if (null != queryProperties.executorService) {
       // In case of limit query when number of limit records is already found 
so executors
@@ -596,6 +604,10 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
       // the query performance.
       queryProperties.executorService.shutdownNow();
     }
+    // if there is any exception re throw the exception
+    if (null != exceptionOccurred) {
+      throw new QueryExecutionException(exceptionOccurred);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b105a1e/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
index 62a97f9..bd4bbce 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -36,8 +36,10 @@ public abstract class AbstractRecordReader<T> extends 
RecordReader<Void, T> {
    */
   public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) 
{
     // result size
-    QueryStatistic queryStatistic = new QueryStatistic();
-    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, 
recordCount);
-    recorder.recordStatistics(queryStatistic);
+    if (null != recorder) {
+      QueryStatistic queryStatistic = new QueryStatistic();
+      queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, 
recordCount);
+      recorder.recordStatistics(queryStatistic);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b105a1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 49c0225..102c6c8 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -362,18 +362,21 @@ class CarbonScanRDD(
           }
       }
 
+      // add task completion before calling initialize as initialize method 
will internally call
+      // for usage of unsafe method for processing of one blocklet and if 
there is any exception
+      // while doing that the unsafe memory occupied for that task will not 
get cleared
+      context.addTaskCompletionListener { _ =>
+        reader.close()
+        close()
+        logStatistics(queryStartTime, model.getStatisticsRecorder)
+      }
+      // initialize the reader
       reader.initialize(inputSplit, attemptContext)
 
       new Iterator[Any] {
         private var havePair = false
         private var finished = false
 
-        context.addTaskCompletionListener { _ =>
-          reader.close()
-          close()
-          logStatistics(queryStartTime, model.getStatisticsRecorder)
-        }
-
         override def hasNext: Boolean = {
           if (context.isInterrupted) {
             throw new TaskKilledException
@@ -394,10 +397,6 @@ class CarbonScanRDD(
           value
         }
 
-        private def close() {
-          
TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
-          inputMetricsStats.updateAndClose()
-        }
       }
     } else {
       new Iterator[Any] {
@@ -411,6 +410,11 @@ class CarbonScanRDD(
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
+  private def close() {
+    TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
+    inputMetricsStats.updateAndClose()
+  }
+
   def prepareInputFormatForDriver(conf: Configuration): 
CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
@@ -456,12 +460,14 @@ class CarbonScanRDD(
   }
 
   def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): 
Unit = {
-    val queryStatistic = new QueryStatistic()
-    
queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-      System.currentTimeMillis - queryStartTime)
-    recorder.recordStatistics(queryStatistic)
-    // print executor query statistics for each task_id
-    recorder.logStatisticsAsTableExecutor()
+    if (null != recorder) {
+      val queryStatistic = new QueryStatistic()
+      
queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+        System.currentTimeMillis - queryStartTime)
+      recorder.recordStatistics(queryStatistic)
+      // print executor query statistics for each task_id
+      recorder.logStatisticsAsTableExecutor()
+    }
   }
 
   /**

Reply via email to