This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 793f13f [CARBONDATA-3496] Maintain the list of TableDataMap based on tableId instead of tableUniqueName
793f13f is described below
commit 793f13f621897b439a8e4577bd5c177599a4e882
Author: kunal642 <[email protected]>
AuthorDate: Mon Aug 26 12:21:17 2019 +0530
[CARBONDATA-3496] Maintain the list of TableDataMap based on tableId instead of tableUniqueName
Problem:
TableNotFoundException is thrown randomly from the index server for various
files such as the schema file and .carbondata files.
Code analysis showed that if the table cache in the index executors is stale
and a query is fired, the cache can return an outdated TableDataMap object
for which the corresponding carbondata files no longer exist.
Solution:
Key the cache by tableId instead of tableUniqueName so that stale cache
entries can no longer be returned.
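For illustration, a minimal sketch of the keying change (hypothetical class and
type names, not the actual DataMapStoreManager code): a dropped-and-recreated
table keeps its name but gets a new tableId, so a cache keyed by tableId cannot
hand back an entry that belongs to the old incarnation of the table.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a datamap cache keyed by the immutable tableId
// instead of the reusable "dbName_tableName" key.
class DataMapCacheSketch<T> {

  // tableId -> datamaps cached for that table
  private final Map<String, List<T>> allDataMaps = new ConcurrentHashMap<>();

  // A lookup using the tableId of a dropped-and-recreated table misses,
  // because the new incarnation of the table carries a different tableId.
  List<T> get(String tableId) {
    return allDataMaps.get(tableId);
  }

  void put(String tableId, List<T> dataMaps) {
    allDataMaps.put(tableId, dataMaps);
  }

  void clear(String tableId) {
    allDataMaps.remove(tableId);
  }
}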
This closes #3374
---
.../core/datamap/DataMapStoreManager.java | 74 +++++++++++-----------
.../apache/carbondata/core/util/SessionParams.java | 11 +---
.../indexserver/DistributedShowCacheRDD.scala | 27 ++++----
.../carbondata/indexserver/IndexServer.scala | 12 ++--
.../command/cache/CarbonShowCacheCommand.scala | 29 +++++----
.../command/cache/ShowCacheEventListeners.scala | 10 +--
.../execution/command/CarbonHiveCommands.scala | 12 ++++
.../processing/datamap/DataMapWriterListener.java | 2 +-
8 files changed, 98 insertions(+), 79 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index f1f48fa..7ee48cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -318,23 +319,23 @@ public final class DataMapStoreManager {
* Get the datamap for reading data.
*/
public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
- String tableUniqueName =
- table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableUniqueName();
- List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+ String tableId =
+ table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableId();
+ List<TableDataMap> tableIndices = allDataMaps.get(table.getTableId());
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
if (keyUsingTablePath != null) {
- tableUniqueName = keyUsingTablePath;
- tableIndices = allDataMaps.get(tableUniqueName);
+ tableId = keyUsingTablePath;
+ tableIndices = allDataMaps.get(tableId);
}
}
// in case of fileformat or sdk, when table is dropped or schema is changed the datamaps are
// not cleared, they need to be cleared by using API, so compare the columns, if not same, clear
// the datamaps on that table
- if (allDataMaps.size() > 0 && !CollectionUtils.isEmpty(allDataMaps.get(tableUniqueName))
- && !allDataMaps.get(tableUniqueName).get(0).getTable().getTableInfo().getFactTable()
+ if (allDataMaps.size() > 0 && !CollectionUtils.isEmpty(allDataMaps.get(tableId))
+ && !allDataMaps.get(tableId).get(0).getTable().getTableInfo().getFactTable()
.getListOfColumns().equals(table.getTableInfo().getFactTable().getListOfColumns())) {
- clearDataMaps(tableUniqueName);
+ clearDataMaps(table.getCarbonTableIdentifier());
tableIndices = null;
}
TableDataMap dataMap = null;
@@ -342,8 +343,8 @@ public final class DataMapStoreManager {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
}
if (dataMap == null) {
- synchronized (tableUniqueName.intern()) {
- tableIndices = allDataMaps.get(tableUniqueName);
+ synchronized (tableId.intern()) {
+ tableIndices = allDataMaps.get(tableId);
if (tableIndices != null) {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
}
@@ -417,12 +418,12 @@ public final class DataMapStoreManager {
String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName();
// Just update the segmentRefreshMap with the table if not added.
getTableSegmentRefresher(table);
- List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+ List<TableDataMap> tableIndices = allDataMaps.get(table.getTableId());
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
if (keyUsingTablePath != null) {
tableUniqueName = keyUsingTablePath;
- tableIndices = allDataMaps.get(tableUniqueName);
+ tableIndices = allDataMaps.get(table.getTableId());
}
}
if (tableIndices == null) {
@@ -441,8 +442,8 @@ public final class DataMapStoreManager {
dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher);
tableIndices.add(dataMap);
- allDataMaps.put(tableUniqueName, tableIndices);
- tablePathMap.put(tableUniqueName, table.getTablePath());
+ allDataMaps.put(table.getTableId(), tableIndices);
+ tablePathMap.put(table.getTableId(), table.getTablePath());
return dataMap;
}
@@ -512,7 +513,7 @@ public final class DataMapStoreManager {
* @param identifier Table identifier
*/
public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
- String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+ String tableId = identifier.getCarbonTableIdentifier().getTableId();
if (launchJob) {
// carbon table need to lookup only if launch job is set.
CarbonTable carbonTable = getCarbonTable(identifier);
@@ -537,17 +538,18 @@ public final class DataMapStoreManager {
CarbonMetadata.getInstance()
.removeTable(identifier.getDatabaseName(), identifier.getTableName());
}
- List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+ List<TableDataMap> tableIndices =
+ allDataMaps.get(identifier.getCarbonTableIdentifier().getTableId());
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
if (keyUsingTablePath != null) {
- tableUniqueName = keyUsingTablePath;
+ tableId = keyUsingTablePath;
}
}
- segmentRefreshMap.remove(identifier.uniqueName());
- clearDataMaps(tableUniqueName);
- allDataMaps.remove(tableUniqueName);
- tablePathMap.remove(tableUniqueName);
+ segmentRefreshMap.remove(tableId);
+ clearDataMaps(identifier.getCarbonTableIdentifier());
+ allDataMaps.remove(tableId);
+ tablePathMap.remove(tableId);
}
/**
@@ -575,8 +577,8 @@ public final class DataMapStoreManager {
/**
* this methods clears the datamap of table from memory
*/
- public void clearDataMaps(String tableUniqName) {
- List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);
+ public void clearDataMaps(CarbonTableIdentifier carbonTableIdentifier) {
+ List<TableDataMap> tableIndices = allDataMaps.get(carbonTableIdentifier.getTableId());
if (tableIndices != null) {
for (TableDataMap tableDataMap : tableIndices) {
if (tableDataMap != null) {
@@ -587,8 +589,8 @@ public final class DataMapStoreManager {
}
}
}
- allDataMaps.remove(tableUniqName);
- tablePathMap.remove(tableUniqName);
+ allDataMaps.remove(carbonTableIdentifier.getTableId());
+ tablePathMap.remove(carbonTableIdentifier.getTableId());
}
/**
@@ -603,7 +605,7 @@ public final class DataMapStoreManager {
// doing any further changes.
return;
}
- String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+ String tableId = identifier.getCarbonTableIdentifier().getTableId();
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
try {
@@ -614,7 +616,7 @@ public final class DataMapStoreManager {
// ignoring the exception
}
} else {
- List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+ List<TableDataMap> tableIndices = allDataMaps.get(tableId);
if (tableIndices != null) {
int i = 0;
for (TableDataMap tableDataMap : tableIndices) {
@@ -634,7 +636,7 @@ public final class DataMapStoreManager {
}
i++;
}
- allDataMaps.put(tableUniqueName, tableIndices);
+ allDataMaps.put(tableId, tableIndices);
}
}
}
@@ -643,8 +645,8 @@ public final class DataMapStoreManager {
* is datamap exist
* @return true if exist, else return false
*/
- public boolean isDataMapExist(String dbName, String tableName, String dmName) {
- List<TableDataMap> tableDataMaps = allDataMaps.get(dbName + '_' + tableName);
+ public boolean isDataMapExist(String tableId, String dmName) {
+ List<TableDataMap> tableDataMaps = allDataMaps.get(tableId);
if (tableDataMaps != null) {
for (TableDataMap dm : tableDataMaps) {
if (dm != null && dmName.equalsIgnoreCase(dm.getDataMapSchema().getDataMapName())) {
@@ -679,11 +681,11 @@ public final class DataMapStoreManager {
* Get the TableSegmentRefresher for the table. If not existed then add one and return.
*/
public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
- String uniqueName = table.getAbsoluteTableIdentifier().uniqueName();
- if (segmentRefreshMap.get(uniqueName) == null) {
- segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(table));
+ String tableId = table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableId();
+ if (segmentRefreshMap.get(tableId) == null) {
+ segmentRefreshMap.put(tableId, new TableSegmentRefresher(table));
}
- return segmentRefreshMap.get(uniqueName);
+ return segmentRefreshMap.get(tableId);
}
/**
@@ -771,9 +773,9 @@ public final class DataMapStoreManager {
remainingDataMaps.add(tableDataMap);
}
}
- getAllDataMaps().put(carbonTable.getTableUniqueName(), remainingDataMaps);
+ allDataMaps.put(carbonTable.getTableId(), remainingDataMaps);
} else {
- clearDataMaps(carbonTable.getTableUniqueName());
+ clearDataMaps(carbonTable.getCarbonTableIdentifier());
// clear the segment properties cache from executor
SegmentPropertiesAndSchemaHolder.getInstance()
.invalidate(carbonTable.getAbsoluteTableIdentifier());
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 93ee346..d2a8159 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.*;
@@ -218,15 +217,7 @@ public class SessionParams implements Serializable, Cloneable {
} else if (key.startsWith(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING)) {
isValid = true;
} else if (key.startsWith(CarbonCommonConstants.CARBON_DATAMAP_VISIBLE)) {
- String[] keyArray = key.split("\\.");
- isValid = DataMapStoreManager.getInstance().isDataMapExist(
- keyArray[keyArray.length - 3],
- keyArray[keyArray.length - 2],
- keyArray[keyArray.length - 1]);
- if (!isValid) {
- throw new InvalidConfigurationException(
- String.format("Invalid configuration of %s, datamap does not
exist", key));
- }
+ isValid = true;
} else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
isValid = CarbonUtil.validateBoolean(value);
if (!isValid) {
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
index f1707c6..2c9721d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -27,15 +27,16 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactor
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.spark.rdd.CarbonRDD
-class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName: String)
+class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableUniqueId: String)
extends CarbonRDD[String](ss, Nil) {
- val executorsList: Array[String] = DistributionUtil.getExecutors(ss.sparkContext).flatMap {
- case (host, executors) =>
- executors.map {
- executor => s"executor_${host}_$executor"
- }
- }.toArray
+ val executorsList: Array[String] = DistributionUtil
+ .getExecutors(ss.sparkContext).flatMap {
+ case (host, executors) =>
+ executors.map { executor =>
+ s"executor_${ host }_$executor"
+ }
+ }.toArray
override protected def getPreferredLocations(split: Partition): Seq[String] = {
if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
@@ -57,19 +58,23 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName
override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
- val tableList = tableName.split(",").map(_.replace("-", "_"))
+ val tableList = tableUniqueId.split(",")
val iterator = dataMaps.collect {
- case (table, tableDataMaps) if tableName.isEmpty || tableList.contains(table) =>
+ case (tableId, tableDataMaps) if tableUniqueId.isEmpty || tableList.contains(tableId) =>
val sizeAndIndexLengths = tableDataMaps.asScala
.map { dataMap =>
val dataMapName = if (dataMap.getDataMapFactory.isInstanceOf[BlockletDataMapFactory]) {
- table
+ dataMap
+ .getDataMapFactory
+ .asInstanceOf[BlockletDataMapFactory]
+ .getCarbonTable
+ .getTableUniqueName
} else {
dataMap.getDataMapSchema.getRelationIdentifier.getDatabaseName + "_" + dataMap
.getDataMapSchema.getDataMapName
}
s"${ dataMapName }:${ dataMap.getDataMapFactory.getCacheSize }:${
- dataMap.getDataMapSchema.getProviderName} "
+ dataMap.getDataMapSchema.getProviderName}"
}
sizeAndIndexLengths
}.flatten.toIterator
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index abee487..75af667 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -52,7 +52,7 @@ trait ServerInterface {
/**
* Get the cache size for the specified tables.
*/
- def showCache(tableNames: String) : Array[String]
+ def showCache(tableIds: String) : Array[String]
/**
* Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
@@ -157,14 +157,14 @@ object IndexServer extends ServerInterface {
}
}
- override def showCache(tableName: String = ""): Array[String] = doAs {
- val jobgroup: String = "Show Cache for " + (tableName match {
+ override def showCache(tableId: String = ""): Array[String] = doAs {
+ val jobgroup: String = "Show Cache " + (tableId match {
case "" => "for all tables"
case table => s"for $table"
})
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id",
UUID.randomUUID().toString)
sparkSession.sparkContext.setLocalProperty("spark.job.description",
jobgroup)
- new DistributedShowCacheRDD(sparkSession, tableName).collect()
+ new DistributedShowCacheRDD(sparkSession, tableId).collect()
}
def main(args: Array[String]): Unit = {
@@ -206,6 +206,10 @@ object IndexServer extends ServerInterface {
.getOrCreateCarbonSession(CarbonProperties.getStorePath)
SparkSession.setActiveSession(spark)
SparkSession.setDefaultSession(spark)
+ if (spark.sparkContext.getConf
+ .get("spark.dynamicAllocation.enabled", "false").equalsIgnoreCase("true")) {
+ throw new RuntimeException("Index server is not supported with dynamic allocation enabled")
+ }
spark
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 4b7f680..f293f10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -213,11 +213,10 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
def getTableCacheFromDriver(sparkSession: SparkSession, carbonTable: CarbonTable,
numOfIndexFiles: Int = 0): Seq[Row] = {
- val cache = CacheProvider.getInstance().getCarbonCache
- if (cache != null) {
+ if (CacheProvider.getInstance().getCarbonCache != null) {
val childTableList = getChildTableList(carbonTable)(sparkSession)
val (parentMetaCacheInfo, dataMapCacheInfo) = collectDriverMetaCacheInfo(carbonTable
- .getTableUniqueName) match {
+ .getTableUniqueName, carbonTable.getTableId) match {
case list =>
val parentCache = list
.filter(_._4.equalsIgnoreCase(BlockletDataMapFactory.DATA_MAP_SCHEMA
@@ -237,7 +236,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
val tableArray = childTable._1.split("-")
val dbName = tableArray(0)
val tableName = tableArray(1)
- val childMetaCacheInfo = collectDriverMetaCacheInfo(s"${dbName}_$tableName")
+ val tableId = childTable._3
+ val childMetaCacheInfo = collectDriverMetaCacheInfo(s"${ dbName }_$tableName", tableId)
childMetaCacheInfo.collect {
case childMeta if childMeta._3 != 0 =>
Row(childMeta._1, childMeta._3, 0L, childTable._2)
@@ -299,7 +299,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
(sparkSession: SparkSession): Seq[Row] = {
val childTables = getChildTableList(mainTable)(sparkSession)
val cache = if (tableIdentifier.nonEmpty) {
- executeJobToGetCache(childTables.map(_._1) ++ List(mainTable.getTableUniqueName))
+ executeJobToGetCache(childTables.map(_._3) ++ List(mainTable.getTableId))
} else {
cacheResult
}
@@ -335,11 +335,11 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
}
- private def executeJobToGetCache(tableUniqueNames: List[String]): Seq[(String, Int, Long,
+ private def executeJobToGetCache(tableIds: List[String]): Seq[(String, Int, Long,
String)] = {
try {
val (result, time) = CarbonScalaUtil.logTime {
- IndexServer.getClient.showCache(tableUniqueNames.mkString(",")).map(_.split(":"))
+ IndexServer.getClient.showCache(tableIds.mkString(",")).map(_.split(":"))
.groupBy(_.head).map { t =>
var sum = 0L
var length = 0
@@ -369,14 +369,14 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
}
private def getChildTableList(carbonTable: CarbonTable)
- (sparkSession: SparkSession): List[(String, String)] = {
+ (sparkSession: SparkSession): List[(String, String, String)] = {
val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
val operationContext = new OperationContext
// datamapName -> (datamapProviderName, indexSize, datamapSize)
operationContext.setProperty(carbonTable.getTableUniqueName, List())
OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
operationContext.getProperty(carbonTable.getTableUniqueName)
- .asInstanceOf[List[(String, String)]]
+ .asInstanceOf[List[(String, String, String)]]
}
private def getDictionarySize(carbonTable: CarbonTable)(sparkSession: SparkSession): Long = {
@@ -412,15 +412,18 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
(allIndexSize, allDatamapSize, allDictSize)
}
- private def collectDriverMetaCacheInfo(tableName: String): List[(String, Int, Long, String)] = {
+ private def collectDriverMetaCacheInfo(tableName: String,
+ tableId: String): List[(String, Int, Long, String)] = {
val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
dataMaps.collect {
case (table, tableDataMaps) if table.isEmpty ||
- (tableName.nonEmpty && tableName.equalsIgnoreCase(table)) =>
+ (tableId.nonEmpty && tableId.equalsIgnoreCase(table)) =>
val sizeAndIndexLengths = tableDataMaps.asScala
- .map { dataMap =>
+ .collect { case dataMap if
+ dataMap.getDataMapSchema.getDataMapName.equalsIgnoreCase(tableName) ||
+ dataMap.getDataMapFactory.getCarbonTable.getTableUniqueName.equalsIgnoreCase(tableName) =>
if (dataMap.getDataMapFactory.isInstanceOf[BlockletDataMapFactory]) {
- s"$table:${ dataMap.getDataMapFactory.getCacheSize }:${
+ s"$tableName:${ dataMap.getDataMapFactory.getCacheSize }:${
dataMap.getDataMapSchema.getProviderName}"
} else {
s"${ dataMap.getDataMapSchema.getDataMapName }:${
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
index 55d8313..409ff4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
@@ -54,7 +54,7 @@ object ShowCachePreAggEventListener extends OperationEventListener {
case childSchema if childSchema.getRelationIdentifier != null =>
(s"${ childSchema.getRelationIdentifier.getDatabaseName }-${
childSchema.getRelationIdentifier.getTableName
- }", childSchema.getProviderName)
+ }", childSchema.getProviderName,
childSchema.getRelationIdentifier.getTableId)
}.toList ++ childTables)
}
}
@@ -92,16 +92,18 @@ object ShowCacheDataMapEventListener extends OperationEventListener {
}
private def filterDataMaps(dataMaps: List[DataMapSchema],
- filter: String): List[(String, String)] = {
+ filter: String): List[(String, String, String)] = {
dataMaps.collect {
case dataMap if dataMap.getProviderName
.equalsIgnoreCase(filter) =>
if (filter.equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName)) {
(s"${ dataMap.getRelationIdentifier.getDatabaseName }-${
- dataMap.getDataMapName}", dataMap.getProviderName)
+ dataMap.getDataMapName}", dataMap.getProviderName,
+ dataMap.getRelationIdentifier.getTableId)
} else {
(s"${ dataMap.getRelationIdentifier.getDatabaseName }-${
- dataMap.getRelationIdentifier.getTableName}",
dataMap.getProviderName)
+ dataMap.getRelationIdentifier.getTableName}",
dataMap.getProviderName,
+ dataMap.getRelationIdentifier.getTableId)
}
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index d1d3992..0d2a4c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -23,9 +23,12 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -99,6 +102,15 @@ object CarbonSetCommand {
sessionParams.addProperty(key.toLowerCase(), value)
} else if (key.startsWith(CarbonCommonConstants.CARBON_DATAMAP_VISIBLE)) {
if (key.split("\\.").length == 6) {
+ val keyArray = key.split("\\.")
+ val dbName = keyArray(keyArray.length - 3)
+ val tableName = keyArray(keyArray.length - 2)
+ val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(SparkSQLUtil.getSparkSession)
+ val isValid = DataMapStoreManager.getInstance
+ .isDataMapExist(table.getTableId, keyArray(keyArray.length - 1))
+ if (!isValid) throw new InvalidConfigurationException(String.format(
+ "Invalid configuration of %s, datamap does not exist",
+ key))
sessionParams.addProperty(key.toLowerCase, value)
} else {
throw new MalformedCarbonCommandException("property should be in " +
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index be00480..54a2b71 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -64,7 +64,7 @@ public class DataMapWriterListener {
String taskNo, SegmentProperties segmentProperties) {
// clear cache in executor side
DataMapStoreManager.getInstance()
- .clearDataMaps(carbonTable.getCarbonTableIdentifier().getTableUniqueName());
+ .clearDataMaps(carbonTable.getCarbonTableIdentifier());
List<TableDataMap> tableIndices;
try {
tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);