Repository: carbondata Updated Branches: refs/heads/master 9b45c5b30 -> 88279ca55
[CARBONDATA-2391] [Compaction Thread Leak] Thread leak in compaction operation if prefetch is enabled and compaction process is killed. Problem: Thread leak in compaction operation if prefetch is enabled and the compaction process is killed. Analysis: During compaction, if prefetch is enabled, RawResultIterator launches an executor service for prefetching the data. If compaction fails or the process is killed, it can lead to a thread leak because the executor service is still in the running state. Fix: Close the executor service in the task completion listener of compaction. This closes #2217. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/88279ca5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/88279ca5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/88279ca5 Branch: refs/heads/master Commit: 88279ca556933e6a586b5e1e00637efdf26836d7 Parents: 9b45c5b Author: manishgupta88 <[email protected]> Authored: Mon Apr 23 20:29:56 2018 +0530 Committer: ravipesala <[email protected]> Committed: Thu Apr 26 22:49:49 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/spark/rdd/CarbonMergerRDD.scala | 10 +++++----- .../processing/merger/CarbonCompactionExecutor.java | 11 +++++++++-- .../processing/merger/RowResultMergerProcessor.java | 2 ++ 3 files changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/88279ca5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 6cf8cb3..ee40aa9 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -100,6 +100,7 @@ class CarbonMergerRDD[K, V]( var mergeNumber = "" var exec: CarbonCompactionExecutor = null var processor: AbstractResultProcessor = null + var rawResultIteratorList: java.util.List[RawResultIterator] = null try { @@ -180,10 +181,9 @@ class CarbonMergerRDD[K, V]( context.addTaskCompletionListener { _ => close() } - // fire a query and get the results. - var result2: java.util.List[RawResultIterator] = null try { - result2 = exec.processTableBlocks() + // fire a query and get the results. + rawResultIteratorList = exec.processTableBlocks() } catch { case e: Throwable => LOGGER.error(e) @@ -220,7 +220,7 @@ class CarbonMergerRDD[K, V]( carbonMergerMapping.campactionType, partitionSpec) } - mergeStatus = processor.execute(result2) + mergeStatus = processor.execute(rawResultIteratorList) mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber } catch { @@ -234,7 +234,7 @@ class CarbonMergerRDD[K, V]( // close all the query executor service and clean up memory acquired during query processing if (null != exec) { LOGGER.info("Cleaning up query resources acquired during compaction") - exec.finish() + exec.close(rawResultIteratorList) } // clean up the resources for processor if (null != processor) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/88279ca5/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index 6a6d834..20103b1 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -176,13 +176,20 @@ public class CarbonCompactionExecutor { * Below method will be used * for cleanup */ - public void finish() { + public void close(List<RawResultIterator> rawResultIteratorList) { try { + // close all the iterators. Iterators might not closed in case of compaction failure + // or if process is killed + if (null != rawResultIteratorList) { + for (RawResultIterator rawResultIterator : rawResultIteratorList) { + rawResultIterator.close(); + } + } for (QueryExecutor queryExecutor : queryExecutorList) { queryExecutor.finish(); } } catch (QueryExecutionException e) { - LOGGER.error(e, "Problem while finish: "); + LOGGER.error(e, "Problem while close. Ignoring the exception"); } clearDictionaryFromQueryModel(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/88279ca5/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 2f06738..442f1c5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -116,6 +116,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { Object[] convertedRow = iterator.next(); if (null == convertedRow) { index--; + iterator.close(); continue; } if (!isDataPresent) { @@ -140,6 +141,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { while (true) { Object[] convertedRow = iterator.next(); if (null == convertedRow) 
{ + iterator.close(); break; } // do it only once
