Repository: carbondata Updated Branches: refs/heads/master 69c634c5f -> df22368d9
[CARBONDATA-1318] Fixed concurrent table data loading unsafe memory issue. Fixed task cancellation leak issue. Fixed task cleanup issue in data loading. Fixed concurrent table data loading unsafe memory issue. @CarbonDataQA This closes #1185 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/df22368d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/df22368d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/df22368d Branch: refs/heads/master Commit: df22368d98f0390cc7f9c1289a81257adf4509a6 Parents: 69c634c Author: kumarvishal <[email protected]> Authored: Wed Jul 19 13:42:52 2017 +0530 Committer: Ravindra Pesala <[email protected]> Committed: Thu Jul 20 18:12:52 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 +- .../page/UnsafeFixLengthColumnPage.java | 14 +- .../page/UnsafeVarLengthColumnPage.java | 13 +- .../core/memory/IntPointerBuffer.java | 42 ++-- .../core/memory/UnsafeMemoryManager.java | 99 +++++--- .../core/memory/UnsafeSortMemoryManager.java | 249 +++++++++++++++++++ .../executor/impl/AbstractQueryExecutor.java | 3 + .../carbondata/core/util/CarbonTaskInfo.java | 40 +++ .../core/util/ThreadLocalTaskInfo.java | 33 +++ .../apache/carbondata/spark/rdd/CarbonRDD.scala | 5 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 24 +- .../carbondata/spark/util/CommonUtil.scala | 23 +- .../processing/csvload/CSVInputFormat.java | 3 + .../processing/newflow/DataLoadExecutor.java | 19 +- .../sort/unsafe/UnsafeCarbonRowPage.java | 32 ++- .../newflow/sort/unsafe/UnsafeSortDataRows.java | 54 +++- 16 files changed, 554 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index ccb6344..dfc2153 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1184,9 +1184,11 @@ public final class CarbonCommonConstants { public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT = "64"; @CarbonProperty public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB = "sort.inmemory.size.inmb"; - public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024"; + @CarbonProperty + public static final String UNSAFE_WORKING_MEMORY_IN_MB = "carbon.unsafe.working.memory.in.mb"; + public static final String UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT = "512"; /** * Sorts the data in batches and writes the batch data to store with index file. 
*/ http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index e76c2c4..5dcc685 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE; @@ -42,6 +43,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { // base offset of memoryBlock private long baseOffset; + private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); + private static final int byteBits = BYTE.getSizeBits(); private static final int shortBits = DataType.SHORT.getSizeBits(); private static final int intBits = DataType.INT.getSizeBits(); @@ -59,13 +62,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { case FLOAT: case DOUBLE: int size = pageSize << dataType.getSizeBits(); - memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size); + memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); break; case SHORT_INT: size = pageSize * 3; - memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size); + memoryBlock = 
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); break; @@ -302,7 +305,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { public void freeMemory() { if (memoryBlock != null) { - UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); memoryBlock = null; baseAddress = null; baseOffset = 0; @@ -360,13 +363,14 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { // use raw compression and copy to byte[] int inputSize = pageSize * dataType.getSizeInBytes(); int compressedMaxSize = compressor.maxCompressedLength(inputSize); - MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(compressedMaxSize); + MemoryBlock compressed = + UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize); long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset()); assert outSize < Integer.MAX_VALUE; byte[] output = new byte[(int) outSize]; CarbonUnsafe.unsafe.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output, CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize); - UnsafeMemoryManager.INSTANCE.freeMemory(compressed); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, compressed); return output; } else { return super.compress(compressor); http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java index dd6abc5..0cd64db 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; // This extension uses unsafe memory to store page data, for variable length data type (string, // decimal) @@ -47,6 +48,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { private static final double FACTOR = 1.25; + private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); + /** * create a page * @param dataType data type @@ -55,7 +58,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException { super(dataType, pageSize); capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR); - memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity)); + memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity)); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); } @@ -69,7 +72,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException { super(dataType, pageSize); this.capacity = capacity; - memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity)); + memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity)); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); } @@ -77,7 +80,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { @Override public void freeMemory() { if (memoryBlock != null) { - 
UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); memoryBlock = null; baseAddress = null; baseOffset = 0; @@ -90,10 +93,10 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { private void ensureMemory(int requestSize) throws MemoryException { if (totalLength + requestSize > capacity) { int newSize = 2 * capacity; - MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(newSize); + MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize); CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset, newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity); - UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); memoryBlock = newBlock; baseAddress = newBlock.getBaseObject(); baseOffset = newBlock.getBaseOffset(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java index 0d604fd..dadb1e4 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java @@ -17,26 +17,32 @@ package org.apache.carbondata.core.memory; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; + /** * Holds the pointers for rows. 
*/ public class IntPointerBuffer { + private static final LogService LOGGER = + LogServiceFactory.getLogService(IntPointerBuffer.class.getName()); + private int length; private int actualSize; private int[] pointerBlock; - private MemoryBlock baseBlock; - private MemoryBlock pointerMemoryBlock; - public IntPointerBuffer(MemoryBlock baseBlock) { + private long taskId; + + public IntPointerBuffer(long taskId) { // TODO can be configurable, it is initial size and it can grow automatically. this.length = 100000; pointerBlock = new int[length]; - this.baseBlock = baseBlock; + this.taskId = taskId; } public IntPointerBuffer(int length) { @@ -67,24 +73,25 @@ public class IntPointerBuffer { return pointerBlock[rowId]; } - public void loadToUnsafe() throws MemoryException { - pointerMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(pointerBlock.length * 4); - for (int i = 0; i < pointerBlock.length; i++) { - CarbonUnsafe.unsafe - .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4, - pointerBlock[i]); + public void loadToUnsafe() { + try { + pointerMemoryBlock = + UnsafeSortMemoryManager.allocateMemoryWithRetry(this.taskId, pointerBlock.length * 4); + for (int i = 0; i < pointerBlock.length; i++) { + CarbonUnsafe.unsafe + .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4, + pointerBlock[i]); + } + pointerBlock = null; + } catch (MemoryException e) { + LOGGER.warn("Not enough memory for allocating pointer buffer, sorting in heap"); } - pointerBlock = null; } public int getActualSize() { return actualSize; } - public MemoryBlock getBaseBlock() { - return baseBlock; - } - public int[] getPointerBlock() { return pointerBlock; } @@ -103,10 +110,7 @@ public class IntPointerBuffer { public void freeMemory() { pointerBlock = null; if (pointerMemoryBlock != null) { - UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock); - } - if (baseBlock != null) { - 
UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock); + UnsafeSortMemoryManager.INSTANCE.freeMemory(this.taskId, pointerMemoryBlock); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 28e63a9..991bc90 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -17,7 +17,10 @@ package org.apache.carbondata.core.memory; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import org.apache.carbondata.common.logging.LogService; @@ -36,23 +39,23 @@ public class UnsafeMemoryManager { private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)); + private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap; static { long size; try { size = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, - CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)); + .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)); } catch (Exception e) { size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); LOGGER.info("Wrong memory size given, " + "so setting default value to " + size); } - if (size < 1024) { - size = 1024; - LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, " + if (size < 512) { + size = 512; + 
LOGGER.info("It is not recommended to keep unsafe memory size less than 512MB, " + "so setting default value to " + size); } - long takenSize = size * 1024 * 1024; MemoryAllocator allocator; if (offHeap) { @@ -65,6 +68,7 @@ public class UnsafeMemoryManager { allocator = MemoryAllocator.HEAP; } INSTANCE = new UnsafeMemoryManager(takenSize, allocator); + taskIdToMemoryBlockMap = new HashMap<>(); } public static final UnsafeMemoryManager INSTANCE; @@ -75,76 +79,91 @@ public class UnsafeMemoryManager { private MemoryAllocator allocator; - private long minimumMemory; - - // for debug purpose - private Set<MemoryBlock> set = new HashSet<>(); - private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) { this.totalMemory = totalMemory; this.allocator = allocator; - long numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); - long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); - sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024; - long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores; - if (totalWorkingMemoryForAllThreads >= totalMemory) { - throw new RuntimeException("Working memory should be less than total memory configured, " - + "so either reduce the loading threads or increase the memory size. 
" - + "(Number of threads * number of threads) should be less than total unsafe memory"); - } - minimumMemory = totalWorkingMemoryForAllThreads; - LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator - + " and minimum reserve memory " + minimumMemory); + LOGGER + .info("Working Memory manager is created with size " + totalMemory + " with " + allocator); } - private synchronized MemoryBlock allocateMemory(long memoryRequested) { + private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) { if (memoryUsed + memoryRequested <= totalMemory) { MemoryBlock allocate = allocator.allocate(memoryRequested); memoryUsed += allocate.size(); if (LOGGER.isDebugEnabled()) { - set.add(allocate); - LOGGER.error("Memory block (" + allocate + ") is created with size " + allocate.size() + - ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes"); + LOGGER.debug( + "Working Memory block (" + allocate + ") is created with size " + allocate.size() + + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) + + "Bytes"); + } + Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); + if (null == listOfMemoryBlock) { + listOfMemoryBlock = new HashSet<>(); + taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); } + listOfMemoryBlock.add(allocate); return allocate; } return null; } - public synchronized void freeMemory(MemoryBlock memoryBlock) { + public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) { + taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); allocator.free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; if (LOGGER.isDebugEnabled()) { - set.remove(memoryBlock); - LOGGER.error("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed + - "Bytes, left " + getAvailableMemory() + "Bytes. 
Total allocated block: " + set.size()); + LOGGER.debug( + "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + ( + totalMemory - memoryUsed)); } } - private synchronized long getAvailableMemory() { - return totalMemory - memoryUsed; + public void freeMemoryAll(long taskId) { + Set<MemoryBlock> memoryBlockSet = null; + synchronized (INSTANCE) { + memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId); + } + long occuppiedMemory = 0; + if (null != memoryBlockSet) { + Iterator<MemoryBlock> iterator = memoryBlockSet.iterator(); + MemoryBlock memoryBlock = null; + while (iterator.hasNext()) { + memoryBlock = iterator.next(); + occuppiedMemory += memoryBlock.size(); + allocator.free(memoryBlock); + } + } + synchronized (INSTANCE) { + memoryUsed -= occuppiedMemory; + memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + ( + totalMemory - memoryUsed)); + } } - public boolean isMemoryAvailable() { - return getAvailableMemory() > minimumMemory; + public synchronized boolean isMemoryAvailable() { + return memoryUsed > totalMemory; } public long getUsableMemory() { - return totalMemory - minimumMemory; + return totalMemory; } /** * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully. 
*/ - public static MemoryBlock allocateMemoryWithRetry(long size) throws MemoryException { + public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException { MemoryBlock baseBlock = null; int tries = 0; - while (tries < 100) { - baseBlock = INSTANCE.allocateMemory(size); + while (tries < 300) { + baseBlock = INSTANCE.allocateMemory(taskId, size); if (baseBlock == null) { try { - Thread.sleep(50); + Thread.sleep(500); } catch (InterruptedException e) { throw new MemoryException(e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java new file mode 100644 index 0000000..d975cd4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.memory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; + +/** + * Memory manager to keep track of + * all memory for storing the sorted data + */ +public class UnsafeSortMemoryManager { + + /** + * logger + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName()); + + /** + * offheap is enabled + */ + private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, + CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)); + + /** + * map to keep taskid to memory blocks + */ + private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap; + + /** + * singleton instance + */ + public static final UnsafeSortMemoryManager INSTANCE; + + /** + * total memory available for sort data storage + */ + private long totalMemory; + + /** + * current memory used + */ + private long memoryUsed; + + /** + * current memory allocator + */ + private MemoryAllocator allocator; + + static { + long size; + try { + size = Long.parseLong(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)); + } catch (Exception e) { + size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); + LOGGER.info("Wrong memory size given, " + "so setting default value to " + size); + } + if (size < 1024) { + size = 1024; + LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, " + + "so setting default value to " + size); + } + + long takenSize = 
size * 1024 * 1024; + MemoryAllocator allocator; + if (offHeap) { + allocator = MemoryAllocator.UNSAFE; + } else { + long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100; + if (takenSize > maxMemory) { + takenSize = maxMemory; + } + allocator = MemoryAllocator.HEAP; + } + INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator); + taskIdToMemoryBlockMap = new HashMap<>(); + } + + private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) { + this.totalMemory = totalMemory; + this.allocator = allocator; + LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator); + } + + /** + * Below method will be used to check whether memory required is + * available or not + * + * @param required + * @return if memory available + */ + public synchronized boolean isMemoryAvailable(long required) { + return memoryUsed + required < totalMemory; + } + + /** + * Below method will be used to allocate dummy memory + * this will be used to allocate first and then used when u need + * + * @param size + */ + public synchronized void allocateDummyMemory(long size) { + memoryUsed += size; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Working Memory block (" + size + ") is created with size " + size + + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) + + "Bytes"); + } + } + + public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) { + allocator.free(memoryBlock); + taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); + memoryUsed -= memoryBlock.size(); + memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + ( + totalMemory - memoryUsed)); + } + } + + /** + * Below method will be used to free all the + * memory occupied for a task, this will be useful + * when in case of task failure we need to clear all the memory occupied + * @param taskId + */ + public void freeMemoryAll(long taskId) { + Set<MemoryBlock> memoryBlockSet = null; + synchronized (INSTANCE) { + memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId); + } + long occuppiedMemory = 0; + if (null != memoryBlockSet) { + Iterator<MemoryBlock> iterator = memoryBlockSet.iterator(); + MemoryBlock memoryBlock = null; + while (iterator.hasNext()) { + memoryBlock = iterator.next(); + occuppiedMemory += memoryBlock.size(); + allocator.free(memoryBlock); + } + } + synchronized (INSTANCE) { + memoryUsed -= occuppiedMemory; + memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + ( + totalMemory - memoryUsed)); + } + } + + /** + * Before calling this method caller should call allocateMemoryDummy + * This method will be used to allocate the memory, this can be used + * when caller wants to allocate memory first and used it anytime + * @param taskId + * @param memoryRequested + * @return memory block + */ + public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) { + MemoryBlock allocate = allocator.allocate(memoryRequested); + Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); + if (null == listOfMemoryBlock) { + listOfMemoryBlock = new HashSet<>(); + taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); + } + listOfMemoryBlock.add(allocate); + return allocate; + } + + /** + * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully. 
+ */ + public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException { + MemoryBlock baseBlock = null; + int tries = 0; + while (tries < 100) { + baseBlock = INSTANCE.allocateMemory(taskId, size); + if (baseBlock == null) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + throw new MemoryException(e); + } + } else { + break; + } + tries++; + } + if (baseBlock == null) { + throw new MemoryException("Not enough memory"); + } + return baseBlock; + } + + private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) { + if (memoryUsed + memoryRequested <= totalMemory) { + MemoryBlock allocate = allocator.allocate(memoryRequested); + memoryUsed += allocate.size(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Working Memory block (" + allocate.size() + ") is created with size " + allocate.size() + + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) + + "Bytes"); + } + Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); + if (null == listOfMemoryBlock) { + listOfMemoryBlock = new HashSet<>(); + taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); + } + listOfMemoryBlock.add(allocate); + return allocate; + } + return null; + } + + public static boolean isOffHeap() { + return offHeap; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index ff54673..faa4564 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -43,6 
+43,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -62,6 +63,7 @@ import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.commons.lang3.ArrayUtils; @@ -519,6 +521,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { if (null != queryIterator) { queryIterator.close(); } + UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); if (null != queryProperties.executorService) { queryProperties.executorService.shutdown(); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java new file mode 100644 index 0000000..d3e4d7a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.util; + +import java.io.Serializable; + +/** + * Value object to keep track of all the thread local variable + */ +public class CarbonTaskInfo implements Serializable { + + /** + * serial version id + */ + private static final long serialVersionUID = 1L; + + public long taskId; + + public long getTaskId() { + return taskId; + } + + public void setTaskId(long taskId) { + this.taskId = taskId; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java new file mode 100644 index 0000000..8c871b8 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.util; + +/** + * Class to keep all the thread local variable for task + */ +public class ThreadLocalTaskInfo { + static final InheritableThreadLocal<CarbonTaskInfo> threadLocal = + new InheritableThreadLocal<CarbonTaskInfo>(); + + public static void setCarbonTaskInfo(CarbonTaskInfo carbonTaskInfo) { + threadLocal.set(carbonTaskInfo); + } + + public static CarbonTaskInfo getCarbonTaskInfo() { + return threadLocal.get(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala index 106a9fd..48e97ed 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo} +import 
org.apache.carbondata.core.util.{CarbonSessionInfo, CarbonTaskInfo, SessionParams, ThreadLocalSessionInfo, ThreadLocalTaskInfo} /** * This RDD maintains session level ThreadLocal @@ -41,6 +41,9 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, final def compute(split: Partition, context: TaskContext): Iterator[T] = { ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) + val carbonTaskInfo = new CarbonTaskInfo + carbonTaskInfo.setTaskId(System.nanoTime) + ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo) internalCompute(split, context) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index d325f71..ac1c723 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -42,7 +42,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.common.logging.impl.StandardLogService import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.LoadMetadataDetails -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} import org.apache.carbondata.processing.csvload.BlockDetails import org.apache.carbondata.processing.csvload.CSVInputFormat import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator @@ -240,7 +240,11 @@ class NewCarbonDataLoadRDD[K, V]( 
loadMetadataDetails) // Intialize to set carbon properties loader.initialize() - new DataLoadExecutor().execute(model, + val executor = new DataLoadExecutor() + // in case of success, failure or cancelation clear memory and stop execution + context.addTaskCompletionListener { context => executor.close() + CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)} + executor.execute(model, loader.storeLocation, recordReaders) } catch { @@ -327,7 +331,6 @@ class NewCarbonDataLoadRDD[K, V]( } } } - /** * generate blocks id * @@ -423,7 +426,6 @@ class NewDataFrameLoaderRDD[K, V]( recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition, carbonLoadModel, context) } - val loader = new SparkPartitionLoader(model, theSplit.index, null, @@ -431,7 +433,11 @@ class NewDataFrameLoaderRDD[K, V]( loadMetadataDetails) // Intialize to set carbon properties loader.initialize() - new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray) + val executor = new DataLoadExecutor + // in case of success, failure or cancelation clear memory and stop execution + context.addTaskCompletionListener { context => executor.close() + CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)} + executor.execute(model, loader.storeLocation, recordReaders.toArray) } catch { case e: BadRecordFoundException => loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) @@ -606,7 +612,6 @@ class PartitionTableDataLoaderRDD[K, V]( carbonLoadModel.setSegmentId(String.valueOf(loadCount)) carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index))) carbonLoadModel.setPreFetch(false) - val recordReaders = Array[CarbonIterator[Array[AnyRef]]] { new NewRddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel, context) } @@ -618,7 +623,11 @@ class PartitionTableDataLoaderRDD[K, V]( loadMetadataDetails) // Intialize to set carbon properties 
loader.initialize() - new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders) + val executor = new DataLoadExecutor + // in case of success, failure or cancelation clear memory and stop execution + context.addTaskCompletionListener { context => executor.close() + CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)} + executor.execute(model, loader.storeLocation, recordReaders) } catch { case e: BadRecordFoundException => loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) @@ -642,7 +651,6 @@ class PartitionTableDataLoaderRDD[K, V]( } } var finished = false - override def hasNext: Boolean = !finished override def next(): (K, V) = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index bb8c5a6..579347b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.spark.util + import java.text.SimpleDateFormat import java.util @@ -35,6 +36,7 @@ import org.apache.spark.util.FileUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager} import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType @@ -615,8 +617,21 @@ object CommonUtil { 
result.result() } - def partitionInfoOutput: Seq[Attribute] = Seq( - AttributeReference("partition", StringType, nullable = false, - new MetadataBuilder().putString("comment", "partitions info").build())() - ) + def partitionInfoOutput: Seq[Attribute] = { + Seq( + AttributeReference("partition", StringType, nullable = false, + new MetadataBuilder().putString("comment", "partitions info").build())() + ) + } + + /** + * Method to clear the memory for a task + * if present + */ + def clearUnsafeMemory(taskId: Long) { + UnsafeMemoryManager. + INSTANCE.freeMemoryAll(taskId) + UnsafeSortMemoryManager. + INSTANCE.freeMemoryAll(taskId) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java index e252e7f..3a6428d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java @@ -304,6 +304,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri if (boundedInputStream != null) { boundedInputStream.close(); } + if (null != csvParser) { + csvParser.stopParsing(); + } } finally { reader = null; boundedInputStream = null; http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java index 66e6d37..d4e79f8 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java @@ -33,9 +33,12 @@ public class DataLoadExecutor { private static final LogService LOGGER = LogServiceFactory.getLogService(DataLoadExecutor.class.getName()); + private AbstractDataLoadProcessorStep loadProcessorStep; + + private boolean isClosed; + public void execute(CarbonLoadModel loadModel, String storeLocation, CarbonIterator<Object[]>[] inputIterators) throws Exception { - AbstractDataLoadProcessorStep loadProcessorStep = null; try { loadProcessorStep = new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators); @@ -60,10 +63,6 @@ public class DataLoadExecutor { } finally { removeBadRecordKey( loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier()); - if (loadProcessorStep != null) { - // 3. Close the step - loadProcessorStep.close(); - } } } @@ -91,4 +90,14 @@ public class DataLoadExecutor { String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey(); BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey); } + + /** + * Method to clean all the resource + */ + public void close() { + if (!isClosed && loadProcessorStep != null) { + loadProcessorStep.close(); + } + isClosed = true; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java index 2ac138b..9d2ee9a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java @@ -25,6 +25,8 @@ import java.util.Arrays; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.IntPointerBuffer; import org.apache.carbondata.core.memory.MemoryBlock; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; +import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.DataTypeUtil; @@ -55,9 +57,13 @@ public class UnsafeCarbonRowPage { private boolean saveToDisk; + private MemoryManagerType managerType; + + private long taskId; + public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type, - MemoryBlock memoryBlock, boolean saveToDisk) { + MemoryBlock memoryBlock, boolean saveToDisk, long taskId) { this.noDictionaryDimensionMapping = noDictionaryDimensionMapping; this.noDictionarySortColumnMapping = noDictionarySortColumnMapping; this.dimensionSize = dimensionSize; @@ -65,10 +71,12 @@ public class UnsafeCarbonRowPage { this.measureDataType = type; this.saveToDisk = saveToDisk; this.nullSetWords = new long[((measureSize - 1) >> 6) + 1]; - buffer = new IntPointerBuffer(memoryBlock); - this.dataBlock = buffer.getBaseBlock(); + this.taskId = taskId; + buffer = new IntPointerBuffer(this.taskId); + this.dataBlock = memoryBlock; // TODO Only using 98% of space for safe side.May be we can have different logic. 
sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100; + this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER; } public int addRow(Object[] row) { @@ -324,7 +332,14 @@ public class UnsafeCarbonRowPage { } public void freeMemory() { - buffer.freeMemory(); + switch (managerType) { + case UNSAFE_MEMORY_MANAGER: + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock); + break; + default: + UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock); + buffer.freeMemory(); + } } public boolean isSaveToDisk() { @@ -369,4 +384,13 @@ public class UnsafeCarbonRowPage { public boolean[] getNoDictionarySortColumnMapping() { return noDictionarySortColumnMapping; } + + public void setNewDataBlock(MemoryBlock newMemoryBlock) { + this.dataBlock = newMemoryBlock; + this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER; + } + + public enum MemoryManagerType { + UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index a42d0ea..8021b45 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -31,12 +31,15 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.memory.CarbonUnsafe; import 
org.apache.carbondata.core.memory.IntPointerBuffer; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; +import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator; import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms; import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow; @@ -86,6 +89,8 @@ public class UnsafeSortDataRows { */ private Semaphore semaphore; + private final long taskId; + public UnsafeSortDataRows(SortParameters parameters, UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) { this.parameters = parameters; @@ -94,9 +99,9 @@ public class UnsafeSortDataRows { // observer of writing file in thread this.threadStatusObserver = new ThreadStatusObserver(); - + this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); this.inMemoryChunkSize = inMemoryChunkSize; - this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024; + this.inMemoryChunkSize = inMemoryChunkSize * 1024 * 1024; enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT, CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT)); @@ -115,12 +120,18 @@ public class UnsafeSortDataRows { * This method will be used to initialize */ public void initialize() throws MemoryException { - MemoryBlock baseBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize); + MemoryBlock baseBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); + boolean isMemoryAvailable = 
+ UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size()); + if (isMemoryAvailable) { + UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size()); + } this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), parameters.getNoDictionarySortColumn(), parameters.getDimColCount() + parameters.getComplexDimColCount(), parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock, - !UnsafeMemoryManager.INSTANCE.isMemoryAvailable()); + !isMemoryAvailable, taskId); // Delete if any older file exists in sort temp folder deleteSortLocationIfExists(); @@ -177,8 +188,13 @@ public class UnsafeSortDataRows { unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); semaphore.acquire(); dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); - MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize); - boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable(); + MemoryBlock memoryBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); + boolean saveToDisk = + UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size()); + if (!saveToDisk) { + UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size()); + } rowPage = new UnsafeCarbonRowPage( parameters.getNoDictionaryDimnesionColumn(), parameters.getNoDictionarySortColumn(), @@ -186,7 +202,7 @@ public class UnsafeSortDataRows { parameters.getMeasureColCount(), parameters.getMeasureDataType(), memoryBlock, - saveToDisk); + saveToDisk, taskId); bytesAdded += rowPage.addRow(rowBatch[i]); } catch (Exception e) { LOGGER.error( @@ -214,14 +230,18 @@ public class UnsafeSortDataRows { unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); semaphore.acquire(); dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); - MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize); - boolean 
saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable(); + MemoryBlock memoryBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); + boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size()); + if (!saveToDisk) { + UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size()); + } rowPage = new UnsafeCarbonRowPage( parameters.getNoDictionaryDimnesionColumn(), parameters.getNoDictionarySortColumn(), parameters.getDimColCount(), parameters.getMeasureColCount(), parameters.getMeasureDataType(), memoryBlock, - saveToDisk); + saveToDisk, taskId); rowPage.addRow(row); } catch (Exception e) { LOGGER.error( @@ -343,7 +363,7 @@ public class UnsafeSortDataRows { timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(), new UnsafeRowComparatorForNormalDIms(page)); } - if (rowPage.isSaveToDisk()) { + if (page.isSaveToDisk()) { // create a new file every time File sortTempFile = new File( parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System @@ -356,6 +376,18 @@ public class UnsafeSortDataRows { // intermediate merging of sort temp files will be triggered unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile); } else { + // creating a new memory block as size is already allocated + // so calling lazy memory allocator + MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE + .allocateMemoryLazy(taskId, page.getDataBlock().size()); + // copying data from working memory manager to sortmemory manager + CarbonUnsafe.unsafe + .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(), + newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), + page.getDataBlock().size()); + // free unsafememory manager + page.freeMemory(); + page.setNewDataBlock(newMemoryBlock); // add sort temp filename to and arrayList. 
When the list size reaches 20 then // intermediate merging of sort temp files will be triggered page.getBuffer().loadToUnsafe();
