Repository: carbondata
Updated Branches:
  refs/heads/master 65b69a9cd -> 7978b974d


[CARBONDATA-2181] Thread Leak during compaction processing on restructured table

Problem
Thread leak in compaction operation

Analysis
Compaction uses both query and data loading processes. During the data load 
phase of compaction, new threads are spawned in the sorting, merger and
data writer steps using an executor service. These threads are not getting closed 
in case compaction fails or the operation is killed from the Spark UI,
as observed by taking a thread dump after compaction failure.

Fix
Add a task completion listener in each compaction task which will close all the 
executor service instances as well as clean any other system
resources to prevent thread leak

This closes #2086


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7978b974
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7978b974
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7978b974

Branch: refs/heads/master
Commit: 7978b974d34b484d91a7707698e205703ab0b90c
Parents: 65b69a9
Author: manishgupta88 <[email protected]>
Authored: Wed Mar 21 12:08:01 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Mar 22 13:22:16 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 29 ++++++++++++++++----
 .../merger/AbstractResultProcessor.java         |  6 ++++
 .../merger/CompactionResultSortProcessor.java   | 16 +++++++++++
 .../merger/RowResultMergerProcessor.java        |  8 ++++++
 4 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7978b974/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index da268c1..9ce4904 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -99,6 +99,7 @@ class CarbonMergerRDD[K, V](
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
+      var processor: AbstractResultProcessor = null
       try {
 
 
@@ -175,6 +176,10 @@ class CarbonMergerRDD[K, V](
           carbonTable, dataFileMetadataSegMapping, restructuredBlockExists,
           new SparkDataTypeConverterImpl)
 
+        // add task completion listener to clean up the resources
+        context.addTaskCompletionListener { _ =>
+          close()
+        }
         // fire a query and get the results.
         var result2: java.util.List[RawResultIterator] = null
         try {
@@ -194,7 +199,6 @@ class CarbonMergerRDD[K, V](
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
           databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, 
true, false)
 
-        var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {
           LOGGER.info("CompactionResultSortProcessor flow is selected")
           processor = new CompactionResultSortProcessor(
@@ -223,9 +227,25 @@ class CarbonMergerRDD[K, V](
         case e: Exception =>
           LOGGER.error(e)
           throw e
-      } finally {
-        // delete temp location data
+      }
+
+      private def close(): Unit = {
+        deleteLocalDataFolders()
+        // close all the query executor service and clean up memory acquired 
during query processing
+        if (null != exec) {
+          LOGGER.info("Cleaning up query resources acquired during compaction")
+          exec.finish()
+        }
+        // clean up the resources for processor
+        if (null != processor) {
+          LOGGER.info("Closing compaction processor instance to clean up 
loading resources")
+          processor.close()
+        }
+      }
+
+      private def deleteLocalDataFolders(): Unit = {
         try {
+          LOGGER.info("Deleting local folder store location")
           val isCompactionFlow = true
           TableProcessingOperations
             .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
isCompactionFlow, false)
@@ -233,9 +253,6 @@ class CarbonMergerRDD[K, V](
           case e: Exception =>
             LOGGER.error(e)
         }
-        if (null != exec) {
-          exec.finish()
-        }
       }
 
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7978b974/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index a08177a..3f46a24 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -39,6 +39,12 @@ public abstract class AbstractResultProcessor {
    */
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);
 
+  /**
+   * This method will be used to clean up the resources and close all the 
spawned threads to avoid
+   * any kind of memory or thread leak
+   */
+  public abstract void close();
+
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
       CompactionType compactionType, CarbonFactDataHandlerModel 
carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7978b974/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 7435d73..e02f3ab 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -186,6 +186,22 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
     return isCompactionSuccess;
   }
 
+  @Override
+  public void close() {
+    // close the sorter executor service
+    if (null != sortDataRows) {
+      sortDataRows.close();
+    }
+    // close the final merger
+    if (null != finalMerger) {
+      finalMerger.close();
+    }
+    // close data handler
+    if (null != dataHandler) {
+      dataHandler.closeHandler();
+    }
+  }
+
   /**
    * This method will clean up the local folders and files created during 
compaction process
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7978b974/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 2616def..0430fd3 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -180,6 +180,14 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
     return mergeStatus;
   }
 
+  @Override
+  public void close() {
+    // close data handler
+    if (null != dataHandler) {
+      dataHandler.closeHandler();
+    }
+  }
+
   /**
    * Below method will be used to add sorted row
    *

Reply via email to