This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a4700af  [CARBONDATA-3602]Fix MV issues with session level operations
a4700af is described below

commit a4700afe40c1b10d018bc20eab5805def115337c
Author: akashrn5 <[email protected]>
AuthorDate: Wed Dec 4 13:46:40 2019 +0530

    [CARBONDATA-3602]Fix MV issues with session level operations
    
    Problems:
    1. When the same timeseries query is executed from different
    sessions, the query from one of the sessions does not hit the
    datamap table. For a query from a different session, the semantic
    equals check of the ScalaUDF expression fails: the object references
    differ, and only reference equality is checked.
    
    2. When the same datamap on the same table is created in some other
    database, executing it fails with a table not found error. This is
    because the datamap catalog map stores the session info when it is
    first initialized, and that info is not cleared when the session
    gets refreshed.
    
    Solution:
    1. Make TimeSeriesFunction a case class, so that the Scala compiler
    generates an equals method that checks structural equality when the
    object references are not equal (see the sketch below).
    
    2. When the session info gets updated, clear the entry in the map
    and put it back with the new session info.
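    
    A minimal standalone sketch of why the case class fix works (the
    names below are illustrative, not the actual UDF registration code):
    a plain class extending a function type inherits reference equality
    from AnyRef, while a case class gets a compiler-generated structural
    equals.
    
        import java.sql.Timestamp
    
        object EqualitySketch extends App {
          // Plain class: equals is reference equality, so instances
          // created by two different sessions never compare equal.
          class PlainFn extends ((Timestamp, String) => Timestamp) with Serializable {
            override def apply(t: Timestamp, g: String): Timestamp = t
          }
    
          // Case class: the compiler generates equals/hashCode over the
          // (here empty) constructor parameters, so any two instances
          // compare equal, which lets the ScalaUDF semantic check pass.
          case class CaseFn() extends ((Timestamp, String) => Timestamp) with Serializable {
            override def apply(t: Timestamp, g: String): Timestamp = t
          }
    
          assert(new PlainFn() != new PlainFn()) // reference equality fails
          assert(CaseFn() == CaseFn())           // structural equality succeeds
        }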
    
    This closes #3497
---
 .../core/datamap/DataMapStoreManager.java          |  8 +++++++
 .../carbondata/mv/datamap/MVAnalyzerRule.scala     | 10 ++++++++-
 .../mv/rewrite/SummaryDatasetCatalog.scala         | 26 +++++++++++++++++++++-
 .../command/timeseries/TimeSeriesFunction.scala    |  2 +-
 4 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index c01a1ad..50c7d6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -277,6 +277,14 @@ public final class DataMapStoreManager {
   }
 
   /**
+   * This method removes the datamapCatalog for the corresponding provider if the session gets
+   * refreshed or updated
+   */
+  public void clearDataMapCatalog() {
+    dataMapCatalogs = null;
+  }
+
+  /**
    * Initialize by reading all datamaps from store and re register it
    * @param dataMapProvider
    */
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index da38bae..be292be 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -83,8 +83,16 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         }
         Aggregate(grp, aExp, child)
     }
-    val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+    var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
       DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    // when first time DataMapCatalogs are initialized, it stores session info also, but when carbon
+    // session is newly created, catalog map will not be cleared, so if session info is different,
+    // remove the entry from map.
+    if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) {
+      DataMapStoreManager.getInstance().clearDataMapCatalog()
+      catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+        DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    }
     if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
       val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
       if (modularPlan.find(_.rewritten).isDefined) {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 645b304..19e42a6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.mv.rewrite
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{CarbonSession, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -108,7 +108,22 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
   private[mv] def registerSchema(dataMapSchema: DataMapSchema): Unit = {
     writeLock {
       val updatedQuery = parser.addMVSkipFunction(dataMapSchema.getCtasQuery)
+      val currentDatabase = sparkSession match {
+        case carbonSession: CarbonSession =>
+          carbonSession.sessionState.catalog.getCurrentDatabase
+        case _ =>
+          sparkSession.catalog.currentDatabase
+      }
+      // This is required because datamap schemas are across databases, so while loading the
+      // catalog, if the datamap is in database other than sparkSession.currentDataBase(), then it
+      // fails to register, so set the database present in the dataMapSchema Object
+      setCurrentDataBase(dataMapSchema.getRelationIdentifier.getDatabaseName)
       val query = sparkSession.sql(updatedQuery)
+      // here setting back to current database of current session, because if the actual query
+      // contains db name in query like, select db1.column1 from table and current database is
+      // default and if we drop the db1, still the session has current db as db1.
+      // So setting back to current database.
+      setCurrentDataBase(currentDatabase)
       val planToRegister = MVHelper.dropDummFuc(query.queryExecution.analyzed)
       val modularPlan =
         mvSession.sessionState.modularizer.modularize(
@@ -143,6 +158,15 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
     }
   }
 
+  private def setCurrentDataBase(dataBaseName: String): Unit = {
+    sparkSession match {
+      case carbonSession: CarbonSession =>
+        carbonSession.sessionState.catalog.setCurrentDatabase(dataBaseName)
+      case _ =>
+        sparkSession.catalog.setCurrentDatabase(dataBaseName)
+    }
+  }
+
   /** Removes the given [[DataFrame]] from the catalog */
   private[mv] def unregisterSchema(dataMapName: String): Unit = {
     writeLock {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
index 3ec02c9..ddcc787 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.preagg.TimeSeriesUDF
  * Time series udf class
  */
 
-class TimeSeriesFunction extends ((Timestamp, String) => Timestamp) with Serializable{
+case class TimeSeriesFunction() extends ((Timestamp, String) => Timestamp) with Serializable{
 
   override def apply(v1: Timestamp, v2: String): Timestamp = {
     TimeSeriesUDF.INSTANCE.applyUDF(v1, v2)
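
For reference, a condensed sketch of the clear-and-refetch pattern that the
MVAnalyzerRule change implements (Catalog, CatalogStore and their members are
hypothetical stand-ins for SummaryDatasetCatalog and DataMapStoreManager,
shown only to illustrate the idea):

    import org.apache.spark.sql.SparkSession

    object CatalogRefreshSketch {
      // A cached catalog remembers the SparkSession it was built with. When
      // the current session differs (the session was refreshed or recreated),
      // the stale cache is dropped and the catalog is rebuilt for the live
      // session, mirroring clearDataMapCatalog() above.
      trait Catalog { def session: SparkSession }
      trait CatalogStore {
        def fetchCatalog(): Catalog // builds and caches on first call
        def clearCatalog(): Unit    // drops the cached entry
      }

      def catalogFor(store: CatalogStore, current: SparkSession): Catalog = {
        var catalog = store.fetchCatalog()
        if (catalog != null && !catalog.session.equals(current)) {
          store.clearCatalog()           // entry was built for the old session
          catalog = store.fetchCatalog() // re-initialized with new session info
        }
        catalog
      }
    }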
