This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new ac647c3 [CARBONDATA-3725]fix concurrent creation of Materialized Views issue ac647c3 is described below commit ac647c3c2becbc0f5a453f4e1b8687da33ce6417 Author: akashrn5 <akashnilu...@gmail.com> AuthorDate: Fri Feb 28 12:30:39 2020 +0530 [CARBONDATA-3725]fix concurrent creation of Materialized Views issue Why is this PR needed? When two create MV commands are running concurrently in two different sessions with two different databases, then if a create fails in either session, the other session's current database is changed and the corresponding queries fail with a table not found exception. What changes were proposed in this PR? When the session is changed we need to clear the datamap catalog map, which may otherwise lead to this problem. Initially this was handled only for the query and other flows; it wasn't handled in the create MV flow. Handled the same here, and added proper synchronisation to avoid these problems. Does this PR introduce any user interface change? Yes. (added an isMVExists method in the DataMapCatalog interface, which is required to avoid multiple registrations of the same datamap schemas) Is any new testcase added? 
No, existing test cases will take care This closes #3642 --- .../carbondata/core/datamap/DataMapCatalog.java | 5 ++ .../core/datamap/DataMapStoreManager.java | 57 ++++++++++++---------- .../carbondata/mv/extension/MVAnalyzerRule.scala | 5 +- .../mv/extension/MVDataMapProvider.scala | 10 +++- .../apache/carbondata/mv/extension/MVHelper.scala | 2 +- .../mv/rewrite/SummaryDatasetCatalog.scala | 30 +++++++++--- 6 files changed, 71 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java index 5dd4871..d4eb0c3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java @@ -48,4 +48,9 @@ public interface DataMapCatalog<T> { */ void refresh(); + /** + * This checks whether the datamapSchema is already registered + */ + Boolean isMVExists(String mvName); + } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index dee1fe4..2ddcf2b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -228,22 +228,28 @@ public final class DataMapStoreManager { * @param dataMapProvider * @param dataMapSchema */ - public void registerDataMapCatalog(DataMapProvider dataMapProvider, - DataMapSchema dataMapSchema) throws IOException { - synchronized (lockObject) { - initializeDataMapCatalogs(dataMapProvider); - String name = dataMapSchema.getProviderName().toLowerCase(); - DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name); - if (dataMapCatalog == null) { - dataMapCatalog = dataMapProvider.createDataMapCatalog(); - // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog - // 
instance, which needs to be added to dataMapCatalogs. - // For other datamaps, createDataMapCatalog will return null, so no need to register - if (dataMapCatalog != null) { - dataMapCatalogs.put(name, dataMapCatalog); - dataMapCatalog.registerSchema(dataMapSchema); - } - } else { + public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider, + DataMapSchema dataMapSchema, boolean clearCatalogs) throws IOException { + // this check is added to check if when registering the datamapCatalog, if the catalog map has + // datasets with old session, then need to clear and reload the map, else error can be thrown + // if the databases are different in both the sessions + if (clearCatalogs) { + dataMapCatalogs = null; + } + initializeDataMapCatalogs(dataMapProvider); + String name = dataMapSchema.getProviderName().toLowerCase(); + DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name); + if (dataMapCatalog == null) { + dataMapCatalog = dataMapProvider.createDataMapCatalog(); + // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog + // instance, which needs to be added to dataMapCatalogs. 
+ // For other datamaps, createDataMapCatalog will return null, so no need to register + if (dataMapCatalog != null) { + dataMapCatalogs.put(name, dataMapCatalog); + dataMapCatalog.registerSchema(dataMapSchema); + } + } else { + if (!dataMapCatalog.isMVExists(dataMapSchema.getDataMapName())) { dataMapCatalog.registerSchema(dataMapSchema); } } @@ -271,24 +277,23 @@ public final class DataMapStoreManager { */ public synchronized DataMapCatalog getDataMapCatalog( DataMapProvider dataMapProvider, - String providerName) throws IOException { + String providerName, + boolean clearCatalogs) throws IOException { + // This method removes the datamapCatalog for the corresponding provider if the session gets + // refreshed or updated + if (clearCatalogs) { + dataMapCatalogs = null; + } initializeDataMapCatalogs(dataMapProvider); return dataMapCatalogs.get(providerName.toLowerCase()); } /** - * This method removes the datamapCatalog for the corresponding provider if the session gets - * refreshed or updated - */ - public void clearDataMapCatalog() { - dataMapCatalogs = null; - } - - /** * Initialize by reading all datamaps from store and re register it * @param dataMapProvider */ - private void initializeDataMapCatalogs(DataMapProvider dataMapProvider) throws IOException { + private synchronized void initializeDataMapCatalogs(DataMapProvider dataMapProvider) + throws IOException { if (dataMapCatalogs == null) { dataMapCatalogs = new ConcurrentHashMap<>(); List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas(); diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala index c95ce4c..27f097d 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala @@ -85,14 +85,13 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { } 
if (needAnalysis) { var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider, - DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog] + DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog] // when first time DataMapCatalogs are initialized, it stores session info also, // but when carbon session is newly created, catalog map will not be cleared, // so if session info is different, remove the entry from map. if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) { - DataMapStoreManager.getInstance().clearDataMapCatalog() catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider, - DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog] + DataMapClassProvider.MV.getShortName, true).asInstanceOf[SummaryDatasetCatalog] } if (catalog != null && isValidPlan(plan, catalog)) { val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala index 5d5da68..1700ce9 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala @@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, Data import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.indexstore.Blocklet -import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty +import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.mv.rewrite.{SummaryDataset, 
SummaryDatasetCatalog} import org.apache.carbondata.processing.util.CarbonLoaderUtil @@ -60,7 +60,13 @@ class MVDataMapProvider( ctasSqlStatement, true) try { - DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema) + val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(this, + DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog] + if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) { + DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema, true) + } else { + DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema, false) + } if (dataMapSchema.isLazy) { DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName) } diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala index 57518ea..7648efe 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala @@ -304,7 +304,7 @@ object MVHelper { val dataMapProvider = DataMapManager.get().getDataMapProvider(null, new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession) var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider, - DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog] + DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog] if (catalog == null) { catalog = new SummaryDatasetCatalog(sparkSession) } diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala index 56d0324..63dc1eb 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala @@ 
-25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.FindDataSourceTable +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datamap.DataMapCatalog import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.metadata.schema.table.DataMapSchema @@ -53,6 +54,8 @@ case class MVPlanWrapper(plan: ModularPlan, dataMapSchema: DataMapSchema) extend private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) extends DataMapCatalog[SummaryDataset] { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + @transient private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset] @@ -107,12 +110,19 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) // catalog, if the datamap is in database other than sparkSession.currentDataBase(), then it // fails to register, so set the database present in the dataMapSchema Object setCurrentDataBase(dataMapSchema.getRelationIdentifier.getDatabaseName) - val mvPlan = MVParser.getMVPlan(dataMapSchema.getCtasQuery, sparkSession) - // here setting back to current database of current session, because if the actual query - // contains db name in query like, select db1.column1 from table and current database is - // default and if we drop the db1, still the session has current db as db1. - // So setting back to current database. 
- setCurrentDataBase(currentDatabase) + val mvPlan = try { + MVParser.getMVPlan(dataMapSchema.getCtasQuery, sparkSession) + } catch { + case ex: Exception => + LOGGER.error("Error executing the updated query during register MV schema", ex) + throw ex + } finally { + // here setting back to current database of current session, because if the actual query + // contains db name in query like, select db1.column1 from table and current database is + // default and if we drop the db1, still the session has current db as db1. + // So setting back to current database. + setCurrentDataBase(currentDatabase) + } val planToRegister = MVHelper.dropDummyFunc(mvPlan) val modularPlan = mvSession.sessionState.modularizer.modularize( @@ -166,6 +176,14 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) } } + /** + * Checks if the schema is already registered + */ + private[mv] def isMVExists(mvName: String): java.lang.Boolean = { + val dataIndex = summaryDatasets + .indexWhere(sd => sd.dataMapSchema.getDataMapName.equals(mvName)) + dataIndex > 0 + } override def listAllValidSchema(): Array[SummaryDataset] = { val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails