Repository: carbondata
Updated Branches:
  refs/heads/master 9b45c5b30 -> 88279ca55


[CARBONDATA-2391] [Compaction Thread Leak] Thread leak in compaction operation if prefetch is enabled and the compaction process is killed

Problem
A thread leak occurs in the compaction operation if prefetch is enabled and the
compaction process is killed.

Analysis
During compaction, if prefetch is enabled, RawResultIterator launches an executor
service to prefetch the data.
If compaction fails or the process is killed, that executor service remains
running, which leaks its threads.

Fix
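Close the RawResultIterators (and with them their prefetch executor services) in
the compaction task's completion listener, and close each iterator as soon as it
is exhausted during merging.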

This closes #2217


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/88279ca5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/88279ca5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/88279ca5

Branch: refs/heads/master
Commit: 88279ca556933e6a586b5e1e00637efdf26836d7
Parents: 9b45c5b
Author: manishgupta88 <[email protected]>
Authored: Mon Apr 23 20:29:56 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Apr 26 22:49:49 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/spark/rdd/CarbonMergerRDD.scala    | 10 +++++-----
 .../processing/merger/CarbonCompactionExecutor.java      | 11 +++++++++--
 .../processing/merger/RowResultMergerProcessor.java      |  2 ++
 3 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/88279ca5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 6cf8cb3..ee40aa9 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -100,6 +100,7 @@ class CarbonMergerRDD[K, V](
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
       var processor: AbstractResultProcessor = null
+      var rawResultIteratorList: java.util.List[RawResultIterator] = null
       try {
 
 
@@ -180,10 +181,9 @@ class CarbonMergerRDD[K, V](
         context.addTaskCompletionListener { _ =>
           close()
         }
-        // fire a query and get the results.
-        var result2: java.util.List[RawResultIterator] = null
         try {
-          result2 = exec.processTableBlocks()
+          // fire a query and get the results.
+          rawResultIteratorList = exec.processTableBlocks()
         } catch {
           case e: Throwable =>
             LOGGER.error(e)
@@ -220,7 +220,7 @@ class CarbonMergerRDD[K, V](
               carbonMergerMapping.campactionType,
               partitionSpec)
         }
-        mergeStatus = processor.execute(result2)
+        mergeStatus = processor.execute(rawResultIteratorList)
         mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + 
mergeNumber
 
       } catch {
@@ -234,7 +234,7 @@ class CarbonMergerRDD[K, V](
         // close all the query executor service and clean up memory acquired 
during query processing
         if (null != exec) {
           LOGGER.info("Cleaning up query resources acquired during compaction")
-          exec.finish()
+          exec.close(rawResultIteratorList)
         }
         // clean up the resources for processor
         if (null != processor) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/88279ca5/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 6a6d834..20103b1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -176,13 +176,20 @@ public class CarbonCompactionExecutor {
    * Below method will be used
    * for cleanup
    */
-  public void finish() {
+  public void close(List<RawResultIterator> rawResultIteratorList) {
     try {
+      // close all the iterators. Iterators might not closed in case of 
compaction failure
+      // or if process is killed
+      if (null != rawResultIteratorList) {
+        for (RawResultIterator rawResultIterator : rawResultIteratorList) {
+          rawResultIterator.close();
+        }
+      }
       for (QueryExecutor queryExecutor : queryExecutorList) {
         queryExecutor.finish();
       }
     } catch (QueryExecutionException e) {
-      LOGGER.error(e, "Problem while finish: ");
+      LOGGER.error(e, "Problem while close. Ignoring the exception");
     }
     clearDictionaryFromQueryModel();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/88279ca5/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 2f06738..442f1c5 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -116,6 +116,7 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
         Object[] convertedRow = iterator.next();
         if (null == convertedRow) {
           index--;
+          iterator.close();
           continue;
         }
         if (!isDataPresent) {
@@ -140,6 +141,7 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
       while (true) {
         Object[] convertedRow = iterator.next();
         if (null == convertedRow) {
+          iterator.close();
           break;
         }
         // do it only once

Reply via email to