Repository: carbondata
Updated Branches:
  refs/heads/master bb09baca5 -> 4a37e05ca


[CARBONDATA-2751] Fixed Thread leak issue in data loading and Compatibility 
issue

### Problem:

Threads leak when the user kills the data loading process from the UI.

A NullPointerException (NPE) is thrown when the user queries an old store.

### Solution

When a carbondata file is being written during data loading and the user 
kills the load from the UI, the producer and consumer threads are not shut 
down. This is now handled in the close method.

An old store (V1/V2) does not have a datachunk3 object. While filling the 
local dictionary, the code checks whether a local dictionary is present in 
datachunk3, but the null check on datachunk3 itself was missing; it is now added.

This closes #2521


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a37e05c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a37e05c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a37e05c

Branch: refs/heads/master
Commit: 4a37e05caeda04e96ede8b7f7ec4c58891e8f172
Parents: bb09bac
Author: kumarvishal09 <[email protected]>
Authored: Wed Jul 18 13:06:36 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Jul 19 23:02:41 2018 +0530

----------------------------------------------------------------------
 .../chunk/impl/DimensionRawColumnChunk.java     |  3 ++-
 .../datastore/page/VarLengthColumnPageBase.java |  2 ++
 .../steps/DataWriterBatchProcessorStepImpl.java | 22 ++++++++++++++++----
 .../steps/DataWriterProcessorStepImpl.java      | 21 +++++++++++++++++--
 .../loading/steps/InputProcessorStepImpl.java   |  4 +++-
 5 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 0c0b6b7..d645e08 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -141,7 +141,8 @@ public class DimensionRawColumnChunk extends 
AbstractRawColumnChunk {
   }
 
   public CarbonDictionary getLocalDictionary() {
-    if (null != getDataChunkV3().local_dictionary && null == localDictionary) {
+    if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary
+        && null == localDictionary) {
       try {
         localDictionary = getDictionary(getDataChunkV3().local_dictionary);
       } catch (IOException | MemoryException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 55877c3..4edd201 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -172,6 +172,7 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
 
     // set total length and rowOffset in page
     page.totalLength = offset;
+    page.rowOffset.freeMemory();
     page.rowOffset = rowOffset;
     for (int i = 0; i < rowId; i++) {
       page.putBytes(i, lvEncodedBytes, i * size, size);
@@ -253,6 +254,7 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
 
     // set total length and rowOffset in page
     page.totalLength = offset;
+    page.rowOffset.freeMemory();
     page.rowOffset = rowOffset;
 
     // set data in page

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 71a624e..5663811 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -50,6 +50,9 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS
       
LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
 
   private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
+
+  private CarbonFactHandler carbonFactHandler;
+
   public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration 
configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
@@ -91,11 +94,12 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS
             CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
                 .createCarbonFactDataHandlerModel(configuration, 
storeLocation, i, k++, listener);
             model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
-            CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+            this.carbonFactHandler = CarbonFactHandlerFactory
                 .createCarbonFactHandler(model);
-            dataHandler.initialise();
-            processBatch(next, dataHandler);
-            finish(tableName, dataHandler);
+            carbonFactHandler.initialise();
+            processBatch(next, carbonFactHandler);
+            finish(tableName, carbonFactHandler);
+            this.carbonFactHandler = null;
           }
         }
         i++;
@@ -152,4 +156,14 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS
     batch.close();
     rowCounter.getAndAdd(batchSize);
   }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      if (null != this.carbonFactHandler) {
+        carbonFactHandler.finish();
+        carbonFactHandler.closeHandler();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index caf121f..cc038b9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,7 +63,11 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
   private DataMapWriterListener listener;
 
-  private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
+  private final Map<String, LocalDictionaryGenerator> 
localDictionaryGeneratorMap;
+
+  private ExecutorService rangeExecutorService;
+
+  private List<CarbonFactHandler> carbonFactHandlers;
 
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
@@ -80,6 +85,7 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
   @Override public void initialize() throws IOException {
     super.initialize();
     child.initialize();
+    this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
   private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
@@ -111,7 +117,7 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
-      ExecutorService rangeExecutorService = 
Executors.newFixedThreadPool(iterators.length,
+      rangeExecutorService = Executors.newFixedThreadPool(iterators.length,
           new CarbonThreadFactory("WriterForwardPool: " + tableName));
       List<Future<Void>> rangeExecutorServiceSubmitList = new 
ArrayList<>(iterators.length);
       int i = 0;
@@ -183,6 +189,7 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
         rowsNotExist = false;
         dataHandler = CarbonFactHandlerFactory
             .createCarbonFactHandler(model);
+        carbonFactHandlers.add(dataHandler);
         dataHandler.initialise();
       }
       processBatch(insideRangeIterator.next(), dataHandler);
@@ -190,6 +197,7 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
     if (!rowsNotExist) {
       finish(dataHandler);
     }
+    carbonFactHandlers.remove(dataHandler);
   }
 
   public void finish(CarbonFactHandler dataHandler) {
@@ -267,6 +275,15 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
           // ignoring the exception
         }
       }
+      if (null != rangeExecutorService) {
+        rangeExecutorService.shutdownNow();
+      }
+      if (null != this.carbonFactHandlers && 
!this.carbonFactHandlers.isEmpty()) {
+        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
+          carbonFactHandler.finish();
+          carbonFactHandler.closeHandler();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a37e05c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index c9f5fcc..50c0215 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -88,7 +88,9 @@ public class InputProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
   @Override public void close() {
     if (!closed) {
       super.close();
-      executorService.shutdown();
+      if (null != executorService) {
+        executorService.shutdownNow();
+      }
       for (CarbonIterator inputIterator : inputIterators) {
         inputIterator.close();
       }

Reply via email to