Repository: carbondata
Updated Branches:
  refs/heads/master 0528a7985 -> 54dcd8d5b


[HOTFIX] Fixed LRU cache bug to invalidate the cacheable object to clean up the 
resources

This PR contains

Fix for the LRU cache bug: the Cacheable object is now invalidated when it is
removed from the LRU cache. This helps in clearing the unsafe memory for
cacheable objects like BlockDataMaps.
Fix for setting the driver flag for SparkCarbonFileFormat, which will be used
for taking the driver memory for LRU cache and unsafe memory initialization.
Modified the logic for properties validation for unsafe working and sort
memory. Sort memory will no longer consider the value of the parameter
sort.inmemory.size.inmb, as it was deprecated long ago. The memory configured
for this parameter used to be divided in an 80:20 ratio between sort and
working unsafe memory; that split is now removed. Now only the value of the
parameter carbon.sort.storage.inmemory.size.inmb will be considered.

This closes #2698


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/54dcd8d5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/54dcd8d5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/54dcd8d5

Branch: refs/heads/master
Commit: 54dcd8d5b1f6205523363454e8d197bb95652775
Parents: 0528a79
Author: manishgupta88 <[email protected]>
Authored: Thu Sep 6 22:56:49 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Mon Sep 10 17:21:26 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/cache/Cacheable.java |  7 +++
 .../carbondata/core/cache/CarbonLRUCache.java   |  8 +--
 .../AbstractColumnDictionaryInfo.java           |  4 ++
 .../core/datastore/block/AbstractIndex.java     |  4 ++
 .../indexstore/BlockletDataMapIndexWrapper.java |  8 +++
 .../core/memory/UnsafeMemoryManager.java        | 40 ++++++------
 .../carbondata/core/util/CarbonProperties.java  | 66 +++-----------------
 .../datamap/bloom/BloomCacheKeyValue.java       |  4 ++
 .../execution/datasources/CarbonFileIndex.scala |  6 ++
 9 files changed, 69 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java 
b/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
index d8be12b..c58982d 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
@@ -45,4 +45,11 @@ public interface Cacheable {
    * @return
    */
   long getMemorySize();
+
+  /**
+   * Method to be used for invalidating the cacheable object. API to be 
invoked at the time of
+   * removing the cacheable object from memory. Example at the of removing the 
cachebale object
+   * from LRU cache
+   */
+  void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java 
b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 03838a2..4a0c36c 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -155,10 +155,10 @@ public final class CarbonLRUCache {
   private void removeKey(String key) {
     Cacheable cacheable = lruCacheMap.get(key);
     if (null != cacheable) {
-      currentSize = currentSize - cacheable.getMemorySize();
-    }
-    Cacheable remove = lruCacheMap.remove(key);
-    if (null != remove) {
+      long memorySize = cacheable.getMemorySize();
+      cacheable.invalidate();
+      lruCacheMap.remove(key);
+      currentSize = currentSize - memorySize;
       LOGGER.info("Removed entry from InMemory lru cache :: " + key);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
index c138cc8..f5971a5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
@@ -296,5 +296,9 @@ public abstract class AbstractColumnDictionaryInfo 
implements DictionaryInfo {
     byte[] keyData = 
value.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
     return getSurrogateKey(keyData);
   }
+
+  @Override public void invalidate() {
+
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
index 7fbef8a..1972e97 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
@@ -100,6 +100,10 @@ public abstract class AbstractIndex implements Cacheable {
     return this.memorySize;
   }
 
+  @Override public void invalidate() {
+
+  }
+
   /**
    * The method is used to set the access count
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index b0fb13e..2cf0259 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 
 /**
@@ -57,6 +58,13 @@ public class BlockletDataMapIndexWrapper implements 
Cacheable, Serializable {
     return wrapperSize;
   }
 
+  @Override public void invalidate() {
+    for (DataMap dataMap : dataMaps) {
+      dataMap.clear();
+    }
+    dataMaps = null;
+  }
+
   public List<BlockDataMap> getDataMaps() {
     return dataMaps;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 9133f0f..8fcbb6c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -42,6 +42,7 @@ public class UnsafeMemoryManager {
   private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap;
   static {
     long size = 0L;
+    String defaultWorkingMemorySize = null;
     try {
       // check if driver unsafe memory is configured and JVM process is in 
driver. In that case
       // initialize unsafe memory configured for driver
@@ -49,38 +50,41 @@ public class UnsafeMemoryManager {
           .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
       boolean initializedWithUnsafeDriverMemory = false;
       if (isDriver) {
-        String driverUnsafeMemorySize = CarbonProperties.getInstance()
+        defaultWorkingMemorySize = CarbonProperties.getInstance()
             
.getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB);
-        if (null != driverUnsafeMemorySize) {
-          size = Long.parseLong(CarbonProperties.getInstance()
-              
.getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB,
-                  CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
+        if (null != defaultWorkingMemorySize) {
+          size = Long.parseLong(defaultWorkingMemorySize);
           initializedWithUnsafeDriverMemory = true;
         }
       }
       if (!initializedWithUnsafeDriverMemory) {
-        size = Long.parseLong(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
-                CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
+        defaultWorkingMemorySize = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
+        if (null != defaultWorkingMemorySize) {
+          size = Long.parseLong(defaultWorkingMemorySize);
+        }
       }
     } catch (Exception e) {
-      size = 
Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-      LOGGER.info("Wrong memory size given, "
-          + "so setting default value to " + size);
-    }
-    if (size < 512) {
-      size = 512;
-      LOGGER.info("It is not recommended to keep unsafe memory size less than 
512MB, "
-          + "so setting default value to " + size);
+      LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize);
     }
-    long takenSize = size * 1024 * 1024;
+    long takenSize = size;
     MemoryAllocator allocator;
     if (offHeap) {
       allocator = MemoryAllocator.UNSAFE;
+      long defaultSize = 
Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
+      if (takenSize < defaultSize) {
+        takenSize = defaultSize;
+      }
+      takenSize = takenSize * 1024 * 1024;
     } else {
       long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
-      if (takenSize > maxMemory) {
+      if (takenSize == 0L) {
         takenSize = maxMemory;
+      } else {
+        takenSize = takenSize * 1024 * 1024;
+        if (takenSize > maxMemory) {
+          takenSize = maxMemory;
+        }
       }
       allocator = MemoryAllocator.HEAP;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6305283..559320a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1294,73 +1294,27 @@ public final class CarbonProperties {
   }
 
   private void validateSortMemorySizeInMB() {
-    int sortMemorySizeInMBDefault =
-        
Integer.parseInt(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-    int sortMemorySizeInMB = 0;
     try {
-      sortMemorySizeInMB = Integer.parseInt(
-          
carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB));
-    } catch (NumberFormatException e) {
-      LOGGER.warn(
-          "The specified value for property " + 
CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
-              + "is Invalid." + " Taking the default value."
-              + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-      sortMemorySizeInMB = sortMemorySizeInMBDefault;
-    }
-    if (sortMemorySizeInMB < sortMemorySizeInMBDefault) {
-      LOGGER.warn(
-          "The specified value for property " + 
CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
-              + "is less than default value." + ". Taking the default value."
-              + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-      sortMemorySizeInMB = sortMemorySizeInMBDefault;
-    }
-    String unsafeWorkingMemoryString =
-        
carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
-    String unsafeSortStorageMemoryString =
-        
carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB);
-    int workingMemory = 512;
-    int sortStorageMemory;
-    if (null == unsafeWorkingMemoryString && null == 
unsafeSortStorageMemoryString) {
-      workingMemory = workingMemory > ((sortMemorySizeInMB * 20) / 100) ?
-          workingMemory :
-          ((sortMemorySizeInMB * 20) / 100);
-      sortStorageMemory = sortMemorySizeInMB - workingMemory;
-      carbonProperties
-          .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, 
workingMemory + "");
-      
carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-          sortStorageMemory + "");
-    } else if (null != unsafeWorkingMemoryString && null == 
unsafeSortStorageMemoryString) {
+      int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties
+          
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
       
carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-          sortMemorySizeInMB + "");
-    } else if (null == unsafeWorkingMemoryString && null != 
unsafeSortStorageMemoryString) {
-      carbonProperties
-          .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, 
sortMemorySizeInMB + "");
+          unsafeSortStorageMemoryString + "");
+    } catch (NumberFormatException ne) {
+      LOGGER.warn("The specified value for property "
+          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + 
"is invalid.");
     }
   }
 
   private void validateWorkingMemory() {
-    int unsafeWorkingMemoryDefault =
-        
Integer.parseInt(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-    int unsafeWorkingMemory = 0;
     try {
-      unsafeWorkingMemory = Integer.parseInt(
+      int unsafeWorkingMemory = Integer.parseInt(
           
carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB));
+      carbonProperties
+          .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, 
unsafeWorkingMemory + "");
     } catch (NumberFormatException e) {
       LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is 
invalid."
-          + " Taking the default value."
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-      unsafeWorkingMemory = unsafeWorkingMemoryDefault;
-    }
-    if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) {
-      LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT
-          + "is less than the default value." + ". Taking the default value."
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-      unsafeWorkingMemory = unsafeWorkingMemoryDefault;
+          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is 
invalid.");
     }
-    carbonProperties
-        .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, 
unsafeWorkingMemory + "");
   }
 
   private void validateSortStorageMemory() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
index 29e94d8..a66ee63 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
@@ -100,6 +100,10 @@ public class BloomCacheKeyValue {
       return size;
     }
 
+    @Override public void invalidate() {
+      bloomFilters = null;
+    }
+
     public List<CarbonBloomFilter> getBloomFilters() {
       return bloomFilters;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index d970892..af05613 100644
--- 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
HDFSCarbonFile}
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => 
CarbonExpression}
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, 
CarbonInputFormat}
@@ -79,6 +81,10 @@ class CarbonFileIndex(
 
   private def prune(dataFilters: Seq[Expression],
       directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
+    // set the driver flag to true which will used for unsafe memory 
initialization and carbon LRU
+    // cache instance initialization as per teh driver memory
+    CarbonProperties.getInstance
+      .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, 
"true")
     val tablePath = parameters.get("path")
     if (tablePath.nonEmpty && dataFilters.nonEmpty) {
       val hadoopConf = sparkSession.sessionState.newHadoopConf()

Reply via email to