This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4999183 [CARBONDATA-3818] Missed code to check "MV with same query
already exists" during MV Refactory
4999183 is described below
commit 4999183c20499afc35b220db44a89185754dc712
Author: Indhumathi27 <[email protected]>
AuthorDate: Mon May 11 18:04:52 2020 +0530
[CARBONDATA-3818] Missed code to check "MV with same query already exists"
during MV Refactory
Why is this PR needed?
The check for "MV with same query already exists" was lost during the MV
refactoring.
What changes were proposed in this PR?
Add code to check whether an MV with the same query already exists, and add a test case.
This closes #3761
---
.../apache/carbondata/view/MVCatalogInSpark.scala | 7 ++++++
.../org/apache/carbondata/view/MVHelper.scala | 4 +--
.../command/view/CarbonCreateMVCommand.scala | 29 ++++++++++++++--------
.../timeseries/TestCreateMVWithTimeSeries.scala | 12 ++++++++-
4 files changed, 39 insertions(+), 13 deletions(-)
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index 85b4909..b75a88b 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -162,6 +162,13 @@ case class MVCatalogInSpark(session: SparkSession)
}
}
+ /** Check and return the mv already registered in the catalog that `lookupSchema` matches for `query`, if any.
+ * NOTE(review): lookupSchema is documented as returning "feasible" mv schemas for the plan, which may be broader than an identical-query match — confirm before relying on "same query" semantics.
+ */
+ def getMVWithSameQueryPresent(query: LogicalPlan): Option[MVSchemaWrapper] = {
+ lookupSchema(query)
+ }
+
/** Returns feasible registered mv schemas for processing the given
ModularPlan. */
private def lookupSchema(plan: LogicalPlan): Option[MVSchemaWrapper] = {
withReadLock {
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
index e072653..08c679e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
@@ -289,8 +289,8 @@ object MVHelper {
if (null != relation) {
relatedFields += RelatedFieldWrapper(
relation.database,
- reference.name,
- relation.identifier.table)
+ relation.identifier.table,
+ reference.name)
}
}
findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 22c4b52..1106870 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -101,15 +101,16 @@ case class CarbonCreateMVCommand(
}
}
- val schema = doCreate(session, identifier, viewManager)
+ // get mv catalog
+ var viewCatalog = viewManager.getCatalog(catalogFactory, false)
+ .asInstanceOf[MVCatalogInSpark]
+ if (!viewCatalog.session.equals(session)) {
+ viewCatalog = viewManager.getCatalog(catalogFactory, true)
+ .asInstanceOf[MVCatalogInSpark]
+ }
+ val schema = doCreate(session, identifier, viewManager, viewCatalog)
try {
- var viewCatalog = viewManager.getCatalog(catalogFactory, false)
- .asInstanceOf[MVCatalogInSpark]
- if (!viewCatalog.session.equals(session)) {
- viewCatalog = viewManager.getCatalog(catalogFactory, true)
- .asInstanceOf[MVCatalogInSpark]
- }
viewCatalog.registerSchema(schema)
if (schema.isRefreshOnManual) {
viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
@@ -156,10 +157,18 @@ case class CarbonCreateMVCommand(
private def doCreate(session: SparkSession,
tableIdentifier: TableIdentifier,
- viewManager: MVManagerInSpark): MVSchema = {
+ viewManager: MVManagerInSpark,
+ viewCatalog: MVCatalogInSpark): MVSchema = {
val logicalPlan = MVHelper.dropDummyFunction(
MVQueryParser.getQueryPlan(queryString, session))
- val modularPlan = checkQuery(session, logicalPlan)
+ // check if mv with same query already exists
+ val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
+ if (mvSchemaWrapper.nonEmpty) {
+ val mvWithSameQuery =
mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
+ throw new MalformedMVCommandException(
+ s"MV with the name `$mvWithSameQuery` has been already created with
the same query")
+ }
+ val modularPlan = checkQuery(logicalPlan)
val viewSchema = getOutputSchema(logicalPlan)
val relatedTables = getRelatedTables(logicalPlan)
val relatedTableList = toCarbonTables(session, relatedTables)
@@ -462,7 +471,7 @@ case class CarbonCreateMVCommand(
generatePartitionerField(relatedTablePartitionColumns.toList, Seq.empty)
}
- private def checkQuery(sparkSession: SparkSession, logicalPlan:
LogicalPlan): ModularPlan = {
+ private def checkQuery(logicalPlan: LogicalPlan): ModularPlan = {
// if there is limit in query string, throw exception, as its not a valid
usecase
logicalPlan match {
case Limit(_, _) =>
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
b/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
index 57b8ce0..0218adb 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.view.timeseries
import java.util.concurrent.{Callable, Executors, TimeUnit}
-import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException,
MalformedMVCommandException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.view.rewrite.TestUtil
@@ -207,6 +207,16 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
sql("drop materialized view if exists mv1")
}
+ test("check if mv with same query exists") {
+ sql("drop materialized view if exists mv1")
+ sql("create materialized view mv1 as select timeseries(projectjoindate,'Second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'Second')")
+ sql("drop materialized view if exists mv2")
+ assert(intercept[MalformedMVCommandException] { // assert the message: a bare .contains(...) result would be silently discarded
+ sql("create materialized view mv2 as select timeseries(projectjoindate,'Second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'Second')")
+ }.getMessage.contains("MV with the name `mv1` has been already created with the same query"))
+ sql("drop materialized view if exists mv1")
+ }
+
class QueryTask(query: String) extends Callable[String] {
override def call(): String = {
var result = "PASS"