Repository: carbondata
Updated Branches:
  refs/heads/master b21a6d49f -> c429cee16


[CARBONDATA-3035] Optimize parameters for unsafe working and sort memory

Add memory type in log messages to distinguish them and fix trivial bugs
for them

This closes #2844


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c429cee1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c429cee1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c429cee1

Branch: refs/heads/master
Commit: c429cee16b5b7e78e920608f5a8638cd01b91a58
Parents: b21a6d4
Author: xuchuanyin <[email protected]>
Authored: Tue Oct 23 11:33:33 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Oct 24 16:11:41 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  4 +-
 .../core/memory/UnsafeMemoryManager.java        | 66 +++++++++++---------
 .../core/memory/UnsafeSortMemoryManager.java    | 49 ++++++++-------
 .../carbondata/core/util/CarbonProperties.java  | 34 +++-------
 4 files changed, 75 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b5e1e5d..c5f8335 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1628,9 +1628,9 @@ public final class CarbonCommonConstants {
   public static final String 
CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "false";
 
   @CarbonProperty
-  public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB =
+  public static final String CARBON_SORT_STORAGE_INMEMORY_IN_MB =
       "carbon.sort.storage.inmemory.size.inmb";
-  public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT = 
"512";
+  public static final int CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT = 512;
 
   @CarbonProperty
   public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 6a69dfd..db0258f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -43,7 +44,7 @@ public class UnsafeMemoryManager {
   private static Map<String,Set<MemoryBlock>> taskIdToMemoryBlockMap;
   static {
     long size = 0L;
-    String defaultWorkingMemorySize = null;
+    String configuredWorkingMemorySize = null;
     try {
       // check if driver unsafe memory is configured and JVM process is in 
driver. In that case
       // initialize unsafe memory configured for driver
@@ -51,22 +52,22 @@ public class UnsafeMemoryManager {
           .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
       boolean initializedWithUnsafeDriverMemory = false;
       if (isDriver) {
-        defaultWorkingMemorySize = CarbonProperties.getInstance()
+        configuredWorkingMemorySize = CarbonProperties.getInstance()
             
.getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB);
-        if (null != defaultWorkingMemorySize) {
-          size = Long.parseLong(defaultWorkingMemorySize);
+        if (null != configuredWorkingMemorySize) {
+          size = Long.parseLong(configuredWorkingMemorySize);
           initializedWithUnsafeDriverMemory = true;
         }
       }
       if (!initializedWithUnsafeDriverMemory) {
-        defaultWorkingMemorySize = CarbonProperties.getInstance()
+        configuredWorkingMemorySize = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
-        if (null != defaultWorkingMemorySize) {
-          size = Long.parseLong(defaultWorkingMemorySize);
+        if (null != configuredWorkingMemorySize) {
+          size = Long.parseLong(configuredWorkingMemorySize);
         }
       }
     } catch (Exception e) {
-      LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize);
+      LOGGER.info("Invalid working memory size value: " + 
configuredWorkingMemorySize);
     }
     long takenSize = size;
     MemoryType memoryType;
@@ -75,6 +76,10 @@ public class UnsafeMemoryManager {
       long defaultSize = 
Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
       if (takenSize < defaultSize) {
         takenSize = defaultSize;
+        LOGGER.warn(String.format(
+            "It is not recommended to set unsafe working memory size less than 
%sMB,"
+                + " so setting default value to %d",
+            CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT, 
defaultSize));
       }
       takenSize = takenSize * 1024 * 1024;
     } else {
@@ -104,27 +109,27 @@ public class UnsafeMemoryManager {
   private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) {
     this.totalMemory = totalMemory;
     this.memoryType = memoryType;
-    LOGGER
-        .info("Working Memory manager is created with size " + totalMemory + " 
with " + memoryType);
+    LOGGER.info(
+        "Working Memory manager is created with size " + totalMemory + " with 
" + memoryType);
   }
 
   private synchronized MemoryBlock allocateMemory(MemoryType memoryType, 
String taskId,
       long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
-      MemoryBlock allocate = 
getMemoryAllocator(memoryType).allocate(memoryRequested);
-      memoryUsed += allocate.size();
+      MemoryBlock memoryBlock = 
getMemoryAllocator(memoryType).allocate(memoryRequested);
+      memoryUsed += memoryBlock.size();
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {
         listOfMemoryBlock = new HashSet<>();
         taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
       }
-      listOfMemoryBlock.add(allocate);
+      listOfMemoryBlock.add(memoryBlock);
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Memory block (" + allocate + ") is created with size " + 
allocate.size()
-            + ". Total memory used " + memoryUsed + "Bytes, left " + 
(totalMemory - memoryUsed)
-            + "Bytes");
+        LOGGER.debug(String.format("Creating working Memory block (%s) with 
size %d."
+                + " Total memory used %d Bytes, left %d Bytes.",
+            memoryBlock.toString(), memoryBlock.size(), memoryUsed, 
totalMemory - memoryUsed));
       }
-      return allocate;
+      return memoryBlock;
     }
     return null;
   }
@@ -138,9 +143,9 @@ public class UnsafeMemoryManager {
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Freeing memory of size: " + memoryBlock.size() + "available 
memory:  " + (totalMemory
-                - memoryUsed));
+        LOGGER.debug(String.format(
+            "Freeing working memory block (%s) with size: %d, current 
available memory is: %d",
+            memoryBlock.toString(), memoryBlock.size(), totalMemory - 
memoryUsed));
       }
     }
   }
@@ -163,12 +168,13 @@ public class UnsafeMemoryManager {
     memoryUsed -= occuppiedMemory;
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "Freeing memory of size: " + occuppiedMemory + ": Current available 
memory is: " + (
-              totalMemory - memoryUsed));
+      LOGGER.debug(String.format(
+          "Freeing working memory of size %d. Current available memory is %d",
+          occuppiedMemory, totalMemory - memoryUsed));
     }
-    LOGGER.info("Total memory used after task " + taskId + " is " + memoryUsed
-        + " Current tasks running now are : " + 
taskIdToMemoryBlockMap.keySet());
+    LOGGER.info(String.format(
+        "Total working memory used after task %s is %d. Current running tasks 
are %s",
+        taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), 
", ")));
   }
 
   public synchronized boolean isMemoryAvailable() {
@@ -195,7 +201,7 @@ public class UnsafeMemoryManager {
       baseBlock = INSTANCE.allocateMemory(memoryType, taskId, size);
       if (baseBlock == null) {
         try {
-          LOGGER.info("Memory is not available, retry after 500 millis");
+          LOGGER.info("Working memory is not available, retry after 500 ms");
           Thread.sleep(500);
         } catch (InterruptedException e) {
           throw new MemoryException(e);
@@ -207,8 +213,8 @@ public class UnsafeMemoryManager {
     }
     if (baseBlock == null) {
       INSTANCE.printCurrentMemoryUsage();
-      throw new MemoryException(
-          "Not enough memory. please increase 
carbon.unsafe.working.memory.in.mb");
+      throw new MemoryException("Not enough working memory, please increase "
+          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
     }
     return baseBlock;
   }
@@ -227,7 +233,7 @@ public class UnsafeMemoryManager {
   }
 
   private synchronized void printCurrentMemoryUsage() {
-    LOGGER.error(
-        " Memory Used : " + memoryUsed + " Tasks running : " + 
taskIdToMemoryBlockMap.keySet());
+    LOGGER.info(String.format("Working memory used %d, running tasks are %s",
+        memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", ")));
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
index 8dcf915..847f6e2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -75,17 +76,18 @@ public class UnsafeSortMemoryManager {
   static {
     long size;
     try {
-      size = Long.parseLong(CarbonProperties.getInstance()
-          
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-              
CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT));
+      size = Long.parseLong(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB));
     } catch (Exception e) {
-      size = 
Long.parseLong(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
-      LOGGER.info("Wrong memory size given, " + "so setting default value to " 
+ size);
+      size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
+      LOGGER.info("Wrong memory size given, so setting default value to " + 
size);
     }
-    if (size < 1024) {
-      size = 1024;
-      LOGGER.info("It is not recommended to keep unsafe memory size less than 
1024MB, "
-          + "so setting default value to " + size);
+    if (size < 
CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) {
+      size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
+      LOGGER.warn(String.format(
+          "It is not recommended to set unsafe sort memory size less than 
%dMB,"
+              + " so setting default value to %d",
+          CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT, 
size));
     }
 
     long takenSize = size * 1024 * 1024;
@@ -137,9 +139,9 @@ public class UnsafeSortMemoryManager {
   public synchronized void allocateDummyMemory(long size) {
     memoryUsed += size;
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Working Memory block (" + size + ") is created with size " 
+ size
-          + ". Total memory used " + memoryUsed + "Bytes, left " + 
(totalMemory - memoryUsed)
-          + "Bytes");
+      LOGGER.debug(String.format(
+          "Sort Memory block is created with size %d. Total memory used %d 
Bytes, left %d Bytes",
+          size, memoryUsed, totalMemory - memoryUsed));
     }
   }
 
@@ -152,9 +154,9 @@ public class UnsafeSortMemoryManager {
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Freeing memory of size: " + memoryBlock.size() + ": Current 
available memory is: " + (
-                totalMemory - memoryUsed));
+        LOGGER.debug(String.format(
+            "Freeing sort memory block (%s) with size: %d, current available 
memory is: %d",
+            memoryBlock.toString(), memoryBlock.size(), totalMemory - 
memoryUsed));
       }
     }
   }
@@ -184,9 +186,12 @@ public class UnsafeSortMemoryManager {
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
-          "Freeing memory of size: " + occuppiedMemory + ": Current available 
memory is: " + (
-              totalMemory - memoryUsed));
+          String.format("Freeing sort memory of size: %d, current available 
memory is: %d",
+              occuppiedMemory, totalMemory - memoryUsed));
     }
+    LOGGER.info(String.format(
+        "Total sort memory used after task %s is %d. Current running tasks 
are: %s",
+        taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), 
", ")));
   }
 
   /**
@@ -229,7 +234,8 @@ public class UnsafeSortMemoryManager {
       tries++;
     }
     if (baseBlock == null) {
-      throw new MemoryException("Not enough memory");
+      throw new MemoryException("Not enough sort memory, please increase "
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB);
     }
     return baseBlock;
   }
@@ -239,10 +245,9 @@ public class UnsafeSortMemoryManager {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Working Memory block (" + allocate.size() + ") is created with 
size " + allocate.size()
-                + ". Total memory used " + memoryUsed + "Bytes, left " + 
(totalMemory - memoryUsed)
-                + "Bytes");
+        LOGGER.debug(String.format(
+            "Sort Memory block is created with size %d. Total memory used %d 
Bytes, left %d Bytes",
+            allocate.size(), memoryUsed, totalMemory - memoryUsed));
       }
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index e6440b6..e6d48e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -258,7 +258,6 @@ public final class CarbonProperties {
     validateSortIntermediateFilesLimit();
     validateEnableAutoHandoff();
     validateSchedulerMinRegisteredRatio();
-    validateSortMemorySizeInMB();
     validateWorkingMemory();
     validateSortStorageMemory();
     validateEnableQueryStatistics();
@@ -1282,18 +1281,6 @@ public final class CarbonProperties {
     propertySet.addAll(externalPropertySet);
   }
 
-  private void validateSortMemorySizeInMB() {
-    try {
-      int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties
-          
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
-      
carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-          unsafeSortStorageMemoryString + "");
-    } catch (NumberFormatException ne) {
-      LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + 
"is invalid.");
-    }
-  }
-
   private void validateWorkingMemory() {
     try {
       int unsafeWorkingMemory = Integer.parseInt(
@@ -1307,27 +1294,26 @@ public final class CarbonProperties {
   }
 
   private void validateSortStorageMemory() {
-    int unsafeSortStorageMemoryDefault =
-        
Integer.parseInt(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
     int unsafeSortStorageMemory = 0;
     try {
       unsafeSortStorageMemory = Integer.parseInt(carbonProperties
-          
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
+          
.getProperty(CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB));
     } catch (NumberFormatException e) {
       LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + 
"is invalid."
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB + "is 
invalid."
           + " Taking the default value."
-          + 
CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
-      unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT);
+      unsafeSortStorageMemory = 
CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
     }
-    if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) {
+    if (unsafeSortStorageMemory
+        < CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) {
       LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB
           + "is less than the default value." + " Taking the default value."
-          + 
CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
-      unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT);
+      unsafeSortStorageMemory = 
CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
     }
-    
carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
+    
carbonProperties.setProperty(CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB,
         unsafeSortStorageMemory + "");
   }
 

Reply via email to