This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4e8f819 [CARBONDATA-3705] Support create and load MV for spark datasource table
4e8f819 is described below
commit 4e8f819ff5e836f99315bbb94603526af66ccdd4
Author: Jacky Li <[email protected]>
AuthorDate: Tue Feb 18 21:01:29 2020 +0800
[CARBONDATA-3705] Support create and load MV for spark datasource table
Why is this PR needed?
Materialized View is a feature built on top of Spark; it should support not only the carbondata format but also other formats (e.g. parquet, orc).
What changes were proposed in this PR?
Added an API in CarbonMetaStore to look up any relation, not just carbon relations.
When creating an MV, use the newly added lookup API to get all parent table relations, and use CatalogTable instead of CarbonTable wherever possible.
Skip segment handling when loading the MV (segment mapping is set only for carbondata parent tables).
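A minimal usage sketch of what this enables (not part of the patch; it assumes a SparkSession named spark with the Carbon extensions enabled and mirrors the new tests):
  // `source` is a plain Spark parquet table, not a carbon table
  spark.sql("create table source using parquet as select * from fact_table1")
  spark.sql("create materialized view mv1 as " +
    "select empname, deptname, avg(salary) from source group by empname, deptname")
  // queries on `source` can now be rewritten against the MV table mv1_table;
  // since the parent is non-carbon, the MV is refreshed with a full rebuild
  spark.sql("select empname, avg(salary) from source group by empname").show()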
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3625
---
.../carbondata/core/datamap/DataMapProvider.java | 7 +-
.../metadata/schema/table/RelationIdentifier.java | 29 ++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 14 ++
.../spark/sql/hive/CarbonFileMetastore.scala | 30 ++-
.../apache/spark/sql/hive/CarbonMetaStore.scala | 5 +-
.../sql/parser/CarbonSparkSqlParserUtil.scala | 1 +
.../apache/carbondata/mv/extension/MVHelper.scala | 211 +++++++++++----------
.../apache/carbondata/mv/extension/MVUtil.scala | 122 +++++-------
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 127 +++++++++++++
.../mv/rewrite/TestAllOperationsOnMV.scala | 7 +-
10 files changed, 365 insertions(+), 188 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index c38411a..6df7e56 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -189,7 +190,11 @@ public abstract class DataMapProvider {
return false;
}
} else {
- List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+ // set segment mapping only for carbondata table
+ List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables()
+ .stream()
+ .filter(RelationIdentifier::isCarbonDataTable)
+ .collect(Collectors.toList());
for (RelationIdentifier relationIdentifier : relationIdentifiers) {
List<String> mainTableSegmentList =
DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
index ff4b7a0..9f4a7e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Objects;
/**
* class to maintain the relation between parent and child
@@ -35,6 +36,8 @@ public class RelationIdentifier implements Serializable, Writable {
private String tablePath = "";
+ private String provider = "carbondata";
+
public RelationIdentifier(String databaseName, String tableName, String tableId) {
this.databaseName = databaseName;
this.tableName = tableName;
@@ -61,12 +64,25 @@ public class RelationIdentifier implements Serializable, Writable {
this.tablePath = tablePath;
}
+ public String getProvider() {
+ return provider;
+ }
+
+ public void setProvider(String provider) {
+ this.provider = provider;
+ }
+
+ public boolean isCarbonDataTable() {
+ return provider.equalsIgnoreCase("carbondata");
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(databaseName);
out.writeUTF(tableName);
out.writeUTF(tableId);
out.writeUTF(tablePath);
+ out.writeUTF(provider);
}
@Override
@@ -75,6 +91,7 @@ public class RelationIdentifier implements Serializable, Writable {
this.tableName = in.readUTF();
this.tableId = in.readUTF();
this.tablePath = in.readUTF();
+ this.provider = in.readUTF();
}
@Override
@@ -84,15 +101,16 @@ public class RelationIdentifier implements Serializable, Writable {
RelationIdentifier that = (RelationIdentifier) o;
- if (databaseName != null ?
- !databaseName.equals(that.databaseName) :
- that.databaseName != null) {
+ if (!Objects.equals(databaseName, that.databaseName)) {
+ return false;
+ }
+ if (!Objects.equals(tableName, that.tableName)) {
return false;
}
- if (tableName != null ? !tableName.equals(that.tableName) : that.tableName != null) {
+ if (!Objects.equals(provider, that.provider)) {
return false;
}
- return tableId != null ? tableId.equals(that.tableId) : that.tableId == null;
+ return Objects.equals(tableId, that.tableId);
}
@Override
@@ -100,6 +118,7 @@ public class RelationIdentifier implements Serializable, Writable {
int result = databaseName != null ? databaseName.hashCode() : 0;
result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
result = 31 * result + (tableId != null ? tableId.hashCode() : 0);
+ result = 31 * result + (provider != null ? provider.hashCode() : 0);
return result;
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 0f52fb7..92782e9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -288,6 +288,20 @@ object CarbonEnv {
}
/**
+ * Return any kinds of table including non-carbon table
+ */
+ def getAnyTable(
+ databaseNameOp: Option[String],
+ tableName: String)
+ (sparkSession: SparkSession): CarbonTable = {
+ val catalog = getInstance(sparkSession).carbonMetaStore
+ catalog
+ .lookupAnyRelation(databaseNameOp, tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
+ }
+
+ /**
*
* @return true is the relation was changes and was removed from cache. false is there is no
* change in the relation.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 55bf43b..df05499 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, EnvHelper, SparkSession}
import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
@@ -31,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.util.CarbonReflectionUtils
@@ -173,12 +172,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
def lookupRelation(dbName: Option[String], tableName: String)
- (sparkSession: SparkSession): LogicalPlan = {
+ (sparkSession: SparkSession): CarbonRelation = {
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
}
override def lookupRelation(tableIdentifier: TableIdentifier)
- (sparkSession: SparkSession): LogicalPlan = {
+ (sparkSession: SparkSession): CarbonRelation = {
val database = tableIdentifier.database.getOrElse(
sparkSession.catalog.currentDatabase)
val relation =
sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
@@ -218,6 +217,29 @@ class CarbonFileMetastore extends CarbonMetaStore {
relation
}
+ override def lookupAnyRelation(
+ dbName: Option[String], tableName: String)
+ (sparkSession: SparkSession): LogicalPlan = {
+ val tableIdentifier = new TableIdentifier(tableName, dbName)
+ val rawRelation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
+ rawRelation match {
+ case SubqueryAlias(_, c)
+ if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+ c.getClass.getName.equals(
+ "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+ val catalogTable =
+ CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
+ val tableInfo = CarbonSparkSqlParserUtil.buildTableInfoFromCatalogTable(
+ catalogTable, false, sparkSession)
+ val carbonTable = CarbonTable.buildFromTableInfo(tableInfo)
+ CarbonRelation(carbonTable.getDatabaseName, carbonTable.getTableName, carbonTable)
+ case _ =>
+ throw new NoSuchTableException(
+ sparkSession.sessionState.catalog.getCurrentDatabase, tableIdentifier.table)
+ }
+ }
+
def tableExists(
table: String,
databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 6003f81..db71e53 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -35,9 +35,12 @@ import org.apache.carbondata.format.SchemaEvolutionEntry
trait CarbonMetaStore {
def lookupRelation(dbName: Option[String], tableName: String)
- (sparkSession: SparkSession): LogicalPlan
+ (sparkSession: SparkSession): CarbonRelation
def lookupRelation(tableIdentifier: TableIdentifier)
+ (sparkSession: SparkSession): CarbonRelation
+
+ def lookupAnyRelation(dbName: Option[String], tableName: String)
(sparkSession: SparkSession): LogicalPlan
/**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 02c1a5b..2d128a4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -460,6 +460,7 @@ object CarbonSparkSqlParserUtil {
)
TableNewProcessor(tableModel)
}
+ tableInfo.setTablePath(identifier.getTablePath)
tableInfo.setTransactionalTable(isTransactionalTable)
tableInfo
}
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
index 6f0dcbb..b23f350 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
@@ -22,11 +22,12 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSource, SparkSession}
import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
@@ -45,7 +46,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan}
import org.apache.carbondata.mv.plans.util.SQLBuilder
-import org.apache.carbondata.mv.rewrite.{MVUdf, SummaryDatasetCatalog, Utils}
+import org.apache.carbondata.mv.rewrite.{MVUdf, SummaryDatasetCatalog}
import org.apache.carbondata.mv.timeseries.{TimeSeriesFunction, TimeSeriesUtil}
import org.apache.carbondata.spark.util.CommonUtil
@@ -54,16 +55,12 @@ case class MVField(
columnTableRelationList: Seq[ColumnTableRelation]) {
}
-case class ColumnTableRelation(
- parentColumnName: String,
- parentColumnId: String,
- parentTableName: String,
- parentDatabaseName: String,
- parentTableId: String) {
-}
+case class ColumnTableRelation(columnName: String, tableName: String, databaseName: String)
+
+case class Relation(output: Seq[AttributeReference], catalogTable: Option[CatalogTable])
/**
- * Utility for MV datamap operations.
+ * Utility for MV operations.
*/
object MVHelper {
@@ -72,42 +69,41 @@ object MVHelper {
dataMapSchema: DataMapSchema,
queryString: String,
ifNotExistsSet: Boolean = false): Unit = {
- val dmProperties = dataMapSchema.getProperties.asScala
- if (dmProperties.contains("streaming") && dmProperties("streaming").equalsIgnoreCase("true")) {
+ val properties = dataMapSchema.getProperties.asScala
+ if (properties.contains("streaming") && properties("streaming").equalsIgnoreCase("true")) {
throw new MalformedCarbonCommandException(
s"Materialized view does not support streaming"
)
}
val mvUtil = new MVUtil
- mvUtil.validateDMProperty(dmProperties)
- val logicalPlan = dropDummyFunc(
- MVParser.getMVPlan(queryString, sparkSession))
+ mvUtil.validateDMProperty(properties)
+ val queryPlan = dropDummyFunc(MVParser.getMVPlan(queryString, sparkSession))
// if there is limit in MV ctas query string, throw exception, as its not a valid usecase
- logicalPlan match {
+ queryPlan match {
case Limit(_, _) =>
throw new MalformedCarbonCommandException("Materialized view does not
support the query " +
"with limit")
case _ =>
}
- val selectTables = getTables(logicalPlan)
- if (selectTables.isEmpty) {
+ val mainTables = getCatalogTables(queryPlan)
+ if (mainTables.isEmpty) {
throw new MalformedCarbonCommandException(
s"Non-Carbon table does not support creating MV datamap")
}
- val modularPlan = validateMVQuery(sparkSession, logicalPlan)
+ val modularPlan = validateMVQuery(sparkSession, queryPlan)
val updatedQueryWithDb = modularPlan.asCompactSQL
- val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery(
- logicalPlan,
- dataMapSchema)
- val fullRebuild = isFullReload(logicalPlan)
+ val (timeSeriesColumn, granularity) = validateMVTimeSeriesQuery(queryPlan, dataMapSchema)
+ val isQueryNeedFullRebuild = checkIsQueryNeedFullRebuild(queryPlan)
+ val hasNonCarbonProvider = checkMainTableHasNonCarbonSource(mainTables)
+ val isNeedFullRebuild = isQueryNeedFullRebuild || hasNonCarbonProvider
var counter = 0
// the ctas query can have duplicate columns, so we should take distinct and create fields,
// so that it won't fail during create mv table
- val fields = logicalPlan.output.map { attr =>
+ val fields = queryPlan.output.map { attr =>
if (attr.dataType.isInstanceOf[ArrayType] ||
attr.dataType.isInstanceOf[StructType] ||
attr.dataType.isInstanceOf[MapType]) {
throw new UnsupportedOperationException(
- s"MV datamap is not supported for complex datatype columns and
complex datatype return " +
+ s"MV is not supported for complex datatype columns and complex
datatype return " +
s"types of function :" + attr.name)
}
val name = updateColumnName(attr, counter)
@@ -133,30 +129,23 @@ object MVHelper {
val tableProperties = mutable.Map[String, String]()
val parentTables = new util.ArrayList[String]()
- val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
- selectTables.foreach { selectTable =>
- val mainCarbonTable = try {
- Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
- selectTable.identifier.table)(sparkSession))
- } catch {
- // Exception handling if it's not a CarbonTable
- case ex: Exception =>
- throw new MalformedCarbonCommandException(
- s"Non-Carbon table does not support creating MV")
- }
- if (!mainCarbonTable.get.getTableInfo.isTransactionalTable) {
+ val parentTablesList = new util.ArrayList[CarbonTable](mainTables.size)
+ mainTables.foreach { mainTable =>
+ val table = CarbonEnv.getAnyTable(
+ mainTable.identifier.database, mainTable.identifier.table)(sparkSession)
+ if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on NonTransactional table")
}
- if (mainCarbonTable.get.isChildTableForMV) {
+ if (table.isChildTableForMV) {
throw new MalformedCarbonCommandException(
- "Cannot create MV on child table " +
mainCarbonTable.get.getTableUniqueName)
+ "Cannot create MV on child table " + table.getTableUniqueName)
}
- parentTables.add(mainCarbonTable.get.getTableName)
- if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
+ if (table.isStreamingSink) {
throw new MalformedCarbonCommandException(
- s"Streaming table does not support creating materialized view")
+ s"Streaming table does not support creating materialized view")
}
- parentTablesList.add(mainCarbonTable.get)
+ parentTables.add(table.getTableName)
+ parentTablesList.add(table)
}
// Check if load is in progress in any of the parent table mapped to the datamap
@@ -173,12 +162,12 @@ object MVHelper {
tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
val finalModularPlan = new SQLBuilder(modularPlan).SQLizer.execute(modularPlan)
- val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(finalModularPlan,
- getLogicalRelation(logicalPlan))
+ val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
+ finalModularPlan, getRelation(queryPlan))
// If dataMap is mapped to single main table, then inherit table properties from main table,
// else, will use default table properties. If DMProperties contains table properties, then
// table properties of datamap table will be updated
- if (parentTablesList.size() == 1) {
+ if (parentTablesList.size() == 1 &&
CarbonSource.isCarbonDataSource(mainTables.head)) {
inheritTablePropertiesFromMainTable(
parentTablesList.get(0),
fields,
@@ -207,11 +196,11 @@ object MVHelper {
}
}
}
- dmProperties.foreach(t => tableProperties.put(t._1, t._2))
- val usePartitioning = dmProperties.getOrElse("partitioning", "true").toBoolean
- var partitionerFields: Seq[PartitionerField] = Seq.empty
+ properties.foreach(t => tableProperties.put(t._1, t._2))
+ val usePartitioning = properties.getOrElse("partitioning", "true").toBoolean
+
// Inherit partition from parent table if datamap is mapped to single parent table
- if (parentTablesList.size() == 1) {
+ val partitionerFields = if (parentTablesList.size() == 1) {
val partitionInfo = parentTablesList.get(0).getPartitionInfo
val parentPartitionColumns = if (!usePartitioning) {
Seq.empty
@@ -220,26 +209,26 @@ object MVHelper {
} else {
Seq()
}
- partitionerFields = getPartitionerFields(parentPartitionColumns, fieldRelationMap)
+ getPartitionerFields(parentPartitionColumns, fieldRelationMap)
+ } else {
+ Seq.empty
}
- var order = 0
val columnOrderMap = new java.util.HashMap[Integer, String]()
if (partitionerFields.nonEmpty) {
- fields.foreach { field =>
- columnOrderMap.put(order, field.column)
- order += 1
+ fields.zipWithIndex.foreach { case (field, index) =>
+ columnOrderMap.put(index, field.column)
}
}
- // TODO Use a proper DB
- val tableIdentifier = TableIdentifier(
- dataMapSchema.getDataMapName + "_table", selectTables.head.identifier.database)
+ val mvTableIdentifier = TableIdentifier(
+ dataMapSchema.getDataMapName + "_table", mainTables.head.identifier.database)
+
// prepare table model of the collected tokens
- val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
+ val mvTableModel: TableModel = CarbonParserUtil.prepareTableModel(
ifNotExistPresent = ifNotExistsSet,
- CarbonParserUtil.convertDbNameToLowerCase(tableIdentifier.database),
- tableIdentifier.table.toLowerCase,
+ CarbonParserUtil.convertDbNameToLowerCase(mvTableIdentifier.database),
+ mvTableIdentifier.table.toLowerCase,
fields,
partitionerFields,
tableProperties,
@@ -247,28 +236,28 @@ object MVHelper {
isAlterFlow = false,
None)
- val tablePath = if (dmProperties.contains("path")) {
- dmProperties("path")
+ val mvTablePath = if (properties.contains("path")) {
+ properties("path")
} else {
- CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+ CarbonEnv.getTablePath(mvTableModel.databaseNameOp, mvTableModel.tableName)(sparkSession)
}
- CarbonCreateTableCommand(TableNewProcessor(tableModel),
- tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
+ CarbonCreateTableCommand(TableNewProcessor(mvTableModel),
+ mvTableModel.ifNotExistsSet, Some(mvTablePath), isVisible = false).run(sparkSession)
- // Map list of main table columns mapped to datamap table and add to dataMapSchema
+ // Map list of main table columns mapped to MV table and add to dataMapSchema
val mainTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator()
while (mainTableFieldIterator.hasNext) {
val value = mainTableFieldIterator.next()
value.columnTableRelationList.foreach {
columnTableRelation =>
- if (null == mainTableToColumnsMap.get(columnTableRelation.parentTableName)) {
+ if (null == mainTableToColumnsMap.get(columnTableRelation.tableName)) {
val columns = new util.HashSet[String]()
- columns.add(columnTableRelation.parentColumnName.toLowerCase())
- mainTableToColumnsMap.put(columnTableRelation.parentTableName, columns)
+ columns.add(columnTableRelation.columnName.toLowerCase())
+ mainTableToColumnsMap.put(columnTableRelation.tableName, columns)
} else {
- mainTableToColumnsMap.get(columnTableRelation.parentTableName)
- .add(columnTableRelation.parentColumnName.toLowerCase())
+ mainTableToColumnsMap.get(columnTableRelation.tableName)
+ .add(columnTableRelation.columnName.toLowerCase())
}
}
}
@@ -280,27 +269,30 @@ object MVHelper {
} else {
dataMapSchema.setCtasQuery(updatedQueryWithDb)
}
- dataMapSchema
- .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
- tableIdentifier.table,
- CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession).getTableId))
+ dataMapSchema.setRelationIdentifier(
+ new RelationIdentifier(
+ mvTableIdentifier.database.get,
+ mvTableIdentifier.table,
+ CarbonEnv.getCarbonTable(mvTableIdentifier)(sparkSession).getTableId))
- val parentIdents = selectTables.map { table =>
+ val parentIdents = mainTables.map { table =>
val relationIdentifier = new RelationIdentifier(table.database, table.identifier.table, "")
relationIdentifier.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
+ relationIdentifier.setProvider(table.provider.get)
relationIdentifier
}
- dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
+ dataMapSchema.getRelationIdentifier.setTablePath(mvTablePath)
dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
- dataMapSchema.getProperties.put(DataMapProperty.FULL_REFRESH, fullRebuild.toString)
+ dataMapSchema.getProperties.put(DataMapProperty.FULL_REFRESH, isNeedFullRebuild.toString)
try {
DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
} catch {
case ex: Exception =>
- val dropTableCommand = CarbonDropTableCommand(true,
+ val dropTableCommand = CarbonDropTableCommand(
+ ifExistsSet = true,
new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
dataMapSchema.getRelationIdentifier.getTableName,
- true)
+ dropChildTable = true)
dropTableCommand.run(sparkSession)
throw ex
}
@@ -378,15 +370,18 @@ object MVHelper {
if (value.nonEmpty) value.head else name
}
- private def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
+ // Return all relations involved in the plan
+ private def getCatalogTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
logicalPlan.collect {
case l: LogicalRelation => l.catalogTable.get
+ case h: HiveTableRelation => h.tableMeta
}
}
- private def getLogicalRelation(logicalPlan: LogicalPlan): Seq[LogicalRelation] = {
+ private def getRelation(logicalPlan: LogicalPlan): Seq[Relation] = {
logicalPlan.collect {
- case l: LogicalRelation => l
+ case l: LogicalRelation => Relation(l.output, l.catalogTable)
+ case h: HiveTableRelation => Relation(h.output, Some(h.tableMeta))
}
}
@@ -413,10 +408,20 @@ object MVHelper {
}
/**
- * Check if we can do incremental load on the mv table. Some cases like aggregation functions
- * which are present inside other expressions like sum(a)+sum(b) cannot be incremental loaded.
+ * Return true if we can do incremental load on the mv table based on data source
+ * @param mainTables
+ * @return
+ */
+ private def checkMainTableHasNonCarbonSource(mainTables: Seq[CatalogTable]): Boolean = {
+ mainTables.exists(table => !CarbonSource.isCarbonDataSource(table))
+ }
+
+ /**
+ * Return true if we can do incremental load on the mv table based on the query plan.
+ * Some cases like aggregation functions which are present inside other expressions
+ * like sum(a)+sum(b) cannot be incremental loaded.
*/
- private def isFullReload(logicalPlan: LogicalPlan): Boolean = {
+ private def checkIsQueryNeedFullRebuild(logicalPlan: LogicalPlan): Boolean = {
var isFullReload = false
logicalPlan.transformAllExpressions {
case a: Alias => a
@@ -534,7 +539,7 @@ object MVHelper {
val validRelation = fieldRelations.zipWithIndex.collectFirst {
case ((field, dataMapField), index) if dataMapField.columnTableRelationList.nonEmpty &&
- head.equals(dataMapField.columnTableRelationList.head.parentColumnName) &&
+ head.equals(dataMapField.columnTableRelationList.head.columnName) &&
dataMapField.aggregateFunction.isEmpty =>
(PartitionerField(field.name.get,
field.dataType,
@@ -568,20 +573,21 @@ object MVHelper {
generatePartitionerField(allPartitionColumn.toList, Seq.empty)
}
- private def inheritTablePropertiesFromMainTable(parentTable: CarbonTable,
+ private def inheritTablePropertiesFromMainTable(
+ parentTable: CarbonTable,
fields: Seq[Field],
fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, MVField],
tableProperties: mutable.Map[String, String]): Unit = {
- var neworder = Seq[String]()
+ var newOrder = Seq[String]()
val parentOrder = parentTable.getSortColumns.asScala
- parentOrder.foreach(parentcol =>
+ parentOrder.foreach(parentCol =>
fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
fieldRelationMap(col).columnTableRelationList.size == 1 &&
- parentcol.equalsIgnoreCase(
- fieldRelationMap(col).columnTableRelationList(0).parentColumnName))
- .map(cols => neworder :+= cols.column))
- if (neworder.nonEmpty) {
- tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
+ parentCol.equalsIgnoreCase(
+ fieldRelationMap(col).columnTableRelationList.head.columnName))
+ .map(cols => newOrder :+= cols.column))
+ if (newOrder.nonEmpty) {
+ tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, newOrder.mkString(","))
}
val sort_scope = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
.get("sort_scope")
@@ -610,10 +616,9 @@ object MVHelper {
val relationList = fields._2.columnTableRelationList
// check if columns present in datamap are long_string_col in parent table. If they are
// long_string_columns in parent, make them long_string_columns in datamap
- if (aggFunc.isEmpty &&
- relationList.size == 1 &&
- longStringColumnInParents.contains(relationList.head.parentColumnName)) {
- varcharDatamapFields += relationList.head.parentColumnName
+ if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
+ .contains(relationList.head.columnName)) {
+ varcharDatamapFields += relationList.head.columnName
}
})
if (varcharDatamapFields.nonEmpty) {
@@ -621,7 +626,7 @@ object MVHelper {
}
}
- if (longStringColumn != None) {
+ if (longStringColumn.isDefined) {
val fieldNames = fields.map(_.column)
val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map { colName =>
val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
@@ -699,7 +704,7 @@ object MVHelper {
case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
fieldRelationMap(col).columnTableRelationList.size == 1 &&
parentcol.equalsIgnoreCase(
- fieldRelationMap(col).columnTableRelationList.head.parentColumnName) =>
+ fieldRelationMap(col).columnTableRelationList.head.columnName) =>
col.column
})
dataMapColumns
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
index e707cac..c127262 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
@@ -20,15 +20,13 @@ package org.apache.carbondata.mv.extension
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.command.Field
-import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.DataType
import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, ModularRelation, Select}
import org.apache.carbondata.spark.util.CommonUtil
@@ -44,26 +42,25 @@ class MVUtil {
*/
def getFieldsAndDataMapFieldsFromPlan(
plan: ModularPlan,
- logicalRelation: Seq[LogicalRelation]): scala.collection.mutable.LinkedHashMap[Field,
- MVField] = {
+ relations: Seq[Relation]): mutable.LinkedHashMap[Field, MVField] = {
plan match {
case select: Select =>
select.children.map {
case groupBy: GroupBy =>
getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
- logicalRelation, groupBy.flagSpec)
+ relations, groupBy.flagSpec)
case _: ModularRelation =>
getFieldsFromProject(select.outputList, select.predicateList,
- logicalRelation, select.flagSpec)
+ relations, select.flagSpec)
}.head
case groupBy: GroupBy =>
groupBy.child match {
case select: Select =>
getFieldsFromProject(groupBy.outputList, select.predicateList,
- logicalRelation, select.flagSpec)
+ relations, select.flagSpec)
case _: ModularRelation =>
getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
- logicalRelation, groupBy.flagSpec)
+ relations, groupBy.flagSpec)
}
}
}
@@ -73,17 +70,17 @@ class MVUtil {
* user query
* @param outputList of the modular plan
* @param predicateList of the modular plan
- * @param logicalRelation list of main table from query
+ * @param relations list of main table from query
* @param flagSpec to get SortOrder attribute if exists
* @return fieldRelationMap
*/
private def getFieldsFromProject(
outputList: Seq[NamedExpression],
predicateList: Seq[Expression],
- logicalRelation: Seq[LogicalRelation],
+ relations: Seq[Relation],
flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, MVField] = {
var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
- fieldToDataMapFieldMap ++== getFieldsFromProject(outputList, logicalRelation)
+ fieldToDataMapFieldMap ++== getFieldsFromProject(outputList, relations)
var finalPredicateList: Seq[NamedExpression] = Seq.empty
predicateList.map { p =>
p.collect {
@@ -106,31 +103,30 @@ class MVUtil {
}
}
}
- fieldToDataMapFieldMap ++== getFieldsFromProject(finalPredicateList.distinct, logicalRelation)
+ fieldToDataMapFieldMap ++== getFieldsFromProject(finalPredicateList.distinct, relations)
fieldToDataMapFieldMap
}
private def getFieldsFromProject(
projectList: Seq[NamedExpression],
- logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, MVField] = {
+ relations: Seq[Relation]): mutable.LinkedHashMap[Field, MVField] = {
var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
projectList.map {
case attr: AttributeReference =>
- val carbonTable = getCarbonTable(logicalRelation, attr)
- if (null != carbonTable) {
+ val catalogTable = getCatalogTable(relations, attr)
+ if (null != catalogTable) {
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
- val relation = getColumnRelation(attr.name,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
- carbonTable)
+ val relation = getColumnRelation(
+ attr.name,
+ catalogTable.identifier.table,
+ catalogTable.database)
if (null != relation) {
arrayBuffer += relation
}
var qualifier: Option[String] = None
if (attr.qualifier.nonEmpty) {
qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
- Some(carbonTable.getTableName)
+ Some(catalogTable.identifier.table)
} else {
attr.qualifier.headOption
}
@@ -142,17 +138,16 @@ class MVUtil {
qualifier.headOption,
"",
arrayBuffer,
- carbonTable.getTableName)
+ catalogTable.identifier.table)
}
case Alias(attr: AttributeReference, name) =>
- val carbonTable = getCarbonTable(logicalRelation, attr)
- if (null != carbonTable) {
+ val catalogTable = getCatalogTable(relations, attr)
+ if (null != catalogTable) {
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
- val relation = getColumnRelation(attr.name,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
- carbonTable)
+ val relation = getColumnRelation(
+ attr.name,
+ catalogTable.identifier.table,
+ catalogTable.database)
if (null != relation) {
arrayBuffer += relation
}
@@ -165,13 +160,12 @@ class MVUtil {
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
a.collect {
case attr: AttributeReference =>
- val carbonTable = getCarbonTable(logicalRelation, attr)
- if (null != carbonTable) {
- val relation = getColumnRelation(attr.name,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
- carbonTable)
+ val catalogTable = getCatalogTable(relations, attr)
+ if (null != catalogTable) {
+ val relation = getColumnRelation(
+ attr.name,
+ catalogTable.identifier.table,
+ catalogTable.database)
if (null != relation) {
arrayBuffer += relation
}
@@ -190,13 +184,12 @@ class MVUtil {
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
a.collect {
case attr: AttributeReference =>
- val carbonTable = getCarbonTable(logicalRelation, attr)
- if (null != carbonTable) {
- val relation = getColumnRelation(attr.name,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
- carbonTable)
+ val catalogTable = getCatalogTable(relations, attr)
+ if (null != catalogTable) {
+ val relation = getColumnRelation(
+ attr.name,
+ catalogTable.identifier.table,
+ catalogTable.database)
if (null != relation) {
arrayBuffer += relation
}
@@ -213,37 +206,26 @@ class MVUtil {
*/
private def getColumnRelation(
parentColumnName: String,
- parentTableId: String,
parentTableName: String,
- parentDatabaseName: String,
- carbonTable: CarbonTable): ColumnTableRelation = {
- val parentColumn = carbonTable.getColumnByName(parentColumnName)
- var columnTableRelation: ColumnTableRelation = null
- if (null != parentColumn) {
- val parentColumnId = parentColumn.getColumnId
- columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName,
- parentColumnId = parentColumnId,
- parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
- columnTableRelation
- } else {
- columnTableRelation
- }
+ parentDatabaseName: String) = {
+ ColumnTableRelation(
+ columnName = parentColumnName,
+ tableName = parentTableName,
+ databaseName = parentDatabaseName)
}
/**
- * This method is used to get carbon table for corresponding attribute reference
- * from logical relation
+ * Return the catalog table after matching the attr in logicalRelation
*/
- private def getCarbonTable(logicalRelation: Seq[LogicalRelation],
- attr: AttributeReference) = {
- val relations = logicalRelation
- .filter(lr => lr.output
- .exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
- attrRef.exprId.equals(attr.exprId)))
- if (relations.nonEmpty) {
- relations
- .head.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.carbonTable
+ private def getCatalogTable(
+ relations: Seq[Relation],
+ attr: AttributeReference): CatalogTable = {
+ val filteredRelation = relations.filter { relation =>
+ relation.output.exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
+ attrRef.exprId.equals(attr.exprId))
+ }
+ if (filteredRelation.nonEmpty) {
+ filteredRelation.head.catalogTable.get
} else {
null
}
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 633db24..55ab5e8 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -104,6 +104,133 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE
fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
}
+ test("test create mv on parquet spark table") {
+ sql("drop materialized view if exists mv1")
+ sql("drop table if exists source")
+ sql("create table source using parquet as select * from fact_table1")
+ sql("create materialized view mv1 as select empname, deptname, avg(salary)
from source group by empname, deptname")
+ var df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ // load to parquet table and check again
+ sql("insert into source select * from fact_table1")
+ df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ sql(s"drop materialized view mv1")
+ sql("drop table source")
+ }
+
+ test("test create mv on partitioned parquet spark table") {
+ sql("drop materialized view if exists mv1")
+ sql("drop table if exists source")
+ sql("""
+ | create table source (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, salary int)
+ | using parquet partitioned by (empname)
+ """.stripMargin)
+ sql("insert into source select designation, doj, workgroupcategory,
workgroupcategoryname, " +
+ "deptno, deptname, salary, empname from fact_table1")
+ sql("select * from source limit 2").show(false)
+ sql("create materialized view mv1 as select empname, deptname, avg(salary)
from source group by empname, deptname")
+ var df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ // load to parquet table and check again
+ sql("insert into source select designation, doj, workgroupcategory,
workgroupcategoryname, " +
+ "deptno, deptname, salary, empname from fact_table1")
+ df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ sql(s"drop materialized view mv1")
+ sql("drop table source")
+ }
+
+ test("test create mv on orc spark table") {
+ sql("drop materialized view if exists mv1")
+ sql("drop table if exists source")
+ sql("create table source using orc as select * from fact_table1")
+ sql("create materialized view mv1 as select empname, deptname, avg(salary)
from source group by empname, deptname")
+ var df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ // load to orc table and check again
+ sql("insert into source select * from fact_table1")
+ df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ sql(s"drop materialized view mv1")
+ sql("drop table source")
+ }
+
+ test("test create mv on partitioned orc spark table") {
+ sql("drop materialized view if exists mv1")
+ sql("drop table if exists source")
+ sql("""
+ | create table source (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, salary int)
+ | using orc partitioned by (empname)
+ """.stripMargin)
+ sql("insert into source select designation, doj, workgroupcategory,
workgroupcategoryname, " +
+ "deptno, deptname, salary, empname from fact_table1")
+ sql("select * from source limit 2").show(false)
+ sql("create materialized view mv1 as select empname, deptname, avg(salary)
from source group by empname, deptname")
+ var df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ // load to parquet table and check again
+ sql("insert into source select designation, doj, workgroupcategory,
workgroupcategoryname, " +
+ "deptno, deptname, salary, empname from fact_table1")
+ df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ sql(s"drop materialized view mv1")
+ sql("drop table source")
+ }
+
+ test("test create mv on parquet hive table") {
+ sql("drop materialized view if exists mv1")
+ sql("drop table if exists source")
+ sql("create table source stored as parquet as select * from fact_table1")
+ sql("create materialized view mv1 as select empname, deptname, avg(salary)
from source group by empname, deptname")
+ var df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ // load to parquet table and check again
+ sql("insert into source select * from fact_table1")
+ df = sql("select empname, avg(salary) from source group by empname")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+
+ sql(s"drop materialized view mv1")
+ sql("drop table source")
+ }
+
+ // TODO: orc hive table is not supported since MV rewrite does not handle HiveTableRelation
+ ignore("test create mv on orc hive table") {
+ sql("drop materialized view if exists mv2")
+ sql("drop table if exists source")
+ sql("create table source stored as orc as select * from fact_table1")
+ sql("explain extended select empname, avg(salary) from source group by
empname").show(false)
+ sql("create materialized view mv2 as select empname, deptname, avg(salary)
from source group by empname, deptname")
+ sql("select * from mv2_table").show
+ val df = sql("select empname, avg(salary) from source group by empname")
+ sql("explain extended select empname, avg(salary) from source group by
empname").show(false)
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv2"))
+ checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by
empname"))
+ sql(s"drop materialized view mv2")
+ sql("drop table source")
+ }
+
test("test create mv with simple and same projection") {
sql("drop materialized view if exists mv1")
sql("create materialized view mv1 as select empname, designation from
fact_table1")
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 84c8037..a8c9af5 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -235,12 +235,11 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
test("test mv with non-carbon table") {
sql("drop table if exists noncarbon")
- sql("create table noncarbon (product string,amount int)")
+ sql("create table noncarbon (product string,amount int) stored as parquet")
sql("insert into noncarbon values('Mobile',2000)")
sql("drop materialized view if exists p")
- intercept[MalformedCarbonCommandException] {
- sql("Create materialized view p as Select product from noncarbon")
- }.getMessage.contains("Non-Carbon table does not support creating MV materialized view")
+ sql("Create materialized view p as Select product from noncarbon")
+ sql("drop materialized view p")
sql("drop table if exists noncarbon")
}