Repository: carbondata Updated Branches: refs/heads/master bb09baca5 -> 4a37e05ca
[CARBONDATA-2751] Fixed Thread leak issue in data loading and Compatibility issue ### Problem: Thread leak when user is killing data loading process from UI NPE when user is querying old store. ### Solution When carbondata file writing is in progress during data loading and user is killing it from UI Producer and consumer thread are not getting shutdown. Need to handle the same in close method Old store (V1/V2) does not have datachunk3 object so while filling the local dictionary it is checking whether local dictionary is present in datachunk3 or not but datachunk3 null check is missing This closes #2521 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a37e05c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a37e05c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a37e05c Branch: refs/heads/master Commit: 4a37e05caeda04e96ede8b7f7ec4c58891e8f172 Parents: bb09bac Author: kumarvishal09 <[email protected]> Authored: Wed Jul 18 13:06:36 2018 +0530 Committer: ravipesala <[email protected]> Committed: Thu Jul 19 23:02:41 2018 +0530 ---------------------------------------------------------------------- .../chunk/impl/DimensionRawColumnChunk.java | 3 ++- .../datastore/page/VarLengthColumnPageBase.java | 2 ++ .../steps/DataWriterBatchProcessorStepImpl.java | 22 ++++++++++++++++---- .../steps/DataWriterProcessorStepImpl.java | 21 +++++++++++++++++-- .../loading/steps/InputProcessorStepImpl.java | 4 +++- 5 files changed, 44 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java index 0c0b6b7..d645e08 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java @@ -141,7 +141,8 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk { } public CarbonDictionary getLocalDictionary() { - if (null != getDataChunkV3().local_dictionary && null == localDictionary) { + if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary + && null == localDictionary) { try { localDictionary = getDictionary(getDataChunkV3().local_dictionary); } catch (IOException | MemoryException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index 55877c3..4edd201 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -172,6 +172,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { // set total length and rowOffset in page page.totalLength = offset; + page.rowOffset.freeMemory(); page.rowOffset = rowOffset; for (int i = 0; i < rowId; i++) { page.putBytes(i, lvEncodedBytes, i * size, size); @@ -253,6 +254,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { // set total length and rowOffset in page page.totalLength = offset; + page.rowOffset.freeMemory(); page.rowOffset = rowOffset; // set data in page http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java index 71a624e..5663811 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java @@ -50,6 +50,9 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName()); private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap; + + private CarbonFactHandler carbonFactHandler; + public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep child) { super(configuration, child); @@ -91,11 +94,12 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++, listener); model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap); - CarbonFactHandler dataHandler = CarbonFactHandlerFactory + this.carbonFactHandler = CarbonFactHandlerFactory .createCarbonFactHandler(model); - dataHandler.initialise(); - processBatch(next, dataHandler); - finish(tableName, dataHandler); + carbonFactHandler.initialise(); + processBatch(next, carbonFactHandler); + finish(tableName, carbonFactHandler); + this.carbonFactHandler = null; } } i++; @@ -152,4 +156,14 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS batch.close(); rowCounter.getAndAdd(batchSize); } + + @Override public void close() { + if (!closed) { + super.close(); + if (null != this.carbonFactHandler) { + carbonFactHandler.finish(); + carbonFactHandler.closeHandler(); + } + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index caf121f..cc038b9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,7 +63,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { private DataMapWriterListener listener; - private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap; + private final Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap; + + private ExecutorService rangeExecutorService; + + private List<CarbonFactHandler> carbonFactHandlers; public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep child) { @@ -80,6 +85,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { @Override public void initialize() throws IOException { super.initialize(); child.initialize(); + this.carbonFactHandlers = new CopyOnWriteArrayList<>(); } private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { @@ -111,7 +117,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); - ExecutorService rangeExecutorService = Executors.newFixedThreadPool(iterators.length, + rangeExecutorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("WriterForwardPool: " + tableName)); List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length); int i = 0; @@ -183,6 +189,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { rowsNotExist = false; dataHandler = CarbonFactHandlerFactory .createCarbonFactHandler(model); + carbonFactHandlers.add(dataHandler); dataHandler.initialise(); } processBatch(insideRangeIterator.next(), dataHandler); @@ -190,6 +197,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { if (!rowsNotExist) { finish(dataHandler); } + carbonFactHandlers.remove(dataHandler); } public void finish(CarbonFactHandler dataHandler) { @@ -267,6 +275,15 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { // ignoring the exception } } + if (null != rangeExecutorService) { + rangeExecutorService.shutdownNow(); + } + if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) { + for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) { + carbonFactHandler.finish(); + carbonFactHandler.closeHandler(); + } + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java index c9f5fcc..50c0215 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java @@ -88,7 +88,9 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { @Override public void close() { if (!closed) { super.close(); - executorService.shutdown(); + if (null != executorService) { + executorService.shutdownNow(); + } for (CarbonIterator inputIterator : inputIterators) { inputIterator.close(); }
