This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ac647c3 [CARBONDATA-3725] Fix concurrent creation of Materialized Views issue
ac647c3 is described below
commit ac647c3c2becbc0f5a453f4e1b8687da33ce6417
Author: akashrn5 <[email protected]>
AuthorDate: Fri Feb 28 12:30:39 2020 +0530
[CARBONDATA-3725] Fix concurrent creation of Materialized Views issue
Why is this PR needed?
When two CREATE MATERIALIZED VIEW commands run concurrently in two different sessions
against two different databases, and the create fails in one session, the other
session's current database gets changed and its subsequent queries fail with a
table-not-found exception.
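A minimal sketch of the failure scenario (database, table, and MV names are hypothetical
and the MV DDL syntax is only assumed; the point is the catalog state shared across
sessions):

  // Hypothetical reproduction sketch: two sessions, two databases, concurrent MV creation.
  import org.apache.spark.sql.SparkSession

  val session1 = SparkSession.builder().appName("mv-session-1").enableHiveSupport().getOrCreate()
  val session2 = session1.newSession()
  session1.sql("USE db1")
  session2.sql("USE db2")

  // Both sessions create a materialized view at the same time.
  new Thread(() =>
    session1.sql("CREATE MATERIALIZED VIEW mv1 AS SELECT a, SUM(b) FROM t1 GROUP BY a")).start()
  new Thread(() =>
    session2.sql("CREATE MATERIALIZED VIEW mv2 AS SELECT x, SUM(y) FROM t2 GROUP BY x")).start()

  // If one create fails, the other session can be left with its current database
  // switched (e.g. to db1), so this query then fails with a table-not-found exception.
  session2.sql("SELECT x, y FROM t2").show()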
What changes were proposed in this PR?
When the session changes, the datamap catalog map must be cleared, otherwise stale
session state leads to this problem. Previously this was handled only in the query and
other flows; it was not handled in the create MV flow. This PR handles it there as well
and adds proper synchronization to avoid these problems.
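In outline, the registration path in the diff below clears the cached catalog map when
the caller has detected a session change and skips schemas that are already registered.
A simplified, self-contained sketch of that pattern follows (class and method names only
mirror DataMapStoreManager/DataMapCatalog; this is not the exact implementation):

  import java.util.concurrent.ConcurrentHashMap

  // Sketch of a catalog: tracks which MV schemas have been registered.
  class CatalogSketch {
    private val registered = scala.collection.mutable.ArrayBuffer[String]()
    def isMVExists(mvName: String): Boolean = registered.contains(mvName)
    def registerSchema(mvName: String): Unit = registered += mvName
  }

  // Sketch of the store manager: registration is synchronized, and the cached map is
  // dropped and rebuilt when the caller detected that the session has changed.
  class StoreManagerSketch {
    private var catalogs: java.util.Map[String, CatalogSketch] = _

    def registerDataMapCatalog(provider: String, mvName: String, clearCatalogs: Boolean): Unit =
      synchronized {
        if (clearCatalogs) {
          catalogs = null                  // stale session info: rebuild from scratch
        }
        if (catalogs == null) {
          catalogs = new ConcurrentHashMap[String, CatalogSketch]()
        }
        val catalog = catalogs.computeIfAbsent(provider, _ => new CatalogSketch)
        if (!catalog.isMVExists(mvName)) { // avoid registering the same schema twice
          catalog.registerSchema(mvName)
        }
      }
  }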
Does this PR introduce any user interface change?
Yes. (Added an isMVExists method to the DataMapCatalog interface, which is required to
avoid registering the same datamap schema more than once.)
Is any new testcase added?
No, the existing test cases cover this.
This closes #3642
---
.../carbondata/core/datamap/DataMapCatalog.java | 5 ++
.../core/datamap/DataMapStoreManager.java | 57 ++++++++++++----------
.../carbondata/mv/extension/MVAnalyzerRule.scala | 5 +-
.../mv/extension/MVDataMapProvider.scala | 10 +++-
.../apache/carbondata/mv/extension/MVHelper.scala | 2 +-
.../mv/rewrite/SummaryDatasetCatalog.scala | 30 +++++++++---
6 files changed, 71 insertions(+), 38 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
index 5dd4871..d4eb0c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
@@ -48,4 +48,9 @@ public interface DataMapCatalog<T> {
*/
void refresh();
+ /**
+ * This checks whether the datamapSchema is already registered
+ */
+ Boolean isMVExists(String mvName);
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index dee1fe4..2ddcf2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -228,22 +228,28 @@ public final class DataMapStoreManager {
* @param dataMapProvider
* @param dataMapSchema
*/
- public void registerDataMapCatalog(DataMapProvider dataMapProvider,
- DataMapSchema dataMapSchema) throws IOException {
- synchronized (lockObject) {
- initializeDataMapCatalogs(dataMapProvider);
- String name = dataMapSchema.getProviderName().toLowerCase();
- DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
- if (dataMapCatalog == null) {
- dataMapCatalog = dataMapProvider.createDataMapCatalog();
- // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog
- // instance, which needs to be added to dataMapCatalogs.
- // For other datamaps, createDataMapCatalog will return null, so no need to register
- if (dataMapCatalog != null) {
- dataMapCatalogs.put(name, dataMapCatalog);
- dataMapCatalog.registerSchema(dataMapSchema);
- }
- } else {
+ public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
+ DataMapSchema dataMapSchema, boolean clearCatalogs) throws IOException {
+ // this check is added to check if when registering the datamapCatalog, if the catalog map has
+ // datasets with old session, then need to clear and reload the map, else error can be thrown
+ // if the databases are different in both the sessions
+ if (clearCatalogs) {
+ dataMapCatalogs = null;
+ }
+ initializeDataMapCatalogs(dataMapProvider);
+ String name = dataMapSchema.getProviderName().toLowerCase();
+ DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
+ if (dataMapCatalog == null) {
+ dataMapCatalog = dataMapProvider.createDataMapCatalog();
+ // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog
+ // instance, which needs to be added to dataMapCatalogs.
+ // For other datamaps, createDataMapCatalog will return null, so no need to register
+ if (dataMapCatalog != null) {
+ dataMapCatalogs.put(name, dataMapCatalog);
+ dataMapCatalog.registerSchema(dataMapSchema);
+ }
+ } else {
+ if (!dataMapCatalog.isMVExists(dataMapSchema.getDataMapName())) {
dataMapCatalog.registerSchema(dataMapSchema);
}
}
@@ -271,24 +277,23 @@ public final class DataMapStoreManager {
*/
public synchronized DataMapCatalog getDataMapCatalog(
DataMapProvider dataMapProvider,
- String providerName) throws IOException {
+ String providerName,
+ boolean clearCatalogs) throws IOException {
+ // This method removes the datamapCatalog for the corresponding provider if the session gets
+ // refreshed or updated
+ if (clearCatalogs) {
+ dataMapCatalogs = null;
+ }
initializeDataMapCatalogs(dataMapProvider);
return dataMapCatalogs.get(providerName.toLowerCase());
}
/**
- * This method removes the datamapCatalog for the corresponding provider if the session gets
- * refreshed or updated
- */
- public void clearDataMapCatalog() {
- dataMapCatalogs = null;
- }
-
- /**
* Initialize by reading all datamaps from store and re register it
* @param dataMapProvider
*/
- private void initializeDataMapCatalogs(DataMapProvider dataMapProvider) throws IOException {
+ private synchronized void initializeDataMapCatalogs(DataMapProvider dataMapProvider)
+ throws IOException {
if (dataMapCatalogs == null) {
dataMapCatalogs = new ConcurrentHashMap<>();
List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala
index c95ce4c..27f097d 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala
@@ -85,14 +85,13 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
if (needAnalysis) {
var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
- DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+ DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog]
// when first time DataMapCatalogs are initialized, it stores session info also,
// but when carbon session is newly created, catalog map will not be cleared,
// so if session info is different, remove the entry from map.
if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) {
- DataMapStoreManager.getInstance().clearDataMapCatalog()
catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
- DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+ DataMapClassProvider.MV.getShortName, true).asInstanceOf[SummaryDatasetCatalog]
}
if (catalog != null && isValidPlan(plan, catalog)) {
val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
index 5d5da68..1700ce9 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, Data
import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.indexstore.Blocklet
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty
+import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -60,7 +60,13 @@ class MVDataMapProvider(
ctasSqlStatement,
true)
try {
- DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+ val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(this,
+ DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog]
+ if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) {
+ DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema, true)
+ } else {
+ DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema, false)
+ }
if (dataMapSchema.isLazy) {
DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
}
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
index 57518ea..7648efe 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
@@ -304,7 +304,7 @@ object MVHelper {
val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
- DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+ DataMapClassProvider.MV.getShortName, false).asInstanceOf[SummaryDatasetCatalog]
if (catalog == null) {
catalog = new SummaryDatasetCatalog(sparkSession)
}
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 56d0324..63dc1eb 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.DataMapCatalog
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -53,6 +54,8 @@ case class MVPlanWrapper(plan: ModularPlan, dataMapSchema: DataMapSchema) extend
private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
extends DataMapCatalog[SummaryDataset] {
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
@transient
private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]
@@ -107,12 +110,19 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
// catalog, if the datamap is in database other than sparkSession.currentDataBase(), then it
// fails to register, so set the database present in the dataMapSchema Object
setCurrentDataBase(dataMapSchema.getRelationIdentifier.getDatabaseName)
- val mvPlan = MVParser.getMVPlan(dataMapSchema.getCtasQuery, sparkSession)
- // here setting back to current database of current session, because if the actual query
- // contains db name in query like, select db1.column1 from table and current database is
- // default and if we drop the db1, still the session has current db as db1.
- // So setting back to current database.
- setCurrentDataBase(currentDatabase)
+ val mvPlan = try {
+ MVParser.getMVPlan(dataMapSchema.getCtasQuery, sparkSession)
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Error executing the updated query during register MV schema", ex)
+ throw ex
+ } finally {
+ // here setting back to current database of current session, because if the actual query
+ // contains db name in query like, select db1.column1 from table and current database is
+ // default and if we drop the db1, still the session has current db as db1.
+ // So setting back to current database.
+ setCurrentDataBase(currentDatabase)
+ }
val planToRegister = MVHelper.dropDummyFunc(mvPlan)
val modularPlan =
mvSession.sessionState.modularizer.modularize(
@@ -166,6 +176,14 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
}
}
+ /**
+ * Checks if the schema is already registered
+ */
+ private[mv] def isMVExists(mvName: String): java.lang.Boolean = {
+ val dataIndex = summaryDatasets.indexWhere(sd => sd.dataMapSchema.getDataMapName.equals(mvName))
+ dataIndex >= 0
+ }
override def listAllValidSchema(): Array[SummaryDataset] = {
val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails