This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new becccf5 [CARBONDATA-3733] Fix Incorrect query results on mv with limit
becccf5 is described below
commit becccf5b8bdf5f8f432de3a02507beaaa7cc18e5
Author: Indhumathi27 <[email protected]>
AuthorDate: Mon Mar 2 21:28:40 2020 +0530
[CARBONDATA-3733] Fix Incorrect query results on mv with limit
Why is this PR needed?
Issue 1:
After creating a materialized view, queries with a simple projection and a
limit give incorrect results.
Issue 2:
Compacting IUD_DELTA on a materialized view throws a NullPointerException,
because SegmentUpdateStatusManager is not set.
Issue 3:
Queries whose order by columns are not in the projection list of the created
MV give incorrect results.
What changes were proposed in this PR?
Issue 1:
Copy the subsume flags and flagSpec to the subsumer plan while rewriting with
summary datasets. Update the flagSpec as per the MV attributes and copy it to
the relation.
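In essence (this is the shape of the change in the DefaultMatchMaker.scala
hunk below, where sel_1q is the query's Select node and sel_1a the matched
subsumer plan):

    // keep the query's SORT/LIMIT flags on the subsumer instead of dropping them
    if (sel_1q.flagSpec.isEmpty) {
      Seq(sel_1a)
    } else {
      Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec))
    }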
Issue 2:
Set SegmentUpdateStatusManager on the CarbonLoadModel using carbonTable in
case of IUD_DELTA compaction.
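A sketch of the fallback, mirroring the CarbonAlterTableCompactionCommand.scala
hunk below:

    // alterTableModel carries no SegmentUpdateStatusManager for IUD/horizontal
    // compaction on materialized views, so build one from the table itself
    val segmentUpdateStatusManager = new SegmentUpdateStatusManager(carbonTable)
    carbonLoadModel.setSegmentUpdateStatusManager(segmentUpdateStatusManager)
    carbonLoadModel.setLoadMetadataDetails(
      segmentUpdateStatusManager.getLoadMetadataDetails.toList.asJava)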
Issue 3:
Order by columns have to be present in the projection list when creating an MV.
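Create MV now validates this up front and fails fast; a sketch of the check
added in MVHelper.scala (see the hunk below):

    // reject any order-by attribute that is missing from the projected output
    queryPlan.transform {
      case sort @ Sort(order, _, _) =>
        order.foreach { orderByCol =>
          orderByCol.child match {
            case attr: AttributeReference if !queryPlan.output.contains(attr.toAttribute) =>
              throw new UnsupportedOperationException(
                "Order by column `" + attr.name + "` must be present in project columns")
            case _ =>
          }
        }
        sort
    }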
This closes #3651
---
docs/index/mv-guide.md | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 46 ----------------------
.../CarbonAlterTableCompactionCommand.scala | 14 +++++--
.../apache/carbondata/mv/extension/MVHelper.scala | 21 +++++++++-
.../apache/carbondata/mv/extension/MVUtil.scala | 31 +++------------
.../carbondata/mv/rewrite/DefaultMatchMaker.scala | 6 ++-
.../carbondata/mv/rewrite/QueryRewrite.scala | 10 ++++-
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 8 ++++
.../mv/rewrite/TestAllOperationsOnMV.scala | 41 ++++++++++++++++++-
9 files changed, 98 insertions(+), 81 deletions(-)
diff --git a/docs/index/mv-guide.md b/docs/index/mv-guide.md
index 2b8810c..b071967 100644
--- a/docs/index/mv-guide.md
+++ b/docs/index/mv-guide.md
@@ -74,7 +74,7 @@ EXPLAIN SELECT a, sum(b) from maintable group by a;
GROUP BY country, sex
```
**NOTE**:
- * Group by columns has to be provided in projection list while creating mv datamap
+ * Group by and Order by columns has to be provided in projection list while creating mv datamap
* If only single parent table is involved in mv datamap creation, then TableProperties of Parent table
(if not present in a aggregate function like sum(col)) listed below will be inherited to datamap table
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5875af6..1097b95 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,12 +17,8 @@
package org.apache.spark.sql
-import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
-
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -35,7 +31,6 @@ import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -120,52 +115,11 @@ class CarbonEnv {
CarbonProperties.getInstance
.addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
initialized = true
- cleanChildTablesNotRegisteredInHive(sparkSession)
}
}
Profiler.initialize(sparkSession.sparkContext)
LOGGER.info("Initialize CarbonEnv completed...")
}
-
- private def cleanChildTablesNotRegisteredInHive(sparkSession: SparkSession): Unit = {
- // If in case JDBC application is killed/stopped, when create datamap was in progress, datamap
- // table was created and datampschema was saved to the system, but table was not registered to
- // metastore. So, when we restart JDBC application, we need to clean up
- // stale tables and datamapschema's.
- val dataMapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas
- dataMapSchemas.asScala.foreach {
- dataMapSchema =>
- if (null != dataMapSchema.getRelationIdentifier &&
- !dataMapSchema.isIndexDataMap) {
- val isTableExists = try {
- sparkSession.sessionState
- .catalog
- .tableExists(TableIdentifier(dataMapSchema.getRelationIdentifier.getTableName,
- Some(dataMapSchema.getRelationIdentifier.getDatabaseName)))
- } catch {
- // we need to take care of cleanup when the table does not exists, if table exists and
- // some other user tries to access the table, it might fail, that time no need to handle
- case ex: Exception =>
- LOGGER.error("Error while checking the table existence", ex)
- return
- }
- if (!isTableExists) {
- try {
- DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
- } catch {
- case e: IOException =>
- throw e
- } finally {
- if (FileFactory.isFileExist(dataMapSchema.getRelationIdentifier.getTablePath)) {
- CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(dataMapSchema
- .getRelationIdentifier
- .getTablePath))
- }
- }
- }
- }
- }
- }
}
/**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 681e8a0..192f2bc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -42,12 +42,11 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -220,17 +219,24 @@ case class CarbonAlterTableCompactionCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
if (alterTableModel.segmentUpdateStatusManager.isDefined) {
carbonLoadModel.setSegmentUpdateStatusManager(
alterTableModel.segmentUpdateStatusManager.get)
carbonLoadModel.setLoadMetadataDetails(
alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
+ } else {
+ // segmentUpdateStatusManager will not be defined in case of IUD/horizontal compaction on
+ // materialized views. In that case, create new segmentUpdateStatusManager object
+ // using carbonTable
+ val segmentUpdateStatusManager = new SegmentUpdateStatusManager(carbonTable)
+ carbonLoadModel.setSegmentUpdateStatusManager(segmentUpdateStatusManager)
+ carbonLoadModel.setLoadMetadataDetails(
+ segmentUpdateStatusManager.getLoadMetadataDetails.toList.asJava)
}
}
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
if (null == carbonLoadModel.getLoadMetadataDetails) {
carbonLoadModel.readAndSetLoadMetadataDetails()
}
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
index 7648efe..4e7835b 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project, Sort}
import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -85,6 +85,25 @@ object MVHelper {
"with limit")
case _ =>
}
+
+ // Order by columns needs to be present in projection list for creating mv. This is because,
+ // we have to perform order by on all segments during query, which requires the order by column
+ // data
+ queryPlan.transform {
+ case sort@Sort(order, _, _) =>
+ order.map { orderByCol =>
+ orderByCol.child match {
+ case attr: AttributeReference =>
+ if (!queryPlan.output.contains(attr.toAttribute)) {
+ throw new UnsupportedOperationException(
+ "Order by column `" + attr.name + "` must be present in project columns")
+ }
+ }
+ order
+ }
+ sort
+ }
+
val mainTables = getCatalogTables(queryPlan)
if (mainTables.isEmpty) {
throw new MalformedCarbonCommandException(
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
index 47c7f23..c852331 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
@@ -47,20 +47,16 @@ class MVUtil {
case select: Select =>
select.children.map {
case groupBy: GroupBy =>
- getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
- relations, groupBy.flagSpec)
+ getFieldsFromProject(groupBy.outputList, groupBy.predicateList, relations)
case _: ModularRelation =>
- getFieldsFromProject(select.outputList, select.predicateList,
- relations, select.flagSpec)
+ getFieldsFromProject(select.outputList, select.predicateList, relations)
}.head
case groupBy: GroupBy =>
groupBy.child match {
case select: Select =>
- getFieldsFromProject(groupBy.outputList, select.predicateList,
- relations, select.flagSpec)
+ getFieldsFromProject(groupBy.outputList, select.predicateList, relations)
case _: ModularRelation =>
- getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
- relations, groupBy.flagSpec)
+ getFieldsFromProject(groupBy.outputList, groupBy.predicateList, relations)
}
}
}
@@ -71,14 +67,12 @@ class MVUtil {
* @param outputList of the modular plan
* @param predicateList of the modular plan
* @param relations list of main table from query
- * @param flagSpec to get SortOrder attribute if exists
* @return fieldRelationMap
*/
private def getFieldsFromProject(
outputList: Seq[NamedExpression],
predicateList: Seq[Expression],
- relations: Seq[Relation],
- flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, MVField] = {
+ relations: Seq[Relation]): mutable.LinkedHashMap[Field, MVField] = {
var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
fieldToDataMapFieldMap ++= getFieldsFromProject(outputList, relations)
var finalPredicateList: Seq[NamedExpression] = Seq.empty
@@ -88,21 +82,6 @@ class MVUtil {
finalPredicateList = finalPredicateList.:+(attr)
}
}
- // collect sort by columns
- if (flagSpec.nonEmpty) {
- flagSpec.map { f =>
- f.map {
- case list: ArrayBuffer[_] =>
- list.map {
- case s: SortOrder =>
- s.collect {
- case attr: AttributeReference =>
- finalPredicateList = finalPredicateList.:+(attr)
- }
- }
- }
- }
- }
fieldToDataMapFieldMap ++= getFieldsFromProject(finalPredicateList.distinct, relations)
fieldToDataMapFieldMap
}
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index e72ce94..b967686 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -289,7 +289,11 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
if (r2eJoinsMatch) {
if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
- Seq(sel_1a)
+ if (sel_1q.flagSpec.isEmpty) {
+ Seq(sel_1a)
+ } else {
+ Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec))
+ }
} else {
// no compensation needed
val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
index 405cd7c..07015d1 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
@@ -235,6 +235,13 @@ class QueryRewrite private (
case s: Select if s.dataMapTableRelation.isDefined =>
val relation =
s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
+ val aliasMap = getAttributeMap(relation.outputList, s.outputList)
+ // Update the flagspec as per the mv table attributes.
+ val updatedFlagSpec: Seq[Seq[Any]] = updateFlagSpec(
+ keepAlias = false,
+ s,
+ relation,
+ aliasMap)
val outputList = getUpdatedOutputList(relation.outputList, s.dataMapTableRelation)
// when the output list contains multiple projection of same column, but relation
// contains distinct columns, mapping may go wrong with columns, so select distinct
@@ -242,7 +249,8 @@ class QueryRewrite private (
val oList = for ((o1, o2) <- mappings) yield {
if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
}
- relation.copy(outputList = oList).setRewritten()
+ relation.copy(outputList = oList, flags = s.flags, flagSpec = updatedFlagSpec)
+ .setRewritten()
case g: GroupBy if g.dataMapTableRelation.isDefined =>
val relation =
g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 773482f..257d087 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -135,6 +135,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
"deptno, deptname, salary, empname from fact_table1")
sql("select * from source limit 2").show(false)
sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+ assert(sql(" select empname, deptname, avg(salary) from source group by empname, deptname limit 2").collect().length == 2)
var df = sql("select empname, avg(salary) from source group by empname")
assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
@@ -280,6 +281,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("create materialized view mv5 as select empname, designation from fact_table1 where empname='shivani'")
val frame = sql("select designation from fact_table1 where empname='shivani'")
assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv5"))
+ assert(sql("select designation from fact_table1 where empname='shivani' limit 5").collect().length == 5)
checkAnswer(frame, sql("select designation from fact_table2 where empname='shivani'"))
sql(s"drop materialized view mv5")
}
@@ -296,6 +298,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("create materialized view mv7 as select empname, designation from fact_table1 where empname='shivani'")
val frame = sql(
"select empname,designation from fact_table1 where empname='shivani' and designation='SA'")
+ assert( sql("select empname,designation from fact_table1 where empname='shivani' and designation='SA' limit 1").collect().length == 0)
assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv7"))
checkAnswer(frame, sql("select empname,designation from fact_table2 where empname='shivani' and designation='SA'"))
sql(s"drop materialized view mv7")
@@ -394,6 +397,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
"select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group" +
" by empname")
assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv17"))
+ assert(sql("select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname limit 2").collect().length == 2)
checkAnswer(frame, sql("select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group" +
" by empname"))
sql(s"drop materialized view mv17")
@@ -437,6 +441,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
val frame = sql(
"select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv21"))
+ assert(sql("select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname limit 2").collect().length == 2)
checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = t2.empname"))
sql(s"drop materialized view mv21")
}
@@ -485,6 +490,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
val frame1 = sql(
"select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)")
assert(TestUtil.verifyMVDataMap(frame1.queryExecution.optimizedPlan, "mv25"))
+ assert(sql("select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname) limit 2").collect().length == 2)
checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = t2.empname"))
sql(s"drop materialized view mv25")
}
@@ -508,6 +514,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv27"))
checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 " +
"where t1.empname = t2.empname group by t1.empname, t2.designation"))
+ assert(sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname group by t1.empname, t2.designation limit 2").collect().length == 2)
sql(s"drop materialized view mv27")
}
@@ -759,6 +766,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
val frame = sql(
"select empname,sum(salary) as total from fact_table1 group by empname order by empname")
assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "MV_order"))
+ assert(sql("select empname,sum(salary) as total from fact_table1 group by empname order by empname limit 2").collect().length == 2)
}
test("jira carbondata-2528-2") {
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 66b0c38..c5211c3 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -575,16 +575,55 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
sql("insert into table maintable select 'abc',21,2000")
val res = sql("select name from maintable order by c_code")
sql("drop materialized view if exists dm1")
- sql("create materialized view dm1 as select name from maintable order by c_code")
+ intercept[UnsupportedOperationException] {
+ sql("create materialized view dm1 as select name from maintable order by c_code")
+ }.getMessage.contains("Order by column `c_code` must be present in project columns")
+ sql("create materialized view dm1 as select name,c_code from maintable order by c_code")
val df = sql("select name from maintable order by c_code")
TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "dm1")
checkAnswer(res, df)
intercept[Exception] {
sql("alter table maintable drop columns(c_code)")
}.getMessage.contains("Column name cannot be dropped because it exists in mv materialized view: dm1")
+ sql("insert into table maintable select 'mno',20,2000")
+ checkAnswer(sql("select name from maintable order by c_code"), Seq(Row("mno"), Row("abc")))
+ sql("drop table if exists maintable")
+ }
+
+ test("test query on mv with limit") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) STORED AS carbondata")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("insert into table maintable select 'bcd',22,2000")
+ sql("insert into table maintable select 'def',22,2000")
+ sql("drop materialized view if exists mv1")
+ sql("create materialized view mv1 as select a.name,a.price from maintable a")
+ var dataFrame = sql("select a.name,a.price from maintable a limit 1")
+ assert(dataFrame.count() == 1)
+ TestUtil.verifyMVDataMap(dataFrame.queryExecution.optimizedPlan, "mv1")
+ dataFrame = sql("select a.name,a.price from maintable a order by a.name limit 1")
+ assert(dataFrame.count() == 1)
+ TestUtil.verifyMVDataMap(dataFrame.queryExecution.optimizedPlan, "mv1")
sql("drop table if exists maintable")
}
+ test("test horizontal comapction on mv for more than two update") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) STORED AS carbondata")
+ sql("insert into table maintable values('abc',21,2000),('mno',24,3000)")
+ sql("drop materialized view if exists mv1")
+ sql("create materialized view mv1 as select name,c_code from maintable")
+ sql("update maintable set(name) = ('aaa') where c_code = 21").show(false)
+ var df = sql("select name,c_code from maintable")
+ TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1")
+ checkAnswer(df, Seq(Row("aaa",21), Row("mno",24)))
+ sql("update maintable set(name) = ('mmm') where c_code = 24").show(false)
+ df = sql("select name,c_code from maintable")
+ TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1")
+ checkAnswer(df, Seq(Row("aaa",21), Row("mmm",24)))
+ sql("drop table IF EXISTS maintable")
+ }
+
test("drop meta cache on mv materialized view table") {
defaultConfig()
sql("drop table IF EXISTS maintable")