Repository: carbondata
Updated Branches:
  refs/heads/master 956833e55 -> 311a5b7e3


[CARBONDATA-1484] Fixed driver cache issue

The driver cache is not cleared when a table is dropped in one driver and then
queried in another driver.
This PR checks the modified time and refreshes the cache when it has changed.

This closes #1415


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/311a5b7e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/311a5b7e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/311a5b7e

Branch: refs/heads/master
Commit: 311a5b7e31e98777cc32f27ecdcde86859eadaa5
Parents: 956833e
Author: ravipesala <[email protected]>
Authored: Fri Oct 13 22:08:49 2017 +0530
Committer: Jacky Li <[email protected]>
Committed: Mon Oct 16 16:51:01 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/CarbonFileMetastore.scala     | 10 ++++++----
 .../spark/sql/hive/CarbonSessionState.scala      | 19 +++++++++++++------
 2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/311a5b7e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 75ad4ae..16724fc 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -442,7 +442,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param timeStamp
    */
   private def updateSchemasUpdatedTime(timeStamp: Long) {
-    tableModifiedTimeStore.put("default", timeStamp)
+    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, 
timeStamp)
   }
 
   def updateAndTouchSchemasUpdatedTime(basePath: String) {
@@ -461,10 +461,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
       LOGGER.audit(s"Creating timestamp file for $basePath")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
-    val systemTime = System.currentTimeMillis()
     FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(systemTime)
-    systemTime
+      .setLastModifiedTime(System.currentTimeMillis())
+    // since there is no guarantee that exact same set modified time returns 
when called
+    // lastmodified time, so better get the time from file.
+    FileFactory.getCarbonFile(timestampFile, timestampFileType)
+      .getLastModifiedTime
   }
 
   def checkSchemasModifiedTimeAndReloadTables(storePath: String) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311a5b7e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 478b178..6892dad 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -32,6 +32,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
 /**
  * This class will have carbon catalog and refresh the relation from cache if 
the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table
@@ -96,22 +99,26 @@ class CarbonSessionCatalog(
     }
   }
 
-  private def refreshRelationFromCache(name: TableIdentifier,
+  private def refreshRelationFromCache(identifier: TableIdentifier,
       alias: Option[String],
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean 
= {
     var isRefreshed = false
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     carbonEnv.carbonMetastore.
-      
checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(sparkSession).storePath)
+      checkSchemasModifiedTimeAndReloadTables(storePath)
 
     val tableMeta = carbonEnv.carbonMetastore
       
.getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
         carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
-    if (tableMeta.isDefined &&
+    if (tableMeta.isEmpty || (tableMeta.isDefined &&
         tableMeta.get.carbonTable.getTableLastUpdatedTime !=
-          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) {
-      refreshTable(name)
+          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) 
{
+      refreshTable(identifier)
+      DataMapStoreManager.getInstance().
+        clearDataMap(AbsoluteTableIdentifier.from(storePath,
+          identifier.database.getOrElse("default"), identifier.table))
       isRefreshed = true
-      logInfo(s"Schema changes have been detected for table: $name")
+      logInfo(s"Schema changes have been detected for table: $identifier")
     }
     isRefreshed
   }

Reply via email to