This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 793f13f  [CARBONDATA-3496] Maintain the list of TableDataMap based on 
tableId instead of tableUniqueName
793f13f is described below

commit 793f13f621897b439a8e4577bd5c177599a4e882
Author: kunal642 <[email protected]>
AuthorDate: Mon Aug 26 12:21:17 2019 +0530

    [CARBONDATA-3496] Maintain the list of TableDataMap based on tableId 
instead of tableUniqueName
    
    Problem:
    TableNotFoundException is thrown from index server randomly for various 
files, such as schema and .carbondata files.
    Through code analysis it was found that if the table cache is stale in the 
index executors and a query is fired, the cache can return an old TableDataMap 
object for which the carbondata file no longer exists.
    Solution:
    Make the cache based on tableId instead of tableUniqueName to avoid stale 
cache access.
    
    This closes #3374
---
 .../core/datamap/DataMapStoreManager.java          | 74 +++++++++++-----------
 .../apache/carbondata/core/util/SessionParams.java | 11 +---
 .../indexserver/DistributedShowCacheRDD.scala      | 27 ++++----
 .../carbondata/indexserver/IndexServer.scala       | 12 ++--
 .../command/cache/CarbonShowCacheCommand.scala     | 29 +++++----
 .../command/cache/ShowCacheEventListeners.scala    | 10 +--
 .../execution/command/CarbonHiveCommands.scala     | 12 ++++
 .../processing/datamap/DataMapWriterListener.java  |  2 +-
 8 files changed, 98 insertions(+), 79 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index f1f48fa..7ee48cc 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -37,6 +37,7 @@ import 
org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import 
org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -318,23 +319,23 @@ public final class DataMapStoreManager {
    * Get the datamap for reading data.
    */
   public TableDataMap getDataMap(CarbonTable table, DataMapSchema 
dataMapSchema) {
-    String tableUniqueName =
-        
table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableUniqueName();
-    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+    String tableId =
+        
table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableId();
+    List<TableDataMap> tableIndices = allDataMaps.get(table.getTableId());
     if (tableIndices == null) {
       String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
       if (keyUsingTablePath != null) {
-        tableUniqueName = keyUsingTablePath;
-        tableIndices = allDataMaps.get(tableUniqueName);
+        tableId = keyUsingTablePath;
+        tableIndices = allDataMaps.get(tableId);
       }
     }
     // in case of fileformat or sdk, when table is dropped or schema is 
changed the datamaps are
     // not cleared, they need to be cleared by using API, so compare the 
columns, if not same, clear
     // the datamaps on that table
-    if (allDataMaps.size() > 0 && 
!CollectionUtils.isEmpty(allDataMaps.get(tableUniqueName))
-        && 
!allDataMaps.get(tableUniqueName).get(0).getTable().getTableInfo().getFactTable()
+    if (allDataMaps.size() > 0 && 
!CollectionUtils.isEmpty(allDataMaps.get(tableId))
+        && 
!allDataMaps.get(tableId).get(0).getTable().getTableInfo().getFactTable()
         
.getListOfColumns().equals(table.getTableInfo().getFactTable().getListOfColumns()))
 {
-      clearDataMaps(tableUniqueName);
+      clearDataMaps(table.getCarbonTableIdentifier());
       tableIndices = null;
     }
     TableDataMap dataMap = null;
@@ -342,8 +343,8 @@ public final class DataMapStoreManager {
       dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
     }
     if (dataMap == null) {
-      synchronized (tableUniqueName.intern()) {
-        tableIndices = allDataMaps.get(tableUniqueName);
+      synchronized (tableId.intern()) {
+        tableIndices = allDataMaps.get(tableId);
         if (tableIndices != null) {
           dataMap = getTableDataMap(dataMapSchema.getDataMapName(), 
tableIndices);
         }
@@ -417,12 +418,12 @@ public final class DataMapStoreManager {
     String tableUniqueName = 
table.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(table);
-    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+    List<TableDataMap> tableIndices = allDataMaps.get(table.getTableId());
     if (tableIndices == null) {
       String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
       if (keyUsingTablePath != null) {
         tableUniqueName = keyUsingTablePath;
-        tableIndices = allDataMaps.get(tableUniqueName);
+        tableIndices = allDataMaps.get(table.getTableId());
       }
     }
     if (tableIndices == null) {
@@ -441,8 +442,8 @@ public final class DataMapStoreManager {
         dataMapSchema, dataMapFactory, blockletDetailsFetcher, 
segmentPropertiesFetcher);
 
     tableIndices.add(dataMap);
-    allDataMaps.put(tableUniqueName, tableIndices);
-    tablePathMap.put(tableUniqueName, table.getTablePath());
+    allDataMaps.put(table.getTableId(), tableIndices);
+    tablePathMap.put(table.getTableId(), table.getTablePath());
     return dataMap;
   }
 
@@ -512,7 +513,7 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean 
launchJob) {
-    String tableUniqueName = 
identifier.getCarbonTableIdentifier().getTableUniqueName();
+    String tableId = identifier.getCarbonTableIdentifier().getTableId();
     if (launchJob) {
       // carbon table need to lookup only if launch job is set.
       CarbonTable carbonTable = getCarbonTable(identifier);
@@ -537,17 +538,18 @@ public final class DataMapStoreManager {
       CarbonMetadata.getInstance()
           .removeTable(identifier.getDatabaseName(), 
identifier.getTableName());
     }
-    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+    List<TableDataMap> tableIndices =
+        allDataMaps.get(identifier.getCarbonTableIdentifier().getTableId());
     if (tableIndices == null) {
       String keyUsingTablePath = 
getKeyUsingTablePath(identifier.getTablePath());
       if (keyUsingTablePath != null) {
-        tableUniqueName = keyUsingTablePath;
+        tableId = keyUsingTablePath;
       }
     }
-    segmentRefreshMap.remove(identifier.uniqueName());
-    clearDataMaps(tableUniqueName);
-    allDataMaps.remove(tableUniqueName);
-    tablePathMap.remove(tableUniqueName);
+    segmentRefreshMap.remove(tableId);
+    clearDataMaps(identifier.getCarbonTableIdentifier());
+    allDataMaps.remove(tableId);
+    tablePathMap.remove(tableId);
   }
 
   /**
@@ -575,8 +577,8 @@ public final class DataMapStoreManager {
   /**
    * this methods clears the datamap of table from memory
    */
-  public void clearDataMaps(String tableUniqName) {
-    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);
+  public void clearDataMaps(CarbonTableIdentifier carbonTableIdentifier) {
+    List<TableDataMap> tableIndices = 
allDataMaps.get(carbonTableIdentifier.getTableId());
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
         if (tableDataMap != null) {
@@ -587,8 +589,8 @@ public final class DataMapStoreManager {
         }
       }
     }
-    allDataMaps.remove(tableUniqName);
-    tablePathMap.remove(tableUniqName);
+    allDataMaps.remove(carbonTableIdentifier.getTableId());
+    tablePathMap.remove(carbonTableIdentifier.getTableId());
   }
 
   /**
@@ -603,7 +605,7 @@ public final class DataMapStoreManager {
       // doing any further changes.
       return;
     }
-    String tableUniqueName = 
identifier.getCarbonTableIdentifier().getTableUniqueName();
+    String tableId = identifier.getCarbonTableIdentifier().getTableId();
     if (CarbonProperties.getInstance()
         .isDistributedPruningEnabled(identifier.getDatabaseName(), 
identifier.getTableName())) {
       try {
@@ -614,7 +616,7 @@ public final class DataMapStoreManager {
         // ignoring the exception
       }
     } else {
-      List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+      List<TableDataMap> tableIndices = allDataMaps.get(tableId);
       if (tableIndices != null) {
         int i = 0;
         for (TableDataMap tableDataMap : tableIndices) {
@@ -634,7 +636,7 @@ public final class DataMapStoreManager {
           }
           i++;
         }
-        allDataMaps.put(tableUniqueName, tableIndices);
+        allDataMaps.put(tableId, tableIndices);
       }
     }
   }
@@ -643,8 +645,8 @@ public final class DataMapStoreManager {
    * is datamap exist
    * @return true if exist, else return false
    */
-  public boolean isDataMapExist(String dbName, String tableName, String 
dmName) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(dbName + '_' + 
tableName);
+  public boolean isDataMapExist(String tableId, String dmName) {
+    List<TableDataMap> tableDataMaps = allDataMaps.get(tableId);
     if (tableDataMaps != null) {
       for (TableDataMap dm : tableDataMaps) {
         if (dm != null && 
dmName.equalsIgnoreCase(dm.getDataMapSchema().getDataMapName())) {
@@ -679,11 +681,11 @@ public final class DataMapStoreManager {
    * Get the TableSegmentRefresher for the table. If not existed then add one 
and return.
    */
   public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
-    String uniqueName = table.getAbsoluteTableIdentifier().uniqueName();
-    if (segmentRefreshMap.get(uniqueName) == null) {
-      segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(table));
+    String tableId = 
table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableId();
+    if (segmentRefreshMap.get(tableId) == null) {
+      segmentRefreshMap.put(tableId, new TableSegmentRefresher(table));
     }
-    return segmentRefreshMap.get(uniqueName);
+    return segmentRefreshMap.get(tableId);
   }
 
   /**
@@ -771,9 +773,9 @@ public final class DataMapStoreManager {
           remainingDataMaps.add(tableDataMap);
         }
       }
-      getAllDataMaps().put(carbonTable.getTableUniqueName(), 
remainingDataMaps);
+      allDataMaps.put(carbonTable.getTableId(), remainingDataMaps);
     } else {
-      clearDataMaps(carbonTable.getTableUniqueName());
+      clearDataMaps(carbonTable.getCarbonTableIdentifier());
       // clear the segment properties cache from executor
       SegmentPropertiesAndSchemaHolder.getInstance()
           .invalidate(carbonTable.getAbsoluteTableIdentifier());
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java 
b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 93ee346..d2a8159 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.*;
@@ -218,15 +217,7 @@ public class SessionParams implements Serializable, 
Cloneable {
         } else if 
(key.startsWith(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING)) {
           isValid = true;
         } else if 
(key.startsWith(CarbonCommonConstants.CARBON_DATAMAP_VISIBLE)) {
-          String[] keyArray = key.split("\\.");
-          isValid = DataMapStoreManager.getInstance().isDataMapExist(
-              keyArray[keyArray.length - 3],
-              keyArray[keyArray.length - 2],
-              keyArray[keyArray.length - 1]);
-          if (!isValid) {
-            throw new InvalidConfigurationException(
-                String.format("Invalid configuration of %s, datamap does not 
exist", key));
-          }
+          isValid = true;
         } else if 
(key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
           isValid = CarbonUtil.validateBoolean(value);
           if (!isValid) {
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
index f1707c6..2c9721d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -27,15 +27,16 @@ import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactor
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.spark.rdd.CarbonRDD
 
-class DistributedShowCacheRDD(@transient private val ss: SparkSession, 
tableName: String)
+class DistributedShowCacheRDD(@transient private val ss: SparkSession, 
tableUniqueId: String)
   extends CarbonRDD[String](ss, Nil) {
 
-  val executorsList: Array[String] = 
DistributionUtil.getExecutors(ss.sparkContext).flatMap {
-    case (host, executors) =>
-      executors.map {
-        executor => s"executor_${host}_$executor"
-      }
-  }.toArray
+  val executorsList: Array[String] = DistributionUtil
+    .getExecutors(ss.sparkContext).flatMap {
+      case (host, executors) =>
+        executors.map { executor =>
+          s"executor_${ host }_$executor"
+        }
+    }.toArray
 
   override protected def getPreferredLocations(split: Partition): Seq[String] 
= {
     if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
@@ -57,19 +58,23 @@ class DistributedShowCacheRDD(@transient private val ss: 
SparkSession, tableName
 
   override def internalCompute(split: Partition, context: TaskContext): 
Iterator[String] = {
     val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
-    val tableList = tableName.split(",").map(_.replace("-", "_"))
+    val tableList = tableUniqueId.split(",")
     val iterator = dataMaps.collect {
-      case (table, tableDataMaps) if tableName.isEmpty || 
tableList.contains(table) =>
+      case (tableId, tableDataMaps) if tableUniqueId.isEmpty || 
tableList.contains(tableId) =>
         val sizeAndIndexLengths = tableDataMaps.asScala
           .map { dataMap =>
             val dataMapName = if 
(dataMap.getDataMapFactory.isInstanceOf[BlockletDataMapFactory]) {
-              table
+              dataMap
+                .getDataMapFactory
+                .asInstanceOf[BlockletDataMapFactory]
+                .getCarbonTable
+                .getTableUniqueName
             } else {
               dataMap.getDataMapSchema.getRelationIdentifier.getDatabaseName + 
"_" + dataMap
               .getDataMapSchema.getDataMapName
             }
             s"${ dataMapName }:${ dataMap.getDataMapFactory.getCacheSize }:${
-              dataMap.getDataMapSchema.getProviderName} "
+              dataMap.getDataMapSchema.getProviderName}"
           }
         sizeAndIndexLengths
     }.flatten.toIterator
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index abee487..75af667 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -52,7 +52,7 @@ trait ServerInterface {
   /**
    * Get the cache size for the specified tables.
    */
-  def showCache(tableNames: String) : Array[String]
+  def showCache(tableIds: String) : Array[String]
 
   /**
    * Invalidate the cache for the specified segments only. Used in case of 
compaction/Update/Delete.
@@ -157,14 +157,14 @@ object IndexServer extends ServerInterface {
     }
   }
 
-  override def showCache(tableName: String = ""): Array[String] = doAs {
-    val jobgroup: String = "Show Cache for " + (tableName match {
+  override def showCache(tableId: String = ""): Array[String] = doAs {
+    val jobgroup: String = "Show Cache " + (tableId match {
       case "" => "for all tables"
       case table => s"for $table"
     })
     sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", 
UUID.randomUUID().toString)
     sparkSession.sparkContext.setLocalProperty("spark.job.description", 
jobgroup)
-    new DistributedShowCacheRDD(sparkSession, tableName).collect()
+    new DistributedShowCacheRDD(sparkSession, tableId).collect()
   }
 
   def main(args: Array[String]): Unit = {
@@ -206,6 +206,10 @@ object IndexServer extends ServerInterface {
       .getOrCreateCarbonSession(CarbonProperties.getStorePath)
     SparkSession.setActiveSession(spark)
     SparkSession.setDefaultSession(spark)
+    if (spark.sparkContext.getConf
+      .get("spark.dynamicAllocation.enabled", 
"false").equalsIgnoreCase("true")) {
+      throw new RuntimeException("Index server is not supported with dynamic 
allocation enabled")
+    }
     spark
   }
 
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 4b7f680..f293f10 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -213,11 +213,10 @@ case class CarbonShowCacheCommand(tableIdentifier: 
Option[TableIdentifier],
 
   def getTableCacheFromDriver(sparkSession: SparkSession, carbonTable: 
CarbonTable,
       numOfIndexFiles: Int = 0): Seq[Row] = {
-    val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache != null) {
+    if (CacheProvider.getInstance().getCarbonCache != null) {
       val childTableList = getChildTableList(carbonTable)(sparkSession)
       val (parentMetaCacheInfo, dataMapCacheInfo) = 
collectDriverMetaCacheInfo(carbonTable
-        .getTableUniqueName) match {
+        .getTableUniqueName, carbonTable.getTableId) match {
         case list =>
           val parentCache = list
             
.filter(_._4.equalsIgnoreCase(BlockletDataMapFactory.DATA_MAP_SCHEMA
@@ -237,7 +236,8 @@ case class CarbonShowCacheCommand(tableIdentifier: 
Option[TableIdentifier],
           val tableArray = childTable._1.split("-")
           val dbName = tableArray(0)
           val tableName = tableArray(1)
-          val childMetaCacheInfo = 
collectDriverMetaCacheInfo(s"${dbName}_$tableName")
+          val tableId = childTable._3
+          val childMetaCacheInfo = collectDriverMetaCacheInfo(s"${ dbName 
}_$tableName", tableId)
           childMetaCacheInfo.collect {
             case childMeta if childMeta._3 != 0 =>
               Row(childMeta._1, childMeta._3, 0L, childTable._2)
@@ -299,7 +299,7 @@ case class CarbonShowCacheCommand(tableIdentifier: 
Option[TableIdentifier],
     (sparkSession: SparkSession): Seq[Row] = {
     val childTables = getChildTableList(mainTable)(sparkSession)
     val cache = if (tableIdentifier.nonEmpty) {
-      executeJobToGetCache(childTables.map(_._1) ++ 
List(mainTable.getTableUniqueName))
+      executeJobToGetCache(childTables.map(_._3) ++ List(mainTable.getTableId))
     } else {
       cacheResult
     }
@@ -335,11 +335,11 @@ case class CarbonShowCacheCommand(tableIdentifier: 
Option[TableIdentifier],
 
   }
 
-  private def executeJobToGetCache(tableUniqueNames: List[String]): 
Seq[(String, Int, Long,
+  private def executeJobToGetCache(tableIds: List[String]): Seq[(String, Int, 
Long,
     String)] = {
     try {
       val (result, time) = CarbonScalaUtil.logTime {
-        
IndexServer.getClient.showCache(tableUniqueNames.mkString(",")).map(_.split(":"))
+        
IndexServer.getClient.showCache(tableIds.mkString(",")).map(_.split(":"))
           .groupBy(_.head).map { t =>
           var sum = 0L
           var length = 0
@@ -369,14 +369,14 @@ case class CarbonShowCacheCommand(tableIdentifier: 
Option[TableIdentifier],
   }
 
   private def getChildTableList(carbonTable: CarbonTable)
-    (sparkSession: SparkSession): List[(String, String)] = {
+    (sparkSession: SparkSession): List[(String, String, String)] = {
     val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, 
internalCall)
     val operationContext = new OperationContext
     // datamapName -> (datamapProviderName, indexSize, datamapSize)
     operationContext.setProperty(carbonTable.getTableUniqueName, List())
     OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, 
operationContext)
     operationContext.getProperty(carbonTable.getTableUniqueName)
-      .asInstanceOf[List[(String, String)]]
+      .asInstanceOf[List[(String, String, String)]]
   }
 
   private def getDictionarySize(carbonTable: CarbonTable)(sparkSession: 
SparkSession): Long = {
@@ -412,15 +412,18 @@ case class CarbonShowCacheCommand(tableIdentifier: 
Option[TableIdentifier],
     (allIndexSize, allDatamapSize, allDictSize)
   }
 
-  private def collectDriverMetaCacheInfo(tableName: String): List[(String, 
Int, Long, String)] = {
+  private def collectDriverMetaCacheInfo(tableName: String,
+      tableId: String): List[(String, Int, Long, String)] = {
     val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
     dataMaps.collect {
       case (table, tableDataMaps) if table.isEmpty ||
-                                     (tableName.nonEmpty && 
tableName.equalsIgnoreCase(table)) =>
+                                     (tableId.nonEmpty && 
tableId.equalsIgnoreCase(table)) =>
         val sizeAndIndexLengths = tableDataMaps.asScala
-          .map { dataMap =>
+          .collect { case dataMap if
+          dataMap.getDataMapSchema.getDataMapName.equalsIgnoreCase(tableName) 
||
+          
dataMap.getDataMapFactory.getCarbonTable.getTableUniqueName.equalsIgnoreCase(tableName)
 =>
             if 
(dataMap.getDataMapFactory.isInstanceOf[BlockletDataMapFactory]) {
-              s"$table:${ dataMap.getDataMapFactory.getCacheSize }:${
+              s"$tableName:${ dataMap.getDataMapFactory.getCacheSize }:${
                 dataMap.getDataMapSchema.getProviderName}"
             } else {
               s"${ dataMap.getDataMapSchema.getDataMapName }:${
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
index 55d8313..409ff4f 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
@@ -54,7 +54,7 @@ object ShowCachePreAggEventListener extends 
OperationEventListener {
             case childSchema if childSchema.getRelationIdentifier != null =>
               (s"${ childSchema.getRelationIdentifier.getDatabaseName }-${
                 childSchema.getRelationIdentifier.getTableName
-              }", childSchema.getProviderName)
+              }", childSchema.getProviderName, 
childSchema.getRelationIdentifier.getTableId)
           }.toList ++ childTables)
         }
     }
@@ -92,16 +92,18 @@ object ShowCacheDataMapEventListener extends 
OperationEventListener {
   }
 
   private def filterDataMaps(dataMaps: List[DataMapSchema],
-      filter: String): List[(String, String)] = {
+      filter: String): List[(String, String, String)] = {
     dataMaps.collect {
       case dataMap if dataMap.getProviderName
         .equalsIgnoreCase(filter) =>
         if 
(filter.equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName)) {
           (s"${ dataMap.getRelationIdentifier.getDatabaseName }-${
-            dataMap.getDataMapName}", dataMap.getProviderName)
+            dataMap.getDataMapName}", dataMap.getProviderName,
+            dataMap.getRelationIdentifier.getTableId)
         } else {
           (s"${ dataMap.getRelationIdentifier.getDatabaseName }-${
-            dataMap.getRelationIdentifier.getTableName}", 
dataMap.getProviderName)
+            dataMap.getRelationIdentifier.getTableName}", 
dataMap.getProviderName,
+            dataMap.getRelationIdentifier.getTableId)
         }
     }
   }
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index d1d3992..0d2a4c3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -23,9 +23,12 @@ import 
org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonCommonConstantsInternal, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
SessionParams}
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -99,6 +102,15 @@ object CarbonSetCommand {
       sessionParams.addProperty(key.toLowerCase(), value)
     } else if (key.startsWith(CarbonCommonConstants.CARBON_DATAMAP_VISIBLE)) {
       if (key.split("\\.").length == 6) {
+        val keyArray = key.split("\\.")
+        val dbName = keyArray(keyArray.length - 3)
+        val tableName = keyArray(keyArray.length - 2)
+        val table = CarbonEnv.getCarbonTable(Some(dbName), 
tableName)(SparkSQLUtil.getSparkSession)
+        val isValid = DataMapStoreManager.getInstance
+          .isDataMapExist(table.getTableId, keyArray(keyArray.length - 1))
+        if (!isValid) throw new InvalidConfigurationException(String.format(
+          "Invalid configuration of %s, datamap does not exist",
+          key))
         sessionParams.addProperty(key.toLowerCase, value)
       } else {
         throw new MalformedCarbonCommandException("property should be in " +
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index be00480..54a2b71 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -64,7 +64,7 @@ public class DataMapWriterListener {
       String taskNo, SegmentProperties segmentProperties) {
     // clear cache in executor side
     DataMapStoreManager.getInstance()
-        
.clearDataMaps(carbonTable.getCarbonTableIdentifier().getTableUniqueName());
+        .clearDataMaps(carbonTable.getCarbonTableIdentifier());
     List<TableDataMap> tableIndices;
     try {
       tableIndices = 
DataMapStoreManager.getInstance().getAllDataMap(carbonTable);

Reply via email to