Repository: carbondata Updated Branches: refs/heads/master b21a6d49f -> c429cee16
[CARBONDATA-3035] Optimize parameters for unsafe working and sort memory Add memory type in log message to distinguish them and fix trivial bugs for them This closes #2844 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c429cee1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c429cee1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c429cee1 Branch: refs/heads/master Commit: c429cee16b5b7e78e920608f5a8638cd01b91a58 Parents: b21a6d4 Author: xuchuanyin <[email protected]> Authored: Tue Oct 23 11:33:33 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Oct 24 16:11:41 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 +- .../core/memory/UnsafeMemoryManager.java | 66 +++++++++++--------- .../core/memory/UnsafeSortMemoryManager.java | 49 ++++++++------- .../carbondata/core/util/CarbonProperties.java | 34 +++------- 4 files changed, 75 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index b5e1e5d..c5f8335 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1628,9 +1628,9 @@ public final class CarbonCommonConstants { public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "false"; @CarbonProperty - public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB = + 
public static final String CARBON_SORT_STORAGE_INMEMORY_IN_MB = "carbon.sort.storage.inmemory.size.inmb"; - public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT = "512"; + public static final int CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT = 512; @CarbonProperty public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP = http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 6a69dfd..db0258f 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; /** @@ -43,7 +44,7 @@ public class UnsafeMemoryManager { private static Map<String,Set<MemoryBlock>> taskIdToMemoryBlockMap; static { long size = 0L; - String defaultWorkingMemorySize = null; + String configuredWorkingMemorySize = null; try { // check if driver unsafe memory is configured and JVM process is in driver. 
In that case // initialize unsafe memory configured for driver @@ -51,22 +52,22 @@ public class UnsafeMemoryManager { .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false")); boolean initializedWithUnsafeDriverMemory = false; if (isDriver) { - defaultWorkingMemorySize = CarbonProperties.getInstance() + configuredWorkingMemorySize = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB); - if (null != defaultWorkingMemorySize) { - size = Long.parseLong(defaultWorkingMemorySize); + if (null != configuredWorkingMemorySize) { + size = Long.parseLong(configuredWorkingMemorySize); initializedWithUnsafeDriverMemory = true; } } if (!initializedWithUnsafeDriverMemory) { - defaultWorkingMemorySize = CarbonProperties.getInstance() + configuredWorkingMemorySize = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB); - if (null != defaultWorkingMemorySize) { - size = Long.parseLong(defaultWorkingMemorySize); + if (null != configuredWorkingMemorySize) { + size = Long.parseLong(configuredWorkingMemorySize); } } } catch (Exception e) { - LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize); + LOGGER.info("Invalid working memory size value: " + configuredWorkingMemorySize); } long takenSize = size; MemoryType memoryType; @@ -75,6 +76,10 @@ public class UnsafeMemoryManager { long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); if (takenSize < defaultSize) { takenSize = defaultSize; + LOGGER.warn(String.format( + "It is not recommended to set unsafe working memory size less than %sMB," + + " so setting default value to %d", + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT, defaultSize)); } takenSize = takenSize * 1024 * 1024; } else { @@ -104,27 +109,27 @@ public class UnsafeMemoryManager { private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) { this.totalMemory = totalMemory; this.memoryType = 
memoryType; - LOGGER - .info("Working Memory manager is created with size " + totalMemory + " with " + memoryType); + LOGGER.info( + "Working Memory manager is created with size " + totalMemory + " with " + memoryType); } private synchronized MemoryBlock allocateMemory(MemoryType memoryType, String taskId, long memoryRequested) { if (memoryUsed + memoryRequested <= totalMemory) { - MemoryBlock allocate = getMemoryAllocator(memoryType).allocate(memoryRequested); - memoryUsed += allocate.size(); + MemoryBlock memoryBlock = getMemoryAllocator(memoryType).allocate(memoryRequested); + memoryUsed += memoryBlock.size(); Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); if (null == listOfMemoryBlock) { listOfMemoryBlock = new HashSet<>(); taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); } - listOfMemoryBlock.add(allocate); + listOfMemoryBlock.add(memoryBlock); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Memory block (" + allocate + ") is created with size " + allocate.size() - + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) - + "Bytes"); + LOGGER.debug(String.format("Creating working Memory block (%s) with size %d." + + " Total memory used %d Bytes, left %d Bytes.", + memoryBlock.toString(), memoryBlock.size(), memoryUsed, totalMemory - memoryUsed)); } - return allocate; + return memoryBlock; } return null; } @@ -138,9 +143,9 @@ public class UnsafeMemoryManager { memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory - - memoryUsed)); + LOGGER.debug(String.format( + "Freeing working memory block (%s) with size: %d, current available memory is: %d", + memoryBlock.toString(), memoryBlock.size(), totalMemory - memoryUsed)); } } } @@ -163,12 +168,13 @@ public class UnsafeMemoryManager { memoryUsed -= occuppiedMemory; memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + ( - totalMemory - memoryUsed)); + LOGGER.debug(String.format( + "Freeing working memory of size %d. Current available memory is %d", + occuppiedMemory, totalMemory - memoryUsed)); } - LOGGER.info("Total memory used after task " + taskId + " is " + memoryUsed - + " Current tasks running now are : " + taskIdToMemoryBlockMap.keySet()); + LOGGER.info(String.format( + "Total working memory used after task %s is %d. Current running tasks are %s", + taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", "))); } public synchronized boolean isMemoryAvailable() { @@ -195,7 +201,7 @@ public class UnsafeMemoryManager { baseBlock = INSTANCE.allocateMemory(memoryType, taskId, size); if (baseBlock == null) { try { - LOGGER.info("Memory is not available, retry after 500 millis"); + LOGGER.info("Working memory is not available, retry after 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { throw new MemoryException(e); @@ -207,8 +213,8 @@ public class UnsafeMemoryManager { } if (baseBlock == null) { INSTANCE.printCurrentMemoryUsage(); - throw new MemoryException( - "Not enough memory. 
please increase carbon.unsafe.working.memory.in.mb"); + throw new MemoryException("Not enough working memory, please increase " + + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB); } return baseBlock; } @@ -227,7 +233,7 @@ public class UnsafeMemoryManager { } private synchronized void printCurrentMemoryUsage() { - LOGGER.error( - " Memory Used : " + memoryUsed + " Tasks running : " + taskIdToMemoryBlockMap.keySet()); + LOGGER.info(String.format("Working memory used %d, running tasks are %s", + memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", "))); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java index 8dcf915..847f6e2 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java @@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; /** @@ -75,17 +76,18 @@ public class UnsafeSortMemoryManager { static { long size; try { - size = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB, - CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT)); + size = Long.parseLong(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB)); } catch (Exception e) { - size = 
Long.parseLong(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT); - LOGGER.info("Wrong memory size given, " + "so setting default value to " + size); + size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT; + LOGGER.info("Wrong memory size given, so setting default value to " + size); } - if (size < 1024) { - size = 1024; - LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, " - + "so setting default value to " + size); + if (size < CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) { + size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT; + LOGGER.warn(String.format( + "It is not recommended to set unsafe sort memory size less than %dMB," + + " so setting default value to %d", + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT, size)); } long takenSize = size * 1024 * 1024; @@ -137,9 +139,9 @@ public class UnsafeSortMemoryManager { public synchronized void allocateDummyMemory(long size) { memoryUsed += size; if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Working Memory block (" + size + ") is created with size " + size - + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) - + "Bytes"); + LOGGER.debug(String.format( + "Sort Memory block is created with size %d. Total memory used %d Bytes, left %d Bytes", + size, memoryUsed, totalMemory - memoryUsed)); } } @@ -152,9 +154,9 @@ public class UnsafeSortMemoryManager { memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + ( - totalMemory - memoryUsed)); + LOGGER.debug(String.format( + "Freeing sort memory block (%s) with size: %d, current available memory is: %d", + memoryBlock.toString(), memoryBlock.size(), totalMemory - memoryUsed)); } } } @@ -184,9 +186,12 @@ public class UnsafeSortMemoryManager { memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + ( - totalMemory - memoryUsed)); + String.format("Freeing sort memory of size: %d, current available memory is: %d", + occuppiedMemory, totalMemory - memoryUsed)); } + LOGGER.info(String.format( + "Total sort memory used after task %s is %d. Current running tasks are: %s", + taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", "))); } /** @@ -229,7 +234,8 @@ public class UnsafeSortMemoryManager { tries++; } if (baseBlock == null) { - throw new MemoryException("Not enough memory"); + throw new MemoryException("Not enough sort memory, please increase " + + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB); } return baseBlock; } @@ -239,10 +245,9 @@ public class UnsafeSortMemoryManager { MemoryBlock allocate = allocator.allocate(memoryRequested); memoryUsed += allocate.size(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Working Memory block (" + allocate.size() + ") is created with size " + allocate.size() - + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) - + "Bytes"); + LOGGER.debug(String.format( + "Sort Memory block is created with size %d. 
Total memory used %d Bytes, left %d Bytes", + allocate.size(), memoryUsed, totalMemory - memoryUsed)); } Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); if (null == listOfMemoryBlock) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index e6440b6..e6d48e5 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -258,7 +258,6 @@ public final class CarbonProperties { validateSortIntermediateFilesLimit(); validateEnableAutoHandoff(); validateSchedulerMinRegisteredRatio(); - validateSortMemorySizeInMB(); validateWorkingMemory(); validateSortStorageMemory(); validateEnableQueryStatistics(); @@ -1282,18 +1281,6 @@ public final class CarbonProperties { propertySet.addAll(externalPropertySet); } - private void validateSortMemorySizeInMB() { - try { - int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB)); - carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB, - unsafeSortStorageMemoryString + ""); - } catch (NumberFormatException ne) { - LOGGER.warn("The specified value for property " - + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid."); - } - } - private void validateWorkingMemory() { try { int unsafeWorkingMemory = Integer.parseInt( @@ -1307,27 +1294,26 @@ public final class CarbonProperties { } private void validateSortStorageMemory() { - int unsafeSortStorageMemoryDefault = - 
Integer.parseInt(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT); int unsafeSortStorageMemory = 0; try { unsafeSortStorageMemory = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB)); + .getProperty(CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB)); } catch (NumberFormatException e) { LOGGER.warn("The specified value for property " - + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid." + + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB + "is invalid." + " Taking the default value." - + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT); - unsafeSortStorageMemory = unsafeSortStorageMemoryDefault; + + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT); + unsafeSortStorageMemory = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT; } - if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) { + if (unsafeSortStorageMemory + < CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) { LOGGER.warn("The specified value for property " - + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB + "is less than the default value." + " Taking the default value." - + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT); - unsafeSortStorageMemory = unsafeSortStorageMemoryDefault; + + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT); + unsafeSortStorageMemory = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT; } - carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB, + carbonProperties.setProperty(CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB, unsafeSortStorageMemory + ""); }
