This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ac647c3  [CARBONDATA-3725]fix concurrent creation of Materialized Views issue
ac647c3 is described below

commit ac647c3c2becbc0f5a453f4e1b8687da33ce6417
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Fri Feb 28 12:30:39 2020 +0530

    [CARBONDATA-3725]fix concurrent creation of Materialized Views issue
    
    Why is this PR needed?
    When two CREATE MATERIALIZED VIEW commands run concurrently in two different
    sessions against two different databases, and the create fails in one session,
    the other session's current database gets changed and its subsequent queries
    fail with a table not found exception.
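    
    The failure path can be illustrated with a minimal sketch (not CarbonData's actual
    code; the helper name and the stand-in parse via spark.sql are illustrative, and only
    a plain SparkSession named spark is assumed): if analysing the MV query throws after
    the current database has been switched, the session keeps pointing at the MV's
    database unless the switch-back happens in a finally block.
    
        // Minimal sketch, assuming a SparkSession named spark; not CarbonData's code.
        import org.apache.spark.sql.SparkSession
    
        def parseWithDatabaseRestore(spark: SparkSession, mvDatabase: String, mvQuery: String) = {
          val previousDatabase = spark.catalog.currentDatabase
          spark.catalog.setCurrentDatabase(mvDatabase)
          try {
            // stand-in for MVParser.getMVPlan: analyse the MV's defining query
            spark.sql(mvQuery).queryExecution.analyzed
          } finally {
            // without this, a failure above leaves later queries of this session
            // resolving tables against mvDatabase ("table not found")
            spark.catalog.setCurrentDatabase(previousDatabase)
          }
        }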
    
    What changes were proposed in this PR?
    When the session changes, the datamap catalog map must be cleared, otherwise its
    stale entries lead to this problem. Previously this was handled only in the query
    and other flows, not in the create MV flow. This PR handles it there as well and
    adds proper synchronisation to avoid these problems.
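    
    As a rough sketch of the resulting create MV flow (mirroring MVDataMapProvider in the
    diff below; dataMapProvider, dataMapSchema and sparkSession are assumed to be in
    scope), the caller first looks up the cached catalog and asks the now synchronized
    registerDataMapCatalog to rebuild the map only when that catalog was built for a
    different SparkSession:
    
        import org.apache.carbondata.core.datamap.DataMapStoreManager
        import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
        import org.apache.carbondata.mv.rewrite.SummaryDatasetCatalog
    
        val storeManager = DataMapStoreManager.getInstance
        val catalog = storeManager
          .getDataMapCatalog(dataMapProvider, DataMapClassProvider.MV.getShortName, false)
          .asInstanceOf[SummaryDatasetCatalog]
        // rebuild the catalog map only if it was cached for another session
        val clearCatalogs = catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)
        storeManager.registerDataMapCatalog(dataMapProvider, dataMapSchema, clearCatalogs)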
    
    Does this PR introduce any user interface change?
    Yes. (Added an isMVExists method to the DataMapCatalog interface, which is
    required to avoid registering the same datamap schema multiple times.)
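    
    A minimal usage sketch of the new check (assuming a catalog and a dataMapSchema are
    already in scope, as inside registerDataMapCatalog in the diff below):
    
        // register the schema only when it is not already present in the catalog
        if (!catalog.isMVExists(dataMapSchema.getDataMapName)) {
          catalog.registerSchema(dataMapSchema)
        }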
    
    Is any new testcase added?
    No, existing test cases cover this.
    
    This closes #3642
---
 .../carbondata/core/datamap/DataMapCatalog.java    |  5 ++
 .../core/datamap/DataMapStoreManager.java          | 57 ++++++++++++----------
 .../carbondata/mv/extension/MVAnalyzerRule.scala   |  5 +-
 .../mv/extension/MVDataMapProvider.scala           | 10 +++-
 .../apache/carbondata/mv/extension/MVHelper.scala  |  2 +-
 .../mv/rewrite/SummaryDatasetCatalog.scala         | 30 +++++++++---
 6 files changed, 71 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
index 5dd4871..d4eb0c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
@@ -48,4 +48,9 @@ public interface DataMapCatalog<T> {
    */
   void refresh();
 
+  /**
+   * This checks whether the datamapSchema is already registered
+   */
+  Boolean isMVExists(String mvName);
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index dee1fe4..2ddcf2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -228,22 +228,28 @@ public final class DataMapStoreManager {
    * @param dataMapProvider
    * @param dataMapSchema
    */
-  public void registerDataMapCatalog(DataMapProvider dataMapProvider,
-      DataMapSchema dataMapSchema) throws IOException {
-    synchronized (lockObject) {
-      initializeDataMapCatalogs(dataMapProvider);
-      String name = dataMapSchema.getProviderName().toLowerCase();
-      DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
-      if (dataMapCatalog == null) {
-        dataMapCatalog = dataMapProvider.createDataMapCatalog();
-        // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog
-        // instance, which needs to be added to dataMapCatalogs.
-        // For other datamaps, createDataMapCatalog will return null, so no need to register
-        if (dataMapCatalog != null) {
-          dataMapCatalogs.put(name, dataMapCatalog);
-          dataMapCatalog.registerSchema(dataMapSchema);
-        }
-      } else {
+  public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
+      DataMapSchema dataMapSchema, boolean clearCatalogs) throws IOException {
+    // when registering the datamapCatalog, if the catalog map still holds datasets from an
+    // old session, clear and reload the map, otherwise an error can be thrown when the
+    // databases differ between the two sessions
+    if (clearCatalogs) {
+      dataMapCatalogs = null;
+    }
+    initializeDataMapCatalogs(dataMapProvider);
+    String name = dataMapSchema.getProviderName().toLowerCase();
+    DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
+    if (dataMapCatalog == null) {
+      dataMapCatalog = dataMapProvider.createDataMapCatalog();
+      // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog
+      // instance, which needs to be added to dataMapCatalogs.
+      // For other datamaps, createDataMapCatalog will return null, so no need to register
+      if (dataMapCatalog != null) {
+        dataMapCatalogs.put(name, dataMapCatalog);
+        dataMapCatalog.registerSchema(dataMapSchema);
+      }
+    } else {
+      if (!dataMapCatalog.isMVExists(dataMapSchema.getDataMapName())) {
         dataMapCatalog.registerSchema(dataMapSchema);
       }
     }
@@ -271,24 +277,23 @@ public final class DataMapStoreManager {
    */
   public synchronized DataMapCatalog getDataMapCatalog(
       DataMapProvider dataMapProvider,
-      String providerName) throws IOException {
+      String providerName,
+      boolean clearCatalogs) throws IOException {
+    // clear the datamapCatalog map if the session gets refreshed or updated, so that it is
+    // rebuilt for the current session
+    if (clearCatalogs) {
+      dataMapCatalogs = null;
+    }
     initializeDataMapCatalogs(dataMapProvider);
     return dataMapCatalogs.get(providerName.toLowerCase());
   }
 
   /**
-   * This method removes the datamapCatalog for the corresponding provider if the session gets
-   * refreshed or updated
-   */
-  public void clearDataMapCatalog() {
-    dataMapCatalogs = null;
-  }
-
-  /**
    * Initialize by reading all datamaps from store and re register it
    * @param dataMapProvider
    */
-  private void initializeDataMapCatalogs(DataMapProvider dataMapProvider) throws IOException {
+  private synchronized void initializeDataMapCatalogs(DataMapProvider dataMapProvider)
+      throws IOException {
     if (dataMapCatalogs == null) {
       dataMapCatalogs = new ConcurrentHashMap<>();
       List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala
index c95ce4c..27f097d 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala
@@ -85,14 +85,13 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     }
     if (needAnalysis) {
       var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
-        DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+        DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog]
       // when first time DataMapCatalogs are initialized, it stores session info also,
       // but when carbon session is newly created, catalog map will not be cleared,
       // so if session info is different, remove the entry from map.
       if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) {
-        DataMapStoreManager.getInstance().clearDataMapCatalog()
         catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
-          DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+          DataMapClassProvider.MV.getShortName, true).asInstanceOf[SummaryDatasetCatalog]
       }
       if (catalog != null && isValidPlan(plan, catalog)) {
         val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
index 5d5da68..1700ce9 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, Data
 import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.indexstore.Blocklet
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty
+import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -60,7 +60,13 @@ class MVDataMapProvider(
       ctasSqlStatement,
       true)
     try {
-      DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+      val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(this,
+        DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog]
+      if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) {
+        DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema, true)
+      } else {
+        DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema, false)
+      }
       if (dataMapSchema.isLazy) {
         DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
       }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
index 57518ea..7648efe 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
@@ -304,7 +304,7 @@ object MVHelper {
     val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
       new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
     var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
-      DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+      DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog]
     if (catalog == null) {
       catalog = new SummaryDatasetCatalog(sparkSession)
     }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 56d0324..63dc1eb 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.FindDataSourceTable
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapCatalog
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -53,6 +54,8 @@ case class MVPlanWrapper(plan: ModularPlan, dataMapSchema: DataMapSchema) extend
 private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
   extends DataMapCatalog[SummaryDataset] {
 
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   @transient
   private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]
 
@@ -107,12 +110,19 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
       // catalog, if the datamap is in database other than sparkSession.currentDataBase(), then it
       // fails to register, so set the database present in the dataMapSchema Object
       setCurrentDataBase(dataMapSchema.getRelationIdentifier.getDatabaseName)
-      val mvPlan = MVParser.getMVPlan(dataMapSchema.getCtasQuery, sparkSession)
-      // here setting back to current database of current session, because if the actual query
-      // contains db name in query like, select db1.column1 from table and current database is
-      // default and if we drop the db1, still the session has current db as db1.
-      // So setting back to current database.
-      setCurrentDataBase(currentDatabase)
+      val mvPlan = try {
+        MVParser.getMVPlan(dataMapSchema.getCtasQuery, sparkSession)
+      } catch {
+        case ex: Exception =>
+          LOGGER.error("Error executing the updated query during register MV schema", ex)
+          throw ex
+      } finally {
+        // here setting back to current database of current session, because if the actual query
+        // contains db name in query like, select db1.column1 from table and current database is
+        // default and if we drop the db1, still the session has current db as db1.
+        // So setting back to current database.
+        setCurrentDataBase(currentDatabase)
+      }
       val planToRegister = MVHelper.dropDummyFunc(mvPlan)
       val modularPlan =
         mvSession.sessionState.modularizer.modularize(
@@ -166,6 +176,14 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
     }
   }
 
+  /**
+   * Checks if the schema is already registered
+   */
+  private[mv] def isMVExists(mvName: String): java.lang.Boolean = {
+    val dataIndex = summaryDatasets
+      .indexWhere(sd => sd.dataMapSchema.getDataMapName.equals(mvName))
+    dataIndex >= 0
+  }
 
   override def listAllValidSchema(): Array[SummaryDataset] = {
     val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails
