Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b1347e913 -> 94cc9601e


Added validations in Unsafe dataload.

Fixed Style

Fixed Carbon Example


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/013db609
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/013db609
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/013db609

Branch: refs/heads/master
Commit: 013db609a51c4ca92aa24952ee92fccb2f365c22
Parents: b1347e9
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Tue Apr 18 14:43:41 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Wed Apr 19 09:29:51 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/util/CarbonProperties.java  | 32 ++++++++++++++++++++
 .../impl/DictionaryDecodeReadSupport.java       |  2 +-
 .../sort/unsafe/UnsafeMemoryManager.java        | 30 +++++++++++++++---
 .../newflow/sort/unsafe/UnsafeSortDataRows.java | 21 +++++++++----
 .../sortandgroupby/sortdata/SortDataRows.java   |  1 +
 5 files changed, 75 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 8e0e5af..92c85a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -604,6 +604,38 @@ public final class CarbonProperties {
   }
 
   /**
+   * Returns the configured off-heap sort memory chunk size in MB, read from
+   * {@link CarbonCommonConstants#OFFHEAP_SORT_CHUNK_SIZE_IN_MB}. An unparseable value falls
+   * back to the default, and the result is clamped to the range [1, 1024].
+   * @return sort memory chunk size in MB, between 1 and 1024 inclusive
+   */
+  public int getSortMemoryChunkSizeInMB() {
+    int inMemoryChunkSizeInMB;
+    try {
+      inMemoryChunkSizeInMB = Integer.parseInt(getProperty(
+          CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB,
+          CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT));
+    } catch (NumberFormatException e) {
+      inMemoryChunkSizeInMB =
+          Integer.parseInt(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT);
+      LOGGER.error("Problem in parsing the sort memory chunk size, setting with default value: "
+          + inMemoryChunkSizeInMB);
+    }
+    if (inMemoryChunkSizeInMB > 1024) {
+      inMemoryChunkSizeInMB = 1024;
+      LOGGER.error(
+          "It is not recommended to increase the sort memory chunk size more than 1024MB, "
+              + "so setting the value to " + inMemoryChunkSizeInMB);
+    } else if (inMemoryChunkSizeInMB < 1) {
+      inMemoryChunkSizeInMB = 1;
+      LOGGER.error(
+          "It is not recommended to decrease the sort memory chunk size less than 1MB, "
+              + "so setting the value to " + inMemoryChunkSizeInMB);
+    }
+    return inMemoryChunkSizeInMB;
+  }
+
+  /**
    * Batch size of rows while sending data from one step to another in data 
loading.
    *
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
index 43953d0..79a1559 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
@@ -57,7 +57,7 @@ public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
     dataTypes = new DataType[carbonColumns.length];
     for (int i = 0; i < carbonColumns.length; i++) {
       if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && 
!carbonColumns[i]
-          .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          .hasEncoding(Encoding.DIRECT_DICTIONARY) && 
!carbonColumns[i].isComplex()) {
         CacheProvider cacheProvider = CacheProvider.getInstance();
         Cache<DictionaryColumnUniqueIdentifier, Dictionary> 
forwardDictionaryCache = cacheProvider
             .createCache(CacheType.FORWARD_DICTIONARY, 
absoluteTableIdentifier.getStorePath());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
index 6a3e165..af49978 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
@@ -33,9 +33,22 @@ public class UnsafeMemoryManager {
       LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
 
   static {
-    long size = Long.parseLong(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
-            CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+    long size;
+    try {
+      size = Long.parseLong(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+    } catch (Exception e) {
+      size = 
Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
+      LOGGER.info("Wrong memory size given, "
+          + "so setting default value to " + size);
+    }
+    if (size < 1024) {
+      size = 1024;
+      LOGGER.info("It is not recommended to keep unsafe memory size less than 
1024MB, "
+          + "so setting default value to " + size);
+    }
+
 
     boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -67,7 +80,16 @@ public class UnsafeMemoryManager {
   private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
     this.totalMemory = totalMemory;
     this.allocator = allocator;
-    minimumMemory = (long) (totalMemory * ((double) 10 / 100));
+    // Reserve one sort chunk per loading thread. NOTE(review): assumes totalMemory is in bytes.
+    long totalWorkingMemoryForAllThreads = CarbonProperties.getInstance().getNumberOfCores()
+        * (CarbonProperties.getInstance().getSortMemoryChunkSizeInMB() * 1024L * 1024L);
+    if (totalWorkingMemoryForAllThreads >= totalMemory) {
+      throw new IllegalStateException("Working memory (" + totalWorkingMemoryForAllThreads
+          + ") should be less than total unsafe memory (" + totalMemory + "), so either reduce "
+          + "the loading threads or increase the memory size. (Number of threads * sort memory "
+          + "chunk size) should be less than total unsafe memory");
+    }
+    minimumMemory = totalWorkingMemoryForAllThreads;
     LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
         + " and minimum reserve memory " + minimumMemory);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 9907509..cb24968 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -69,7 +70,7 @@ public class UnsafeSortDataRows {
 
   private final Object addRowsLock = new Object();
 
-  private int inMemoryChunkSizeInMB;
+  private long inMemoryChunkSize;
 
   private boolean enableInMemoryIntermediateMerge;
 
@@ -77,6 +78,11 @@ public class UnsafeSortDataRows {
 
   private long maxSizeAllowed;
 
+  /**
+   * semaphore which will used for managing sorted data object arrays
+   */
+  private Semaphore semaphore;
+
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger) {
     this.parameters = parameters;
@@ -86,9 +92,8 @@ public class UnsafeSortDataRows {
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
 
-    this.inMemoryChunkSizeInMB = 
Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB,
-            CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT));
+    this.inMemoryChunkSize = 
CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024;
     enableInMemoryIntermediateMerge = 
Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
             CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
@@ -106,7 +111,7 @@ public class UnsafeSortDataRows {
    * This method will be used to initialize
    */
   public void initialize() throws CarbonSortKeyAndGroupByException {
-    MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 
1024);
+    MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSize);
     this.rowPage = new 
UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
         parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
@@ -120,6 +125,7 @@ public class UnsafeSortDataRows {
     }
     this.dataSorterAndWriterExecutorService =
         Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
   public static MemoryBlock getMemoryBlock(long size) throws 
CarbonSortKeyAndGroupByException {
@@ -167,8 +173,9 @@ public class UnsafeSortDataRows {
               
unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
             }
             unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+            semaphore.acquire();
             dataSorterAndWriterExecutorService.submit(new 
DataSorterAndWriter(rowPage));
-            MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 
1024 * 1024);
+            MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
             boolean saveToDisk = 
!UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
             rowPage = new 
UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
                 parameters.getDimColCount() + 
parameters.getComplexDimColCount(),
@@ -317,6 +324,8 @@ public class UnsafeSortDataRows {
         }
       } catch (Throwable e) {
         threadStatusObserver.notifyFailed(e);
+      } finally {
+        semaphore.release();
       }
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 3a7a579..ffaa566 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -158,6 +158,7 @@ public class SortDataRows {
           System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, 
sizeLeft);
         }
         try {
+          semaphore.acquire();
           dataSorterAndWriterExecutorService.submit(new 
DataSorterAndWriter(recordHolderListLocal));
         } catch (Exception e) {
           LOGGER.error(

Reply via email to