Repository: incubator-carbondata Updated Branches: refs/heads/master b1347e913 -> 94cc9601e
Added validations in Unsafe dataload. Fixed Style Fixed Carbon Example Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/013db609 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/013db609 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/013db609 Branch: refs/heads/master Commit: 013db609a51c4ca92aa24952ee92fccb2f365c22 Parents: b1347e9 Author: ravipesala <ravi.pes...@gmail.com> Authored: Tue Apr 18 14:43:41 2017 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Wed Apr 19 09:29:51 2017 +0800 ---------------------------------------------------------------------- .../carbondata/core/util/CarbonProperties.java | 32 ++++++++++++++++++++ .../impl/DictionaryDecodeReadSupport.java | 2 +- .../sort/unsafe/UnsafeMemoryManager.java | 30 +++++++++++++++--- .../newflow/sort/unsafe/UnsafeSortDataRows.java | 21 +++++++++---- .../sortandgroupby/sortdata/SortDataRows.java | 1 + 5 files changed, 75 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 8e0e5af..92c85a1 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -604,6 +604,38 @@ public final class CarbonProperties { } /** + * Get the sort chunk memory size + * @return + */ + public int getSortMemoryChunkSizeInMB() { + int inMemoryChunkSizeInMB; + try { + inMemoryChunkSizeInMB = Integer.parseInt(CarbonProperties.getInstance() + 
.getProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB, + CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT)); + } catch (Exception e) { + inMemoryChunkSizeInMB = + Integer.parseInt(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT); + LOGGER.error("Problem in parsing the sort memory chunk size, setting with default value" + + inMemoryChunkSizeInMB); + } + if (inMemoryChunkSizeInMB > 1024) { + inMemoryChunkSizeInMB = 1024; + LOGGER.error( + "It is not recommended to increase the sort memory chunk size more than 1024MB, " + + "so setting the value to " + + inMemoryChunkSizeInMB); + } else if (inMemoryChunkSizeInMB < 1) { + inMemoryChunkSizeInMB = 1; + LOGGER.error( + "It is not recommended to decrease the sort memory chunk size less than 1MB, " + + "so setting the value to " + + inMemoryChunkSizeInMB); + } + return inMemoryChunkSizeInMB; + } + + /** * Batch size of rows while sending data from one step to another in data loading. * * @return http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java index 43953d0..79a1559 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java @@ -57,7 +57,7 @@ public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> { dataTypes = new DataType[carbonColumns.length]; for (int i = 0; i < carbonColumns.length; i++) { if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i] - .hasEncoding(Encoding.DIRECT_DICTIONARY)) 
{ + .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) { CacheProvider cacheProvider = CacheProvider.getInstance(); Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath()); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java index 6a3e165..af49978 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java @@ -33,9 +33,22 @@ public class UnsafeMemoryManager { LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName()); static { - long size = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, - CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)); + long size; + try { + size = Long.parseLong(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)); + } catch (Exception e) { + size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); + LOGGER.info("Wrong memory size given, " + + "so setting default value to " + size); + } + if (size < 1024) { + size = 1024; + LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, " + + "so setting default value to " + size); + } + boolean offHeap = 
Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, @@ -67,7 +80,16 @@ public class UnsafeMemoryManager { private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) { this.totalMemory = totalMemory; this.allocator = allocator; - minimumMemory = (long) (totalMemory * ((double) 10 / 100)); + long numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); + sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024; + long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores; + if (totalWorkingMemoryForAllThreads >= totalMemory) { + throw new RuntimeException("Working memory should be less than total memory configured, " + + "so either reduce the loading threads or increase the memory size. " + + "(Number of threads * number of threads) should be less than total unsafe memory"); + } + minimumMemory = totalWorkingMemoryForAllThreads; LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator + " and minimum reserve memory " + minimumMemory); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index 9907509..cb24968 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.Callable; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; @@ -69,7 +70,7 @@ public class UnsafeSortDataRows { private final Object addRowsLock = new Object(); - private int inMemoryChunkSizeInMB; + private long inMemoryChunkSize; private boolean enableInMemoryIntermediateMerge; @@ -77,6 +78,11 @@ public class UnsafeSortDataRows { private long maxSizeAllowed; + /** + * semaphore which will be used for managing sorted data object arrays + */ + private Semaphore semaphore; + public UnsafeSortDataRows(SortParameters parameters, UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger) { this.parameters = parameters; @@ -86,9 +92,8 @@ public class UnsafeSortDataRows { // observer of writing file in thread this.threadStatusObserver = new ThreadStatusObserver(); - this.inMemoryChunkSizeInMB = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB, - CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT)); + this.inMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); + this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024; enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT, CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT)); @@ -106,7 +111,7 @@ public class UnsafeSortDataRows { * This method will be used to initialize */ public void initialize() throws CarbonSortKeyAndGroupByException { - MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024); + MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSize); this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), parameters.getDimColCount() + parameters.getComplexDimColCount(), parameters.getMeasureColCount(), 
parameters.getAggType(), baseBlock, @@ -120,6 +125,7 @@ public class UnsafeSortDataRows { } this.dataSorterAndWriterExecutorService = Executors.newFixedThreadPool(parameters.getNumberOfCores()); + semaphore = new Semaphore(parameters.getNumberOfCores()); } public static MemoryBlock getMemoryBlock(long size) throws CarbonSortKeyAndGroupByException { @@ -167,8 +173,9 @@ public class UnsafeSortDataRows { unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible(); } unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); + semaphore.acquire(); dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); - MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024); + MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize); boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable(); rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), parameters.getDimColCount() + parameters.getComplexDimColCount(), @@ -317,6 +324,8 @@ public class UnsafeSortDataRows { } } catch (Throwable e) { threadStatusObserver.notifyFailed(e); + } finally { + semaphore.release(); } return null; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/013db609/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index 3a7a579..ffaa566 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -158,6 +158,7 @@ public class SortDataRows { System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, 
sizeLeft); } try { + semaphore.acquire(); dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal)); } catch (Exception e) { LOGGER.error(