[CARBONDATA-2081] Fixed: cache is not refreshed across different sessions, 
and schema is not updated after rename

Scenario: 1

open spark-sql and beeline.
create main table in spark-sql
create preaggregate table in beeline.
drop main table in spark-sql.
perform 'show tables' operation. The preaggregate table is still not deleted.
Scenario: 2

perform the following operations in the same session:
create table t5 (c1 string, c2 int) stored by 'carbondata'
insert into t5 select 'asd',1
alter table t5 rename to t6
create table t5 (c1 string, c2 int,c3 string) stored by 'carbondata'
insert into t5 select 'asd',1,'sdf' (query is failing)

This closes #1862


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c55240d5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c55240d5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c55240d5

Branch: refs/heads/carbonstore
Commit: c55240d58ad50e718a89d7bd06cfa229ba1ba82b
Parents: 3a6136d
Author: rahulforallp <[email protected]>
Authored: Thu Jan 25 19:43:22 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Fri Jan 26 11:55:36 2018 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 19 ++++++++++++++-
 .../schema/CarbonAlterTableRenameCommand.scala  |  2 ++
 .../spark/sql/hive/CarbonFileMetastore.scala    |  2 ++
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  4 +++-
 .../src/main/spark2.2/CarbonSessionState.scala  | 25 +++++++-------------
 .../vectorreader/AddColumnTestCases.scala       | 12 ++++++++++
 6 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 585fe67..870b1f3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,10 +23,11 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
-import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.{HiveSessionCatalog, _}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -111,6 +112,8 @@ object CarbonEnv {
 
   val carbonEnvMap = new ConcurrentHashMap[SparkSession, CarbonEnv]
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
     if (sparkSession.isInstanceOf[CarbonSession]) {
       
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv
@@ -154,6 +157,7 @@ object CarbonEnv {
       databaseNameOp: Option[String],
       tableName: String)
     (sparkSession: SparkSession): CarbonTable = {
+    refreshRelationFromCache(TableIdentifier(tableName, 
databaseNameOp))(sparkSession)
     val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
     val catalog = getInstance(sparkSession).carbonMetastore
     // refresh cache
@@ -168,6 +172,19 @@ object CarbonEnv {
           .carbonTable)
   }
 
+  def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: 
SparkSession): Boolean = {
+    var isRefreshed = false
+    val carbonEnv = getInstance(sparkSession)
+    if 
(carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) {
+      sparkSession.sessionState.catalog.refreshTable(identifier)
+      DataMapStoreManager.getInstance().
+        
clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
+          identifier.database.getOrElse("default"), identifier.table))
+      isRefreshed = true
+    }
+    isRefreshed
+  }
+
   /**
    * Return carbon table instance by looking up table in `sparkSession`
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 64d1d6f..dd34f08 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -122,6 +122,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = 
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .getClient()
+      sparkSession.catalog.refreshTable(TableIdentifier(oldTableName,
+        Some(oldDatabaseName)).quotedString)
       hiveClient.runSqlHive(
           s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO 
$oldDatabaseName.$newTableName")
       hiveClient.runSqlHive(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0c52100..b44dc7e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -221,6 +221,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
       val wrapperTableInfo =
         schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, 
tableName, tablePath)
+      CarbonMetadata.getInstance().removeTable(tableUniqueName)
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       metadata.carbonTables += carbonTable
@@ -258,6 +259,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       oldTableIdentifier.getTableId)
     val path = createSchemaThriftFile(newAbsoluteTableIdentifier, 
thriftTableInfo)
     addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
+
     path
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 54f58fc..759471b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -51,13 +51,15 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
     val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
-    if (info != null) {
+    val carbonRelation = if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
         CarbonSparkUtil.createSparkMeta(table), table)
     } else {
       super.createCarbonRelation(parameters, absIdentifier, sparkSession)
     }
+    carbonRelation.refresh()
+    carbonRelation
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala 
b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 66a20ea..3c151f0 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -95,18 +95,6 @@ class CarbonSessionCatalog(
   CarbonEnv.initListeners()
 
 
-  private def refreshRelationFromCache(identifier: TableIdentifier): Boolean = 
{
-    var isRefreshed = false
-    if 
(carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) {
-      refreshTable(identifier)
-      DataMapStoreManager.getInstance().
-        
clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
-          identifier.database.getOrElse("default"), identifier.table))
-      logInfo(s"Schema changes have been detected for table: $identifier")
-      isRefreshed = true
-    }
-    isRefreshed
-  }
 
 
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
@@ -115,17 +103,20 @@ class CarbonSessionCatalog(
     rtnRelation match {
       case SubqueryAlias(_,
       LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
-        toRefreshRelation = refreshRelationFromCache(name)
+        toRefreshRelation = 
CarbonEnv.refreshRelationFromCache(name)(sparkSession)
       case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = refreshRelationFromCache(name)
+        toRefreshRelation = 
CarbonEnv.refreshRelationFromCache(name)(sparkSession)
       case SubqueryAlias(_, relation) if
       
relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation")
 ||
       
relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation")
 ||
       relation.getClass.getName.equals(
         "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
-        val catalogTable = 
CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta",
-          relation).asInstanceOf[CatalogTable]
-        toRefreshRelation = refreshRelationFromCache(catalogTable.identifier)
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation).asInstanceOf[CatalogTable]
+        toRefreshRelation =
+          
CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
       case _ =>
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 9a87653..d36dd26 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -669,6 +669,18 @@ class AddColumnTestCases extends Spark2QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists renameTextFileTable")
   }
 
+  test("test rename [create table, rename, create same table with different 
schema]"){
+    sql("drop table if exists t5")
+    sql("drop table if exists t6")
+
+    sql("create table t5 (c1 string, c2 int) stored by 'carbondata'")
+    sql("insert into t5 select 'asd',1")
+    sql("alter table t5 rename to t6")
+    sql("create table t5 (c1 string, c2 int,c3 string) stored by 'carbondata'")
+    sql("insert into t5 select 'asd',1,'sdf'")
+    checkAnswer(sql("select * from t5"),Seq(Row("asd",1,"sdf")))
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS addcolumntest")
     sql("DROP TABLE IF EXISTS hivetable")

Reply via email to