This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 385d9ab [CARBONDATA-4088] Drop metacache didn't clear some cache information which leads to memory leak
385d9ab is described below
commit 385d9ab63a2975a9bc4c778d2686c0efb9cf4368
Author: jack86596 <[email protected]>
AuthorDate: Thu Dec 17 17:09:56 2020 +0800
    [CARBONDATA-4088] Drop metacache didn't clear some cache information which leads to memory leak
Why is this PR needed?
When there are two Spark applications and one of them drops a table, some cache
information for that table stays in the other application and cannot be removed
by any means, including the "Drop metacache" command. This causes a memory leak;
over time the leaked entries accumulate and eventually lead to a driver OOM.
The leak points are:
1) tableModifiedTimeStore in CarbonFileMetastore;
2) segmentLockMap in BlockletDataMapIndexStore;
3) absoluteTableIdentifierByteMap in SegmentPropertiesAndSchemaHolder;
4) tableInfoMap in CarbonMetadata.
What changes were proposed in this PR?
Use an expiring map to cache the table information in CarbonMetadata and the
modified time in CarbonFileMetastore, so that stale entries are cleared
automatically once the expiration time elapses. Operations in
BlockletDataMapIndexStore do not need to be locked, so all logic related to
segmentLockMap is removed.
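
For illustration, here is a minimal sketch of the expiring-map behavior this
patch relies on, mirroring the builder calls in the diff below; the key/value
strings and the two-second window are invented for the example:

import java.util.concurrent.TimeUnit;

import net.jodah.expiringmap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;

public class ExpiringMapSketch {
  public static void main(String[] args) throws InterruptedException {
    // Entries expire a fixed time after their LAST ACCESS, not after insertion,
    // so actively used tables stay cached while entries of dropped tables age out.
    ExpiringMap<String, String> cache = ExpiringMap.builder()
        .expiration(2, TimeUnit.SECONDS)
        .expirationPolicy(ExpirationPolicy.ACCESSED)
        .build();
    cache.put("default_t1", "tableInfo");        // illustrative key/value
    Thread.sleep(1500);
    System.out.println(cache.get("default_t1")); // still present; the read resets the timer
    Thread.sleep(3000);                          // idle for longer than the window
    System.out.println(cache.get("default_t1")); // null: the entry has expired
  }
}
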
Does this PR introduce any user interface change?
A new configuration, carbon.metacache.expiration.seconds, is added (a usage
sketch follows below).
Is any new testcase added?
No
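
As a usage note, the following sketch shows how the new property could be set
programmatically; addProperty is the standard CarbonProperties setter, and the
one-hour value is only an example:

import org.apache.carbondata.core.util.CarbonProperties;

public class MetaCacheExpirationExample {
  public static void main(String[] args) {
    // Expire idle entries in the metadata caches after one hour (example value).
    // When the property is unset or set to "0", the default of Long.MAX_VALUE
    // applies and cache entries are never expired by time.
    CarbonProperties.getInstance()
        .addProperty("carbon.metacache.expiration.seconds", "3600");
  }
}
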
This closes #4057
---
.../core/constants/CarbonCommonConstants.java | 14 ++++++
.../core/indexstore/BlockletIndexStore.java | 52 ++++------------------
.../carbondata/core/metadata/CarbonMetadata.java | 13 ++++--
.../carbondata/core/util/CarbonProperties.java | 21 +++++++++
docs/configuration-parameters.md | 1 +
.../spark/sql/hive/CarbonFileMetastore.scala | 16 +++----
6 files changed, 61 insertions(+), 56 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c7bdfa8..991b0f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2557,6 +2557,20 @@ public final class CarbonCommonConstants {
" characters. Please consider long string data type.";
   /**
+   * Expiration time for the tableInfo cache in CarbonMetadata; after the configured
+   * time has passed since the last access to a cache entry, the tableInfo will be
+   * removed from the cache. A recent access refreshes the timer. At the moment a
+   * cache entry is being expired, queries on the table may fail with NullPointerException.
+   */
+  public static final String CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS =
+      "carbon.metacache.expiration.seconds";
+
+  /**
+   * By default, the cache in CarbonMetadata will not be expired by time.
+   */
+  public static final long CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT = Long.MAX_VALUE;
+
+  /**
    * property which defines the presto query
    */
  @CarbonProperty public static final String IS_QUERY_FROM_PRESTO = "is_query_from_presto";
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
index a4c7102..14d0f77 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
@@ -58,21 +57,12 @@ public class BlockletIndexStore
protected CarbonLRUCache lruCache;
   /**
-   * map of block info to lock object map, while loading the btree this will be filled
-   * and removed after loading the tree for that particular block info, this will be useful
-   * while loading the tree concurrently so only block level lock will be applied another
-   * block can be loaded concurrently
-   */
-  private Map<String, Object> segmentLockMap;
-
-  /**
* constructor to initialize the SegmentTaskIndexStore
*
* @param lruCache
*/
public BlockletIndexStore(CarbonLRUCache lruCache) {
this.lruCache = lruCache;
- segmentLockMap = new ConcurrentHashMap<String, Object>();
}
@Override
@@ -288,44 +278,18 @@ public class BlockletIndexStore
       SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
       CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration,
       boolean serializeDmStore, List<DataFileFooter> indexInfos) throws IOException {
-    String uniqueTableSegmentIdentifier =
-        identifier.getUniqueTableSegmentIdentifier();
-    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
-    if (lock == null) {
-      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
-    }
-    BlockIndex blockIndex;
-    synchronized (lock) {
-      blockIndex = (BlockIndex) BlockletIndexFactory.createIndex(carbonTable);
-      final BlockletIndexModel blockletIndexModel = new BlockletIndexModel(carbonTable,
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration,
-          serializeDmStore);
-      blockletIndexModel.setIndexInfos(indexInfos);
-      blockIndex.init(blockletIndexModel);
-    }
+    BlockIndex blockIndex = (BlockIndex) BlockletIndexFactory.createIndex(carbonTable);
+    final BlockletIndexModel blockletIndexModel = new BlockletIndexModel(carbonTable,
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
+        blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration,
+        serializeDmStore);
+    blockletIndexModel.setIndexInfos(indexInfos);
+    blockIndex.init(blockletIndexModel);
return blockIndex;
}
/**
-   * Below method will be used to get the segment level lock object
-   *
-   * @param uniqueIdentifier
-   * @return lock object
-   */
-  private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
-    // get the segment lock object if it is present then return
-    // otherwise add the new lock and return
-    Object segmentLockObject = segmentLockMap.get(uniqueIdentifier);
-    if (null == segmentLockObject) {
-      segmentLockObject = new Object();
-      segmentLockMap.put(uniqueIdentifier, segmentLockObject);
-    }
-    return segmentLockObject;
-  }
-
-  /**
* The method clears the access count of table segments
*
* @param tableSegmentUniqueIdentifiersWrapper
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 7c7382b..c59d806 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -19,12 +19,15 @@ package org.apache.carbondata.core.metadata;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import net.jodah.expiringmap.ExpirationPolicy;
+import net.jodah.expiringmap.ExpiringMap;
/**
 * Class which persist the information about the tables present the carbon schemas
@@ -39,11 +42,13 @@ public final class CarbonMetadata {
/**
* holds the list of tableInfo currently present
*/
- private Map<String, CarbonTable> tableInfoMap;
+ private ExpiringMap<String, CarbonTable> tableInfoMap;
private CarbonMetadata() {
// creating a concurrent map as it will be updated by multiple thread
- tableInfoMap = new ConcurrentHashMap<String, CarbonTable>();
+    tableInfoMap = ExpiringMap.builder()
+        .expiration(CarbonProperties.getInstance().getMetaCacheExpirationTime(), TimeUnit.SECONDS)
+        .expirationPolicy(ExpirationPolicy.ACCESSED).build();
}
public static CarbonMetadata getInstance() {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a2cc314..2491a1f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2086,6 +2086,27 @@ public final class CarbonProperties {
return Boolean.parseBoolean(configuredValue);
}
+  public long getMetaCacheExpirationTime() {
+    String configuredValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS);
+    if (configuredValue == null || configuredValue.equalsIgnoreCase("0")) {
+      return CarbonCommonConstants.CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT;
+    }
+    try {
+      long expirationTime = Long.parseLong(configuredValue);
+      LOGGER.info("Value for "
+          + CarbonCommonConstants.CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS + " is "
+          + expirationTime + ".");
+      return expirationTime;
+    } catch (NumberFormatException e) {
+      LOGGER.warn(configuredValue + " is not a valid input for "
+          + CarbonCommonConstants.CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS + ", taking "
+          + CarbonCommonConstants.CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT
+          + " as default value.");
+      return CarbonCommonConstants.CARBON_METACACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT;
+    }
+  }
+
public boolean isSIRepairEnabled(String dbName, String tableName) {
    // Check if user has enabled/disabled the use of property for the current db and table using
// the set command
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 0baafeb..89cdfb3 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -149,6 +149,7 @@ This section provides the details of all the configurations required for the Car
| carbon.max.pagination.lru.cache.size.in.mb | -1 | Maximum memory **(in MB)** upto which the SDK pagination reader can cache the blocklet rows. Suggest to configure as multiple of blocklet size. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. |
| carbon.partition.max.driver.lru.cache.size | -1 | Maximum memory **(in MB)** upto which driver can cache partition metadata. Beyond this, least recently used data will be removed from cache before loading new set of values.
| carbon.mapOrderPushDown.<db_name>_<table_name>.column| empty | If order by column is in sort column, specify that sort column here to avoid ordering at map task . |
+| carbon.metacache.expiration.seconds | Long.MAX_VALUE | Expiration time **(in seconds)** for tableInfo cache in CarbonMetadata and tableModifiedTime in CarbonFileMetastore, after the time configured since last access to the cache entry, tableInfo and tableModifiedTime will be removed from each cache. Recent access will refresh the timer. Default value of Long.MAX_VALUE means the cache will not be expired by time. **NOTE:** At the time when cache is being expired, queries on the table may fail with NullPointerException. |
## Data Mutation Configuration
| Parameter | Default Value | Description |
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index f632189..dc32eac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -18,10 +18,11 @@
package org.apache.spark.sql.hive
import java.io.IOException
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
+import net.jodah.expiringmap.{ExpirationPolicy, ExpiringMap}
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, EnvHelper, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{CreateCarbonRelationPostEvent, LookupRelationPostEvent, OperationContext, OperationListenerBus}
@@ -64,7 +65,9 @@ object MatchLogicalRelation {
private object CarbonFileMetastore {
- final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
+  final val tableModifiedTimeStore = ExpiringMap.builder()
+    .expiration(CarbonProperties.getInstance().getMetaCacheExpirationTime, TimeUnit.SECONDS)
+    .expirationPolicy(ExpirationPolicy.ACCESSED).build[String, java.lang.Long]
def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier,
localTimeStamp: Long): Boolean = {
@@ -109,7 +112,7 @@ private object CarbonFileMetastore {
tableModifiedTimeStore.put(tableUniqueId, timeStamp)
}
- def getTableModifiedTime(tableUniqueId: String): Long = {
+ def getTableModifiedTime(tableUniqueId: String): java.lang.Long = {
tableModifiedTimeStore.get(tableUniqueId)
}
@@ -124,8 +127,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
   @transient private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
-
/**
* Create Carbon Relation by reading the schema file
*/
@@ -501,13 +502,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
   * This method will put the updated timestamp of schema file in the table modified time store map
*/
   private def updateSchemasUpdatedTime(tableUniqueId: String, timeStamp: Long) {
- tableModifiedTimeStore.put(tableUniqueId, timeStamp)
CarbonFileMetastore.updateTableSchemaModifiedTime(tableUniqueId, timeStamp)
}
   override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): Boolean = {
-    val localTimeStamp = Option(tableModifiedTimeStore.get(absoluteTableIdentifier
+    val localTimeStamp = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier
.getCarbonTableIdentifier
.getTableId))
if (localTimeStamp.isDefined) {