This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 2a28dba [CARBONDATA-3357] Support TableProperties from single parent
table and restrict alter/delete/partition on mv
2a28dba is described below
commit 2a28dba04236ce976984d9cbc398eb8fa517d6f5
Author: Indhumathi27 <[email protected]>
AuthorDate: Wed Apr 24 01:04:21 2019 +0530
[CARBONDATA-3357] Support TableProperties from single parent table and
restrict alter/delete/partition on mv
Inherit table properties from the main table into the MV datamap table if the
datamap has a single parent table; otherwise, use the
default table properties.
Restrict alter/delete/partition operations on MV.
This closes #3184
---
.../core/datamap/DataMapStoreManager.java | 27 +-
.../carbondata/core/datamap/DataMapUtil.java | 1 +
.../core/metadata/schema/table/CarbonTable.java | 17 --
.../core/metadata/schema/table/DataMapSchema.java | 14 +
.../carbondata/mv/datamap/MVDataMapProvider.scala | 19 +-
.../apache/carbondata/mv/datamap/MVHelper.scala | 110 ++++++--
.../org/apache/carbondata/mv/datamap/MVUtil.scala | 287 +++++++++++++++++++++
.../mv/rewrite/MVCountAndCaseTestCase.scala | 2 -
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 29 +--
.../mv/rewrite/MVIncrementalLoadingTestcase.scala | 1 -
.../mv/rewrite/MVMultiJoinTestCase.scala | 8 +-
.../carbondata/mv/rewrite/MVTpchTestCase.scala | 10 +-
.../mv/rewrite/TestAllOperationsOnMV.scala | 255 ++++++++++++++++++
.../mv/rewrite/matching/TestSQLBatch.scala | 4 +-
.../preaggregate/TestPreAggregateLoad.scala | 2 +-
.../TestTimeSeriesUnsupportedSuite.scala | 8 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 9 +-
.../command/datamap/CarbonDropDataMapCommand.scala | 9 +
.../management/CarbonCleanFilesCommand.scala | 3 +-
.../execution/command/mv/DataMapListeners.scala | 146 ++++++++++-
.../CarbonAlterTableDropHivePartitionCommand.scala | 7 +-
.../preaaggregate/PreAggregateListeners.scala | 6 +-
.../preaaggregate/PreAggregateTableHelper.scala | 102 +-------
.../schema/CarbonAlterTableRenameCommand.scala | 7 +-
.../spark/sql/execution/strategy/DDLStrategy.scala | 4 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 10 +-
.../scala/org/apache/spark/util/DataMapUtil.scala | 160 ++++++++++++
27 files changed, 1054 insertions(+), 203 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 81b1fb2..89402c2 100644
---
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -281,19 +281,22 @@ public final class DataMapStoreManager {
dataMapCatalogs = new ConcurrentHashMap<>();
List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
for (DataMapSchema schema : dataMapSchemas) {
- DataMapCatalog dataMapCatalog =
dataMapCatalogs.get(schema.getProviderName());
- if (dataMapCatalog == null) {
- dataMapCatalog = dataMapProvider.createDataMapCatalog();
- if (null == dataMapCatalog) {
- throw new RuntimeException("Internal Error.");
+ if (schema.getProviderName()
+
.equalsIgnoreCase(dataMapProvider.getDataMapSchema().getProviderName())) {
+ DataMapCatalog dataMapCatalog =
dataMapCatalogs.get(schema.getProviderName());
+ if (dataMapCatalog == null) {
+ dataMapCatalog = dataMapProvider.createDataMapCatalog();
+ if (null == dataMapCatalog) {
+ throw new RuntimeException("Internal Error.");
+ }
+ dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
+ }
+ try {
+ dataMapCatalog.registerSchema(schema);
+ } catch (Exception e) {
+ // Ignore the schema
+ LOGGER.error("Error while registering schema", e);
}
- dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
- }
- try {
- dataMapCatalog.registerSchema(schema);
- } catch (Exception e) {
- // Ignore the schema
- LOGGER.error("Error while registering schema", e);
}
}
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 0a604fb..e20f19a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -270,4 +270,5 @@ public class DataMapUtil {
}
return segmentList;
}
+
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index afb5fd3..4f4475d 100644
---
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -63,7 +63,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import static
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
import static
org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
import com.google.common.collect.Lists;
@@ -1321,22 +1320,6 @@ public class CarbonTable implements Serializable,
Writable {
}
/**
- * Return true if MV datamap present in the specified table
- * @param carbonTable
- * @return timeseries data map present
- */
- public static boolean hasMVDataMap(CarbonTable carbonTable) throws
IOException {
- List<DataMapSchema> dataMapSchemaList = DataMapStoreManager.getInstance()
- .getDataMapSchemasOfTable(carbonTable);
- for (DataMapSchema dataMapSchema : dataMapSchemaList) {
- if (dataMapSchema.getProviderName().equalsIgnoreCase(MV.toString())) {
- return true;
- }
- }
- return false;
- }
-
- /**
* Return all inverted index columns in this table
*/
public List<ColumnSchema> getInvertedIndexColumns() {
diff --git
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index a48b03c..b927ce0 100644
---
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
@@ -80,6 +81,11 @@ public class DataMapSchema implements Serializable, Writable
{
*/
protected TableSchema childSchema;
+ /**
+ * main table column list mapped to datamap table
+ */
+ private Map<String, Set<String>> mainTableColumnList;
+
public DataMapSchema(String dataMapName, String providerName) {
this.dataMapName = dataMapName;
this.providerName = providerName;
@@ -250,4 +256,12 @@ public class DataMapSchema implements Serializable,
Writable {
@Override public int hashCode() {
return Objects.hash(dataMapName);
}
+
+ public Map<String, Set<String>> getMainTableColumnList() {
+ return mainTableColumnList;
+ }
+
+ public void setMainTableColumnList(Map<String, Set<String>>
mainTableColumnList) {
+ this.mainTableColumnList = mainTableColumnList;
+ }
}
diff --git
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 26c4cb6..90b7dbc 100644
---
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -58,7 +58,21 @@ class MVDataMapProvider(
"select statement is mandatory")
}
MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement,
true)
- DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+ try {
+ DataMapStoreManager.getInstance.registerDataMapCatalog(this,
dataMapSchema)
+ if (dataMapSchema.isLazy) {
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ }
+ } catch {
+ case exception: Exception =>
+ dropTableCommand = new CarbonDropTableCommand(true,
+ new
Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
+ dataMapSchema.getRelationIdentifier.getTableName,
+ true)
+ dropTableCommand.run(sparkSession)
+
DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+ throw exception
+ }
}
override def initData(): Unit = {
@@ -141,7 +155,8 @@ class MVDataMapProvider(
dataFrame = Some(queryPlan),
updateModel = None,
tableInfoOp = None,
- internalOptions = Map("mergedSegmentName" -> newLoadName),
+ internalOptions = Map("mergedSegmentName" -> newLoadName,
+ CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
partition = Map.empty)
try {
diff --git
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 810449c..4bcaa1d 100644
---
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -29,17 +29,17 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
Attribute, AttributeRef
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join,
LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, TableModel,
TableNewProcessor}
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand,
CarbonDropTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.util.DataMapUtil
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
-import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema,
RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
DataMapSchema, RelationIdentifier}
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable,
ModularPlan, Select}
import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite,
SummaryDatasetCatalog}
@@ -58,7 +58,12 @@ object MVHelper {
val updatedQuery = new
CarbonSpark2SqlParser().addPreAggFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
- validateMVQuery(sparkSession, logicalPlan)
+ val selectTables = getTables(logicalPlan)
+ if (selectTables.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ s"Non-Carbon table does not support creating MV datamap")
+ }
+ val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
val fields = logicalPlan.output.map { attr =>
val name = updateColumnName(attr)
@@ -81,29 +86,41 @@ object MVHelper {
}
}
val tableProperties = mutable.Map[String, String]()
- dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-
- val selectTables = getTables(logicalPlan)
val parentTables = new util.ArrayList[String]()
+ val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
selectTables.foreach { selectTable =>
val mainCarbonTable = try {
Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
selectTable.identifier.table)(sparkSession))
} catch {
// Exception handling if it's not a CarbonTable
- case ex : Exception => None
+ case ex: Exception =>
+ throw new MalformedCarbonCommandException(
+ s"Non-Carbon table does not support creating MV datamap")
}
parentTables.add(mainCarbonTable.get.getTableName)
if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
throw new MalformedCarbonCommandException(
s"Streaming table does not support creating MV datamap")
}
+ parentTablesList.add(mainCarbonTable.get)
}
tableProperties.put(CarbonCommonConstants.DATAMAP_NAME,
dataMapSchema.getDataMapName)
tableProperties.put(CarbonCommonConstants.PARENT_TABLES,
parentTables.asScala.mkString(","))
- // TODO inherit the table properties like sort order, sort scope and block
size from parent
- // tables to mv datamap table
+ val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
+ logicalPlan, queryString, sparkSession)
+ // If dataMap is mapped to single main table, then inherit table
properties from main table,
+ // else, will use default table properties. If DMProperties contains table
properties, then
+ // table properties of datamap table will be updated
+ if (parentTablesList.size() == 1) {
+ DataMapUtil
+ .inheritTablePropertiesFromMainTable(parentTablesList.get(0),
+ fields,
+ fieldRelationMap,
+ tableProperties)
+ }
+ dmProperties.foreach(t => tableProperties.put(t._1, t._2))
// TODO Use a proper DB
val tableIdentifier =
TableIdentifier(dataMapSchema.getDataMapName + "_table",
@@ -129,7 +146,28 @@ object MVHelper {
CarbonCreateTableCommand(TableNewProcessor(tableModel),
tableModel.ifNotExistsSet, Some(tablePath), isVisible =
false).run(sparkSession)
- dataMapSchema.setCtasQuery(queryString)
+ // Map list of main table columns mapped to datamap table and add to
dataMapSchema
+ val mainTableToColumnsMap = new java.util.HashMap[String,
util.Set[String]]()
+ val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator()
+ while (mainTableFieldIterator.hasNext) {
+ val value = mainTableFieldIterator.next()
+ value.columnTableRelationList.foreach {
+ columnTableRelation =>
+ columnTableRelation.foreach {
+ mainTable =>
+ if (null ==
mainTableToColumnsMap.get(mainTable.parentTableName)) {
+ val columns = new util.HashSet[String]()
+ columns.add(mainTable.parentColumnName)
+ mainTableToColumnsMap.put(mainTable.parentTableName, columns)
+ } else {
+ mainTableToColumnsMap.get(mainTable.parentTableName)
+ .add(mainTable.parentColumnName)
+ }
+ }
+ }
+ }
+ dataMapSchema.setMainTableColumnList(mainTableToColumnsMap)
+ dataMapSchema.setCtasQuery(updatedQueryWithDb)
dataMapSchema
.setRelationIdentifier(new
RelationIdentifier(tableIdentifier.database.get,
tableIdentifier.table,
@@ -143,14 +181,44 @@ object MVHelper {
dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
dataMapSchema.setParentTables(new
util.ArrayList[RelationIdentifier](parentIdents.asJava))
dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString)
- DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
- if (dataMapSchema.isLazy) {
- DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ try {
+ DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+ } catch {
+ case ex: Exception =>
+ val dropTableCommand = CarbonDropTableCommand(true,
+ new
Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
+ dataMapSchema.getRelationIdentifier.getTableName,
+ true)
+ dropTableCommand.run(sparkSession)
+ throw ex
+ }
+ }
+
+ private def isValidSelect(isValidExp: Boolean,
+ s: Select): Boolean = {
+ // Make sure all predicates are present in projections.
+ var predicateList: Seq[AttributeReference] = Seq.empty
+ s.predicateList.map { f =>
+ f.children.collect {
+ case p: AttributeReference =>
+ predicateList = predicateList.+:(p)
+ }
+ }
+ if (predicateList.nonEmpty) {
+ predicateList.forall { p =>
+ s.outputList.exists {
+ case a: Alias =>
+ a.semanticEquals(p) || a.child.semanticEquals(p)
+ case other => other.semanticEquals(p)
+ }
+ }
+ } else {
+ isValidExp
}
}
private def validateMVQuery(sparkSession: SparkSession,
- logicalPlan: LogicalPlan): Unit = {
+ logicalPlan: LogicalPlan): String = {
val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
new DataMapSchema("", DataMapClassProvider.MV.getShortName),
sparkSession)
var catalog =
DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
@@ -169,17 +237,24 @@ object MVHelper {
val isValid = modularPlan match {
case g: GroupBy =>
// Make sure all predicates are present in projections.
- g.predicateList.forall{p =>
+ val isValidExp = g.predicateList.forall{p =>
g.outputList.exists{
case a: Alias =>
a.semanticEquals(p) || a.child.semanticEquals(p)
case other => other.semanticEquals(p)
}
}
+ g.child match {
+ case s: Select =>
+ isValidSelect(isValidExp, s)
+ }
+ case s: Select =>
+ isValidSelect(true, s)
case _ => true
}
if (!isValid) {
- throw new UnsupportedOperationException("Group by columns must be
present in project columns")
+ throw new UnsupportedOperationException(
+ "Group by/Filter columns must be present in project columns")
}
if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
throw new UnsupportedOperationException("MV with same query present")
@@ -196,6 +271,7 @@ object MVHelper {
if (!expressionValid) {
throw new UnsupportedOperationException("MV doesn't support Coalesce")
}
+ modularPlan.asCompactSQL
}
def updateColumnName(attr: Attribute): String = {
diff --git
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
new file mode 100644
index 0000000..6852695
--- /dev/null
+++
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.datamap
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
+import org.apache.spark.sql.catalyst.expressions._
+import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{ColumnTableRelation,
DataMapField, Field}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility class for keeping all the utility method for mv datamap
+ */
+object MVUtil {
+
+ /**
+ * Below method will be used to validate and get the required fields from
select plan
+ */
+ def getFieldsAndDataMapFieldsFromPlan(plan: LogicalPlan,
+ selectStmt: String,
+ sparkSession: SparkSession):
scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+ plan match {
+ case Project(projectList, child: Sort) =>
+ getFieldsFromProject(projectList, plan, child)
+ case Project(projectList, _) =>
+ getFieldsFromProject(projectList, plan)
+ case Aggregate(groupByExp, aggExp, _) =>
+ getFieldsFromAggregate(groupByExp, aggExp, plan)
+ }
+ }
+
+ def getFieldsFromProject(projectList: Seq[NamedExpression],
+ plan: LogicalPlan, sort: LogicalPlan): mutable.LinkedHashMap[Field,
DataMapField] = {
+ var fieldToDataMapFieldMap =
scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ sort.transformDown {
+ case agg@Aggregate(groupByExp, aggExp, _) =>
+ fieldToDataMapFieldMap ++== getFieldsFromAggregate(groupByExp, aggExp,
plan)
+ agg
+ }
+ fieldToDataMapFieldMap ++== getFieldsFromProject(projectList, plan)
+ fieldToDataMapFieldMap
+ }
+
+ def getFieldsFromProject(projectList: Seq[NamedExpression],
+ plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+ var fieldToDataMapFieldMap =
scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ val logicalRelation =
+ plan.collect {
+ case lr: LogicalRelation =>
+ lr
+ }
+ projectList.map {
+ case attr: AttributeReference =>
+ val carbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
ArrayBuffer[ColumnTableRelation]()
+ val relation = getColumnRelation(attr.name,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ if (null != relation) {
+ arrayBuffer += relation
+ }
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(attr.name,
+ attr.dataType,
+ attr.qualifier,
+ "",
+ arrayBuffer,
+ carbonTable.getTableName)
+ }
+ case Alias(attr: AttributeReference, name) =>
+ val carbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
ArrayBuffer[ColumnTableRelation]()
+ val relation = getColumnRelation(attr.name,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ if (null != relation) {
+ arrayBuffer += relation
+ }
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(name, attr.dataType, attr.qualifier, "",
arrayBuffer, "")
+ }
+ }
+ fieldToDataMapFieldMap
+ }
+
+ def getFieldsFromAggregate(groupByExp: Seq[Expression],
+ aggExp: Seq[NamedExpression],
+ plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+ var fieldToDataMapFieldMap =
scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ val logicalRelation =
+ plan.collect {
+ case lr: LogicalRelation =>
+ lr
+ }
+ aggExp.map { agg =>
+ var aggregateType: String = ""
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
ArrayBuffer[ColumnTableRelation]()
+ agg.collect {
+ case Alias(attr: AggregateExpression, name) =>
+ if (attr.aggregateFunction.isInstanceOf[Count]) {
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(name,
+ attr.aggregateFunction.dataType,
+ None,
+ attr.aggregateFunction.nodeName,
+ arrayBuffer,
+ "")
+ } else {
+ aggregateType = attr.aggregateFunction.nodeName
+ }
+ case Alias(_, name) =>
+ // In case of arithmetic expressions like sum(a)+sum(b)
+ aggregateType = "arithmetic"
+ }
+ agg.collect {
+ case attr: AttributeReference =>
+ val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val relation = getColumnRelation(attr.name,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ if (null != relation) {
+ arrayBuffer += relation
+ }
+ if (aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+ val tableName = carbonTable.getTableName
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(agg.name,
+ agg.dataType,
+ attr.qualifier,
+ aggregateType,
+ arrayBuffer,
+ tableName)
+ }
+ }
+ }
+ if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(agg.name,
+ agg.dataType,
+ agg.qualifier,
+ aggregateType,
+ arrayBuffer,
+ "")
+ }
+ }
+ groupByExp map { grp =>
+ grp.collect {
+ case attr: AttributeReference =>
+ val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
+ ArrayBuffer[ColumnTableRelation]()
+ arrayBuffer += getColumnRelation(attr.name,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(attr.name,
+ attr.dataType,
+ attr.qualifier,
+ "",
+ arrayBuffer,
+ carbonTable.getTableName)
+ }
+ }
+ }
+ fieldToDataMapFieldMap
+ }
+
+ /**
+ * Below method will be used to get the column relation with the parent
column
+ */
+ def getColumnRelation(parentColumnName: String,
+ parentTableId: String,
+ parentTableName: String,
+ parentDatabaseName: String,
+ carbonTable: CarbonTable): ColumnTableRelation = {
+ val parentColumn = carbonTable.getColumnByName(parentTableName,
parentColumnName)
+ var columnTableRelation: ColumnTableRelation = null
+ if (null != parentColumn) {
+ val parentColumnId = parentColumn.getColumnId
+ columnTableRelation = ColumnTableRelation(parentColumnName =
parentColumnName,
+ parentColumnId = parentColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ columnTableRelation
+ } else {
+ columnTableRelation
+ }
+ }
+
+ /**
+ * This method is used to get carbon table for corresponding attribute
reference
+ * from logical relation
+ */
+ private def getCarbonTable(logicalRelation: Seq[LogicalRelation],
+ attr: AttributeReference) = {
+ val relations = logicalRelation
+ .filter(lr => lr.output
+ .exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
+ attrRef.exprId.equals(attr.exprId)))
+ if (relations.nonEmpty) {
+ relations
+
.head.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+ .metaData.carbonTable
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Below method will be used to get the fields object for mv table
+ */
+ private def getFieldToDataMapFields(name: String,
+ dataType: DataType,
+ qualifier: Option[String],
+ aggregateType: String,
+ columnTableRelationList: ArrayBuffer[ColumnTableRelation],
+ parenTableName: String) = {
+ var actualColumnName =
+ name.replace("(", "_")
+ .replace(")", "")
+ .replace(" ", "_")
+ .replace("=", "")
+ .replace(",", "")
+ if (qualifier.isDefined) {
+ actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
+ .getOrElse(actualColumnName)
+ }
+ if (qualifier.isEmpty) {
+ if (aggregateType.isEmpty && !parenTableName.isEmpty) {
+ actualColumnName = parenTableName + "_" + actualColumnName
+ }
+ }
+ val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
+ val dataMapField = DataMapField(aggregateType,
Some(columnTableRelationList))
+ if (dataType.typeName.startsWith("decimal")) {
+ val (precision, scale) =
CommonUtil.getScaleAndPrecision(dataType.catalogString)
+ (Field(column = actualColumnName,
+ dataType = Some(dataType.typeName),
+ name = Some(actualColumnName),
+ children = None,
+ precision = precision,
+ scale = scale,
+ rawSchema = rawSchema), dataMapField)
+ } else {
+ (Field(column = actualColumnName,
+ dataType = Some(dataType.typeName),
+ name = Some(actualColumnName),
+ children = None,
+ rawSchema = rawSchema), dataMapField)
+ }
+ }
+}
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
index 567d6a9..af4afb6 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
@@ -47,8 +47,6 @@ class MVCountAndCaseTestCase extends QueryTest with
BeforeAndAfterAll{
| FROM data_table
| GROUP BY STARTTIME,LAYER4ID""".stripMargin)
- sql("rebuild datamap data_table_mv")
-
var frame = sql(s"""SELECT MT.`3600` AS `3600`,
| MT.`2250410101` AS `2250410101`,
| count(1) over() as countNum,
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 5016bbe..4f5423e 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -180,7 +180,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("test create datamap with simple and same projection with datamap
filter on non projection column and extra column filter") {
- sql("create datamap datamap9 using 'mv' as select empname, designation
from fact_table1 where deptname='cloud'")
+ sql("create datamap datamap9 using 'mv' as select empname,
designation,deptname from fact_table1 where deptname='cloud'")
val frame = sql("select empname,designation from fact_table1 where
deptname='cloud'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -189,7 +189,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("test create datamap with simple and same projection with datamap
filter on non projection column and no column filter") {
- sql("create datamap datamap10 using 'mv' as select empname, designation
from fact_table1 where deptname='cloud'")
+ sql("create datamap datamap10 using 'mv' as select empname,
designation,deptname from fact_table1 where deptname='cloud'")
val frame = sql("select empname,designation from fact_table1")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap10"))
@@ -198,7 +198,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("test create datamap with simple and same projection with datamap
filter on non projection column and different column filter") {
- sql("create datamap datamap11 using 'mv' as select empname, designation
from fact_table1 where deptname='cloud'")
+ sql("create datamap datamap11 using 'mv' as select empname,
designation,deptname from fact_table1 where deptname='cloud'")
val frame = sql("select empname,designation from fact_table1 where
designation='SA'")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap11"))
@@ -327,7 +327,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with simple join and filter on query") {
sql("drop datamap if exists datamap22")
- sql("create datamap datamap22 using 'mv' as select t1.empname,
t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname =
t2.empname)")
+ sql("create datamap datamap22 using 'mv' as select t1.empname,
t2.designation,t2.empname from fact_table1 t1 inner join fact_table2 t2 on
(t1.empname = t2.empname)")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2
where t1.empname = " +
"t2.empname and t1.empname='shivani'")
@@ -341,7 +341,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with simple join and filter on query and datamap")
{
sql("drop datamap if exists datamap23")
- sql("create datamap datamap23 using 'mv' as select t1.empname,
t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname =
t2.empname) where t1.empname='shivani'")
+ sql("create datamap datamap23 using 'mv' as select t1.empname,
t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on
(t1.empname = t2.empname) where t1.empname='shivani'")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2
where t1.empname = " +
"t2.empname and t1.empname='shivani'")
@@ -354,7 +354,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with simple join and filter on datamap and no
filter on query") {
sql("drop datamap if exists datamap24")
- sql("create datamap datamap24 using 'mv' as select t1.empname,
t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname =
t2.empname) where t1.empname='shivani'")
+ sql("create datamap datamap24 using 'mv' as select t1.empname,
t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on
(t1.empname = t2.empname) where t1.empname='shivani'")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2
where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -365,7 +365,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with multiple join") {
sql("drop datamap if exists datamap25")
- sql("create datamap datamap25 using 'mv' as select t1.empname as c1,
t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname =
t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)")
+ sql("create datamap datamap25 using 'mv' as select t1.empname as c1,
t2.designation, t2.empname, t3.empname from fact_table1 t1 inner join
fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on
(t1.empname=t3.empname)")
val frame = sql(
"select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2
t2 where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -379,7 +379,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
}
ignore("test create datamap with simple join on datamap and multi join on
query") {
- sql("create datamap datamap26 using 'mv' as select t1.empname,
t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname =
t2.empname)")
+ sql("create datamap datamap26 using 'mv' as select t1.empname,
t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on
(t1.empname = t2.empname)")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2
t2,fact_table3 " +
"t3 where t1.empname = t2.empname and t1.empname=t3.empname")
@@ -391,7 +391,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("test create datamap with join with group by") {
- sql("create datamap datamap27 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2
t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap27 using 'mv' as select t1.empname ,
t2.designation, sum(t1.utilization), sum(t2.empname) from fact_table1 t1 inner
join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname,
t2.designation")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1
t1,fact_table2 t2 " +
"where t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -404,7 +404,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with join with group by and sub projection") {
sql("drop datamap if exists datamap28")
- sql("create datamap datamap28 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2
t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap28 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner
join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname,
t2.designation")
val frame = sql(
"select t2.designation, sum(t1.utilization) from fact_table1
t1,fact_table2 t2 where " +
"t1.empname = t2.empname group by t2.designation")
@@ -417,7 +417,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with join with group by and sub projection with
filter") {
sql("drop datamap if exists datamap29")
- sql("create datamap datamap29 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2
t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap29 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner
join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname,
t2.designation")
val frame = sql(
"select t2.designation, sum(t1.utilization) from fact_table1
t1,fact_table2 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by
t2.designation")
@@ -430,7 +430,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
ignore("test create datamap with join with group by with filter") {
sql("drop datamap if exists datamap30")
- sql("create datamap datamap30 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2
t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap30 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner
join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname,
t2.designation")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1
t1,fact_table2 t2 " +
"where t1.empname = t2.empname and t2.designation='SA' group by
t1.empname, t2.designation")
@@ -612,7 +612,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with left join on query and equi join on mv with
group by with filter") {
sql("drop datamap if exists datamap45")
- sql("create datamap datamap45 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2 on
t1.empname = t2.empname group by t1.empname, t2.designation")
+ sql("create datamap datamap45 using 'mv' as select t1.empname,
t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 join
fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
// During spark optimizer it converts the left outer join queries with
equi join if any filter present on right side table
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1
t1 left join fact_table2 t2 " +
@@ -649,7 +649,6 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("jira carbondata-2528-2") {
-
sql("drop datamap if exists MV_order")
sql("drop datamap if exists MV_desc_order")
sql("create datamap MV_order using 'mv' as select
empname,sum(salary)+sum(utilization) as total from fact_table1 group by
empname")
@@ -910,7 +909,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
val exception_tb_mv2: Exception = intercept[Exception] {
sql("create datamap dm_stream_test2 using 'mv' as select t1.empname as
c1, t2.designation, " +
- "t2.empname as c2 from (fact_table1 t1 inner join
fact_streaming_table2 t2 " +
+ "t2.empname as c2,t3.empname from (fact_table1 t1 inner join
fact_streaming_table2 t2 " +
"on (t1.empname = t2.empname)) inner join fact_table_parquet t3 " +
"on (t1.empname = t3.empname)")
}
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index bbd7b4c..2e64055 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -182,7 +182,6 @@ class MVIncrementalLoadingTestcase extends QueryTest with
BeforeAndAfterAll {
sql(s"rebuild datamap datamap1")
loadDataToFactTable("test_table")
sql(s"rebuild datamap datamap1")
- checkExistence(sql("show segments for table datamap1_table"), false, "0.1")
sql("alter datamap datamap1 compact 'major'")
val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
CarbonCommonConstants.DATABASE_DEFAULT_NAME,
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
index bfd621d..4e3eb10 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
@@ -48,8 +48,9 @@ class MVMultiJoinTestCase extends QueryTest with
BeforeAndAfterAll {
|inner join areas as c on c.pid=p.aid
|where p.title = 'hebei'
""".stripMargin
- sql("create datamap table_mv using 'mv' as " + mvSQL)
- sql("rebuild datamap table_mv")
+ sql("create datamap table_mv using 'mv' as " +
+ "select p.title,c.title,c.pid,p.aid from areas as p inner join areas
as c on " +
+ "c.pid=p.aid where p.title = 'hebei'")
val frame = sql(mvSQL)
assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan")))
@@ -70,8 +71,7 @@ class MVMultiJoinTestCase extends QueryTest with
BeforeAndAfterAll {
| left join dim_table dim_other on sdr.name = dim_other.name
| group by sdr.name,dim.age,dim_other.height
""".stripMargin
- sql("create datamap table_mv using 'mv' as " + mvSQL)
- sql("rebuild datamap table_mv")
+ sql("create datamap table_mv using 'mv' as " + "select
sdr.name,sum(sdr.score),dim.age,dim_other.height,count(dim.name) as c1,
count(dim_other.name) as c2 from sdr_table sdr left join dim_table dim on
sdr.name = dim.name left join dim_table dim_other on sdr.name = dim_other.name
group by sdr.name,dim.age,dim_other.height")
val frame = sql(mvSQL)
assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170)))
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
index 5788a23..b5d874a 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
@@ -110,7 +110,7 @@ class MVTpchTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with tpch3 with no filters on mv") {
sql(s"drop datamap if exists datamap5")
- sql("create datamap datamap5 using 'mv' as select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate,
o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where
c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey,
o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
+ sql("create datamap datamap5 using 'mv' as select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate,
o_shippriority,c_mktsegment,l_shipdate, c_custkey as c1, o_custkey as
c2,o_orderkey as o1 from customer, orders, lineitem where c_custkey =
o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate,
o_shippriority,c_mktsegment,l_shipdate,c_custkey,o_custkey, o_orderkey ")
val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount))
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate
limit 10")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -150,7 +150,7 @@ class MVTpchTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with tpch5 with no filters on mv") {
sql(s"drop datamap if exists datamap8")
- sql("create datamap datamap8 using 'mv' as select
n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue
from customer, orders, lineitem, supplier, nation, region where c_custkey =
o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey
= s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey
group by n_name,o_orderdate,r_name")
+ sql("create datamap datamap8 using 'mv' as select
n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue,
sum(c_custkey), sum(o_custkey), sum(l_orderkey),sum(o_orderkey),
sum(l_suppkey), sum(s_suppkey), sum(c_nationkey), sum(s_nationkey),
sum(n_nationkey), sum(n_regionkey), sum(r_regionkey) from customer, orders,
lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey =
o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey an [...]
val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as
revenue from customer, orders, lineitem, supplier, nation, region where
c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and
c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey =
r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and
o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap8"))
@@ -160,7 +160,7 @@ class MVTpchTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with tpch6") {
sql(s"drop datamap if exists datamap9")
- sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice *
l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and
l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and
l_quantity < 24")
+ sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice *
l_discount) as revenue, count(l_shipdate), sum(l_discount),sum(l_quantity)
from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate <
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
val df = sql("select sum(l_extendedprice * l_discount) as revenue from
lineitem where l_shipdate >= date('1994-01-01') and l_shipdate <
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -182,7 +182,7 @@ class MVTpchTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with tpch7 part of query1") {
sql(s"drop datamap if exists datamap11")
- sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name ,
l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey =
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+ sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name ,
l_extendedprice , l_discount, s_suppkey,l_suppkey, o_orderkey,l_orderkey,
c_custkey, o_custkey, s_nationkey, n1.n_nationkey, c_nationkey from
supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and
o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey =
n1.n_nationkey and c_nationkey = n1.n_nationkey")
val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 -
l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where
s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and
s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name
= 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between
date('1995-01-01') and date('1996-12-31')")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap11"))
@@ -192,7 +192,7 @@ class MVTpchTestCase extends QueryTest with
BeforeAndAfterAll {
test("test create datamap with tpch7 part of query2 (core issue)") {
sql(s"drop datamap if exists datamap12")
- sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate,
l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey =
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+ sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate,
l_extendedprice ,l_discount,s_suppkey, l_suppkey,o_orderkey,l_orderkey,
c_custkey,o_custkey,s_nationkey, n1.n_nationkey,c_nationkey from
supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and
o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey =
n1.n_nationkey and c_nationkey = n1.n_nationkey")
val df = sql("select supp_nation, l_year, sum(volume) as revenue from (
select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice *
(1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey =
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and
( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between
date('1995-01-01') and date('19 [...]
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap12"))
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
new file mode 100644
index 0000000..3978bd1
--- /dev/null
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -0,0 +1,255 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
+/**
+ * Test class for MV datamaps: verifies that alter / delete-segment /
+ * drop-partition / direct-load operations are rejected on a main table that
+ * has an MV datamap and on the MV datamap table itself, and that datamap
+ * tables pick up dmproperties and the parent table's tblproperties.
+ */
+class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
+
+ override def beforeEach(): Unit = {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop table IF EXISTS testtable")
+ sql("create table testtable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table testtable select 'abc',21,2000")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select
name,sum(price) " +
+ "from maintable group by name")
+ sql("rebuild datamap dm1")
+ checkResult()
+ }
+
+ // NOTE(review): both sides run the identical query, so this only checks that
+ // the query still executes; it is not compared against a non-MV baseline.
+ private def checkResult(): Unit = {
+ checkAnswer(sql("select name,sum(price) from maintable group by name"),
+ sql("select name,sum(price) from maintable group by name"))
+ }
+
+ override def afterEach(): Unit = {
+ sql("drop table IF EXISTS maintable")
+ sql("drop table IF EXISTS testtable")
+ sql("drop table if exists par_table")
+ }
+
+ test("test alter add column on maintable") {
+ sql("alter table maintable add columns(d int)")
+ sql("insert into table maintable select 'abc',21,2000,30")
+ sql("rebuild datamap dm1")
+ checkResult()
+ }
+
+ test("test alter add column on datamaptable") {
+ // intercept only checks the exception type; assert makes the message check count
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table add columns(d int)")
+ }.getMessage.contains("Cannot add columns in a DataMap table
default.dm1_table"))
+ }
+
+ test("test drop column on maintable") {
+ // check drop column not present in datamap table
+ sql("alter table maintable drop columns(c_code)")
+ checkResult()
+ // check drop column present in datamap table
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table maintable drop columns(name)")
+ }.getMessage.contains("Column name cannot be dropped because it exists in
mv datamap: dm1"))
+ }
+
+ test("test alter drop column on datamaptable") {
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table drop columns(maintable_name)")
+ }.getMessage.contains("Cannot drop columns present in a datamap table
default.dm1_table"))
+ }
+
+ test("test rename column on maintable") {
+ // check rename column not present in datamap table
+ sql("alter table maintable change c_code d_code int")
+ checkResult()
+ // check rename column present in mv datamap table
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table maintable change name name1 string")
+ }.getMessage.contains("Column name exists in a MV datamap. Drop MV datamap
to continue"))
+ }
+
+ test("test alter rename column on datamaptable") {
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table change sum_price sum_cost int")
+ }.getMessage.contains("Cannot change data type or rename column for
columns " +
+ "present in mv datamap table default.dm1_table"))
+ }
+
+ test("test alter rename table") {
+ //check rename maintable
+ assert(intercept[MalformedCarbonCommandException] {
+ sql("alter table maintable rename to maintable_rename")
+ }.getMessage.contains("alter rename is not supported for datamap table or
for tables which have child datamap"))
+ //check rename datamaptable
+ assert(intercept[MalformedCarbonCommandException] {
+ sql("alter table dm1_table rename to dm11_table")
+ }.getMessage.contains("alter rename is not supported for datamap table or
for tables which have child datamap"))
+ }
+
+ test("test alter change datatype") {
+ //change datatype for column
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table maintable change price price bigint")
+ }.getMessage.contains("Column price exists in a MV datamap. Drop MV
datamap to continue"))
+ //change datatype for column not present in datamap table
+ sql("alter table maintable change c_code c_code bigint")
+ checkResult()
+ //change datatype for column present in datamap table
+ assert(intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table change sum_price sum_price bigint")
+ }.getMessage.contains("Cannot change data type or rename column for
columns " +
+ "present in mv datamap table default.dm1_table"))
+ }
+
+ test("test dmproperties") {
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED
REBUILD dmproperties" +
+ "('LOCAL_DICTIONARY_ENABLE'='false') as select name,price from
maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Local
Dictionary Enabled false")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED
REBUILD dmproperties('TABLE_BLOCKSIZE'='256 MB') " +
+ "as select name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Table Block
Size 256 MB")
+ }
+
+ test("test table properties") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata' tblproperties('LOCAL_DICTIONARY_ENABLE'='false')")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select
name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Local
Dictionary Enabled false")
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata' tblproperties('TABLE_BLOCKSIZE'='256 MB')")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select
name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Table Block
Size 256 MB")
+ }
+
+ test("test delete segment by id on main table") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("Delete from table maintable where segment.id in (0)")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select
name,sum(price) " +
+ "from maintable group by name")
+ sql("rebuild datamap dm1")
+ assert(intercept[UnsupportedOperationException] {
+ sql("Delete from table maintable where segment.id in (1)")
+ }.getMessage.contains("Delete segment operation is not supported on tables
which have mv datamap"))
+ assert(intercept[UnsupportedOperationException] {
+ sql("Delete from table dm1_table where segment.id in (0)")
+ }.getMessage.contains("Delete segment operation is not supported on mv
table"))
+ }
+
+ test("test delete segment by date on main table") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("Delete from table maintable where segment.id in (0)")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select
name,sum(price) " +
+ "from maintable group by name")
+ sql("rebuild datamap dm1")
+ assert(intercept[UnsupportedOperationException] {
+ sql("DELETE FROM TABLE maintable WHERE SEGMENT.STARTTIME BEFORE
'2017-06-01 12:05:06'")
+ }.getMessage.contains("Delete segment operation is not supported on tables
which have mv datamap"))
+ assert(intercept[UnsupportedOperationException] {
+ sql("DELETE FROM TABLE dm1_table WHERE SEGMENT.STARTTIME BEFORE
'2017-06-01 12:05:06'")
+ }.getMessage.contains("Delete segment operation is not supported on mv
table"))
+ }
+
+ test("test partition table with mv") {
+ sql("drop table if exists par_table")
+ sql("CREATE TABLE par_table(id INT, name STRING, age INT) PARTITIONED
BY(city string) STORED BY 'carbondata'")
+ sql("insert into par_table values(1,'abc',3,'def')")
+ sql("drop datamap if exists p1")
+ sql("create datamap p1 using 'mv' WITH DEFERRED REBUILD as select city, id
from par_table")
+ sql("rebuild datamap p1")
+ assert(intercept[MalformedCarbonCommandException] {
+ sql("alter table par_table drop partition (city='def')")
+ }.getMessage.contains("Drop Partition is not supported for datamap table
or for tables which have child datamap"))
+ sql("drop datamap if exists p1")
+ }
+
+ test("test direct load to mv datamap table") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name " +
+ "from maintable")
+ sql("rebuild datamap dm1")
+ assert(intercept[UnsupportedOperationException] {
+ sql("insert into dm1_table select 2")
+ }.getMessage.contains("Cannot insert/load data directly into
pre-aggregate/child table"))
+ sql("drop table IF EXISTS maintable")
+ }
+
+
+ test("test drop datamap with tablename") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm1 on table maintable")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select price "
+
+ "from maintable")
+ sql("rebuild datamap dm1")
+ checkAnswer(sql("select price from maintable"), Seq(Row(2000)))
+ checkExistence(sql("show datamap on table maintable"), true, "dm1")
+ sql("drop datamap dm1 on table maintable")
+ checkExistence(sql("show datamap on table maintable"), false, "dm1")
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test mv with attribute having qualifier") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (product string) partitioned by (amount int)
stored by 'carbondata' ")
+ sql("insert into maintable values('Mobile',2000)")
+ sql("drop datamap if exists p")
+ sql("Create datamap p using 'mv' as Select p.product, p.amount from
maintable p where p.product = 'Mobile'")
+ sql("rebuild datamap p")
+ checkAnswer(sql("Select p.product, p.amount from maintable p where
p.product = 'Mobile'"), Seq(Row("Mobile", 2000)))
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test mv with non-carbon table") {
+ sql("drop table if exists noncarbon")
+ sql("create table noncarbon (product string,amount int)")
+ sql("insert into noncarbon values('Mobile',2000)")
+ sql("drop datamap if exists p")
+ assert(intercept[MalformedCarbonCommandException] {
+ sql("Create datamap p using 'mv' as Select product from noncarbon")
+ }.getMessage.contains("Non-Carbon table does not support creating MV
datamap"))
+ sql("drop table if exists noncarbon")
+ }
+
+}
+
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
index 96f1816..07cd232 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
@@ -23,9 +23,9 @@ object TestSQLBatch {
val sampleTestCases = Seq(
("case_1",
s"""
- |SELECT i_item_id
+ |SELECT i_item_id, i_item_sk
|FROM Item
- |WHERE i_item_sk = 1
+ |WHERE i_item_sk = 2
""".stripMargin.trim,
s"""
|SELECT i_item_id, i_item_sk
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 49a62a2..7ba8300 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -211,7 +211,7 @@ class TestPreAggregateLoad extends SparkQueryTest with
BeforeAndAfterAll with Be
.stripMargin)
assert(intercept[RuntimeException] {
sql(s"insert into maintable_preagg_sum values(1, 30)")
- }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into
pre-aggregate table"))
+ }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into
pre-aggregate/child table"))
}
test("test whether all segments are loaded into pre-aggregate table if
segments are set on main table") {
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
index a7e425c..7fa2672 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
@@ -71,7 +71,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with
BeforeAndAfterAll wi
sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0',
60)")
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
// check value after inserting
checkAnswer(sql("SELECT * FROM maintable_agg1_minute"),
@@ -94,7 +94,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with
BeforeAndAfterAll wi
sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0',
60)")
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test timeseries unsupported 3: don't support insert") {
@@ -118,7 +118,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with
BeforeAndAfterAll wi
sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0',
'hello', 60)")
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test timeseries unsupported 4: don't support load") {
@@ -147,7 +147,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with
BeforeAndAfterAll wi
""".stripMargin)
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test timeseries unsupported 5: don't support update") {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d9dec68..3c6b265 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,7 +24,7 @@ import
org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener,
MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache._
-import
org.apache.spark.sql.execution.command.mv.{AlterDataMaptableCompactionPostListener,
LoadPostDataMapListener}
+import org.apache.spark.sql.execution.command.mv._
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -195,6 +195,13 @@ object CarbonEnv {
.addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener)
+ .addListener(classOf[DeleteSegmentByIdPreEvent],
DataMapDeleteSegmentPreListener)
+ .addListener(classOf[DeleteSegmentByDatePreEvent],
DataMapDeleteSegmentPreListener)
+ .addListener(classOf[AlterTableDropColumnPreEvent],
DataMapDropColumnPreListener)
+ .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
+ DataMapChangeDataTypeorRenameColumnPreListener)
+ .addListener(classOf[AlterTableAddColumnPreEvent],
DataMapAddColumnsPreListener)
+
}
/**
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 0bafe04..b4e60fb 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -122,6 +122,15 @@ case class CarbonDropDataMapCommand(
dropDataMapFromSystemFolder(sparkSession)
return Seq.empty
}
+ } else if (mainTable != null) {
+ // If table is defined and datamap is MV datamap, then drop the
datamap
+ val dmSchema =
DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
+ .filter(dataMapSchema =>
dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
+ if (dmSchema.nonEmpty && (!dmSchema.head.isIndexDataMap &&
+ null !=
dmSchema.head.getRelationIdentifier)) {
+ dropDataMapFromSystemFolder(sparkSession)
+ return Seq.empty
+ }
}
// drop preaggregate datamap.
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index c142398..a2a9d3c 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker,
DataCommand}
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.util.DataMapUtil
import org.apache.carbondata.api.CarbonStore
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -71,7 +72,7 @@ case class CarbonCleanFilesCommand(
isInternalCleanCall = true)
}.toList
cleanFileCommands.foreach(_.processMetadata(sparkSession))
- } else if (CarbonTable.hasMVDataMap(carbonTable)) {
+ } else if (DataMapUtil.hasMVDataMap(carbonTable)) {
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
index f94b73a..5b1d7e5 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -18,22 +18,32 @@
package org.apache.spark.sql.execution.command.mv
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.AlterTableModel
import
org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.util.DataMapUtil
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
DataMapSchema}
import org.apache.carbondata.datamap.DataMapManager
-import org.apache.carbondata.events.{
- AlterTableCompactionPreStatusUpdateEvent,
- DeleteFromTablePostEvent, Event, OperationContext, OperationEventListener,
UpdateTablePostEvent
-}
+import org.apache.carbondata.events._
import
org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
import org.apache.carbondata.processing.merger.CompactionType
+
+object DataMapListeners {
+
+  /**
+   * Returns the columns of `carbonTable` that the given datamap schema references, as recorded
+   * in the schema's main-table column list (looked up by `carbonTable.getTableName`).
+   *
+   * @param dataMapSchema schema of the datamap whose referenced columns are needed
+   * @param carbonTable   main table whose columns are looked up in the schema
+   * @return a new mutable buffer of column names; empty when the schema has no column list or
+   *         no entry for this table
+   */
+  def getDataMapTableColumns(dataMapSchema: DataMapSchema,
+      carbonTable: CarbonTable): mutable.Buffer[String] = {
+    val listOfColumns: mutable.Buffer[String] = new mutable.ArrayBuffer[String]()
+    // Guard both the map and its per-table entry against null: passing null to addAll (as the
+    // previous implementation could) throws a NullPointerException.
+    Option(dataMapSchema.getMainTableColumnList)
+      .flatMap(columnsByTable => Option(columnsByTable.get(carbonTable.getTableName)))
+      .foreach(columns => listOfColumns ++= columns.asScala)
+    listOfColumns
+  }
+}
+
/**
* Listener to trigger compaction on mv datamap after main table compaction
*/
@@ -139,3 +149,129 @@ object LoadPostDataMapListener extends
OperationEventListener {
}
}
}
+
+/**
+ * Listener that blocks delete-segment operations (by segment id or by load date) on tables
+ * having an mv datamap and on mv datamap (child) tables themselves.
+ */
+object DataMapDeleteSegmentPreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event            a DeleteSegmentByIdPreEvent or DeleteSegmentByDatePreEvent; any other
+   *                         event type is ignored
+   * @param operationContext not used by this listener
+   * @throws UnsupportedOperationException if the table has an mv datamap or is a datamap table
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    // The type patterns already narrow `e`, so no extra cast is needed. The fallback case avoids
+    // the MatchError the previous non-exhaustive match raised for unexpected event types.
+    val carbonTable: Option[CarbonTable] = event match {
+      case e: DeleteSegmentByIdPreEvent => Option(e.carbonTable)
+      case e: DeleteSegmentByDatePreEvent => Option(e.carbonTable)
+      case _ => None
+    }
+    carbonTable.foreach { table =>
+      if (DataMapUtil.hasMVDataMap(table)) {
+        throw new UnsupportedOperationException(
+          "Delete segment operation is not supported on tables having child datamap")
+      }
+      if (table.isChildTable) {
+        throw new UnsupportedOperationException(
+          "Delete segment operation is not supported on datamap table")
+      }
+    }
+  }
+}
+
+/**
+ * Listener that rejects ALTER TABLE ADD COLUMNS when the target is a datamap (child) table.
+ */
+object DataMapAddColumnsPreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event            expected to be an AlterTableAddColumnPreEvent
+   * @param operationContext not used by this listener
+   * @throws UnsupportedOperationException if the altered table is a datamap table
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val table = event.asInstanceOf[AlterTableAddColumnPreEvent].carbonTable
+    if (table.isChildTable) {
+      throw new UnsupportedOperationException(
+        s"Cannot add columns in a DataMap table " +
+        s"${ table.getDatabaseName }.${ table.getTableName }")
+    }
+  }
+}
+
+
+/**
+ * Listener that rejects ALTER TABLE DROP COLUMNS when a dropped column is referenced by a
+ * non-index datamap of the table, or when the table itself is a datamap (child) table.
+ */
+object DataMapDropColumnPreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event            expected to be an AlterTableDropColumnPreEvent
+   * @param operationContext not used by this listener
+   * @throws UnsupportedOperationException if a dropped column is used by a non-index datamap,
+   *                                       or if the table is a datamap table
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dropColumnEvent = event.asInstanceOf[AlterTableDropColumnPreEvent]
+    val carbonTable = dropColumnEvent.carbonTable
+    val columnsToBeDropped = dropColumnEvent.alterTableDropColumnModel.columns
+    if (DataMapUtil.hasMVDataMap(carbonTable)) {
+      DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+        .filter(schema => null != schema && !schema.isIndexDataMap)
+        .foreach { dataMapSchema =>
+          // first column of this datamap that is being dropped, if any
+          DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
+            .find(column => columnsToBeDropped.contains(column))
+            .foreach { column =>
+              throw new UnsupportedOperationException(
+                s"Column $column cannot be dropped because it exists " +
+                s"in " + dataMapSchema.getProviderName + " datamap:" +
+                s"${ dataMapSchema.getDataMapName }")
+            }
+        }
+    }
+    if (carbonTable.isChildTable) {
+      throw new UnsupportedOperationException(
+        s"Cannot drop columns present in a datamap table ${ carbonTable.getDatabaseName }." +
+        s"${ carbonTable.getTableName }")
+    }
+  }
+}
+
+/**
+ * Listener that rejects column rename / data-type change when the column is referenced by a
+ * non-index datamap of the table, or when the table itself is a datamap (child) table.
+ */
+object DataMapChangeDataTypeorRenameColumnPreListener
+  extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event            expected to be an AlterTableColRenameAndDataTypeChangePreEvent
+   * @param operationContext not used by this listener
+   * @throws UnsupportedOperationException if the altered column is used by a non-index datamap,
+   *                                       or if the table is a datamap table
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val alterEvent = event.asInstanceOf[AlterTableColRenameAndDataTypeChangePreEvent]
+    val carbonTable = alterEvent.carbonTable
+    val columnToBeAltered: String = alterEvent.alterTableDataTypeChangeModel.columnName
+    if (DataMapUtil.hasMVDataMap(carbonTable)) {
+      // first non-index datamap whose main-table columns include the altered column
+      val referencingDataMap = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+        .filter(schema => null != schema && !schema.isIndexDataMap)
+        .find { schema =>
+          DataMapListeners.getDataMapTableColumns(schema, carbonTable)
+            .contains(columnToBeAltered)
+        }
+      referencingDataMap.foreach { dataMapSchema =>
+        throw new UnsupportedOperationException(
+          s"Column $columnToBeAltered exists in a " + dataMapSchema.getProviderName +
+          " datamap. Drop " + dataMapSchema.getProviderName + " datamap to continue")
+      }
+    }
+    if (carbonTable.isChildTable) {
+      throw new UnsupportedOperationException(
+        s"Cannot change data type or rename column for columns present in mv datamap table " +
+        s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+    }
+  }
+}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index a9b581c..9119375 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand,
AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, DataMapUtil}
+import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -69,6 +70,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
setAuditTable(table)
setAuditInfo(Map("partition" -> specs.mkString(",")))
+ if (DataMapUtil.hasMVDataMap(table) || table.isChildTable) {
+ throw new MalformedCarbonCommandException(
+ "Drop Partition is not supported for datamap table or for tables which
have child datamap")
+ }
if (table.isHivePartitionTable) {
var locks = List.empty[ICarbonLock]
try {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 4a0b492..c4c3539 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil,
Segment}
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
@@ -717,9 +717,9 @@ object LoadPreAggregateTablePreListener extends
OperationEventListener {
val carbonLoadModel = loadEvent.getCarbonLoadModel
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
- if (table.isChildDataMap && !isInternalLoadCall) {
+ if ((table.isChildDataMap || table.isChildTable) && !isInternalLoadCall) {
throw new UnsupportedOperationException(
- "Cannot insert/load data directly into pre-aggregate table")
+ "Cannot insert/load data directly into pre-aggregate/child table")
}
}
}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index b729347..86b2d00 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -29,7 +29,7 @@ import
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.util.PartitionUtils
+import org.apache.spark.util.{DataMapUtil, PartitionUtils}
import org.apache.carbondata.common.exceptions.MetadataProcessException
import
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
@@ -90,104 +90,8 @@ case class PreAggregateTableHelper(
throw new MalformedDataMapCommandException(
"Parent table name is different in select and create")
}
- var neworder = Seq[String]()
- val parentOrder =
parentTable.getSortColumns(parentTable.getTableName).asScala
- parentOrder.foreach(parentcol =>
- fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
- parentcol.equals(fieldRelationMap(col).
- columnTableRelationList.get(0).parentColumnName))
- .map(cols => neworder :+= cols.column))
- tableProperties.put(CarbonCommonConstants.SORT_COLUMNS,
neworder.mkString(","))
- tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
- getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
- .LOAD_SORT_SCOPE_DEFAULT))
- tableProperties
- .put(CarbonCommonConstants.TABLE_BLOCKSIZE,
parentTable.getBlockSizeInMB.toString)
- tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
-
parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
- CarbonCommonConstants.FLAT_FOLDER,
CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
-
- // Datamap table name and columns are automatically added prefix with
parent table name
- // in carbon. For convenient, users can type column names same as the ones
in select statement
- // when config dmproperties, and here we update column names with prefix.
- // If longStringColumn is not present in dm properties then we take
long_string_columns from
- // the parent table.
- var longStringColumn =
tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
- if (longStringColumn.isEmpty) {
- val longStringColumnInParents =
parentTable.getTableInfo.getFactTable.getTableProperties
- .asScala
- .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS,
"").split(",").map(_.trim)
- val varcharDatamapFields =
scala.collection.mutable.ArrayBuffer.empty[String]
- fieldRelationMap foreach (fields => {
- val aggFunc = fields._2.aggregateFunction
- val relationList = fields._2.columnTableRelationList
- // check if columns present in datamap are long_string_col in parent
table. If they are
- // long_string_columns in parent, make them long_string_columns in
datamap
- if (aggFunc.isEmpty && relationList.size == 1 &&
longStringColumnInParents
- .contains(relationList.head.head.parentColumnName)) {
- varcharDatamapFields += relationList.head.head.parentColumnName
- }
- })
- if (!varcharDatamapFields.isEmpty) {
- longStringColumn = Option(varcharDatamapFields.mkString(","))
- }
- }
-
- if (longStringColumn != None) {
- val fieldNames = fields.map(_.column)
- val newLongStringColumn =
longStringColumn.get.split(",").map(_.trim).map{ colName =>
- val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
- if (!fieldNames.contains(newColName)) {
- throw new MalformedDataMapCommandException(
- CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" +
colName
- + " does not in datamap")
- }
- newColName
- }
- tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
- newLongStringColumn.mkString(","))
- }
-
- // inherit the local dictionary properties of main parent table
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
- parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
- parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
- CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
- val parentDictInclude =
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",")
-
- val parentDictExclude =
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
-
- val newDictInclude =
- parentDictInclude.flatMap(parentcol =>
- fields.collect {
- case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
- parentcol.equals(fieldRelationMap(col).
- columnTableRelationList.get.head.parentColumnName) =>
- col.column
- })
-
- val newDictExclude = parentDictExclude.flatMap(parentcol =>
- fields.collect {
- case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
- parentcol.equals(fieldRelationMap(col).
- columnTableRelationList.get.head.parentColumnName) =>
- col.column
- })
- if (newDictInclude.nonEmpty) {
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE,
newDictInclude.mkString(","))
- }
- if (newDictExclude.nonEmpty) {
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE,
newDictExclude.mkString(","))
- }
+ DataMapUtil
+ .inheritTablePropertiesFromMainTable(parentTable, fields,
fieldRelationMap, tableProperties)
val tableIdentifier =
TableIdentifier(parentTable.getTableName + "_" + dataMapName,
Some(parentTable.getDatabaseName))
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f41cfc1..6e3e398 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.execution.command.{AlterTableRenameModel,
MetadataCommand}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, DataMapUtil}
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -81,8 +81,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
throw new MalformedCarbonCommandException("alter rename is not supported
for index datamap")
}
// if table have create mv datamap, not support table rename
- if (CarbonTable.hasMVDataMap(oldCarbonTable)) {
- throw new MalformedCarbonCommandException("alter rename is not supported
for mv datamap")
+ if (DataMapUtil.hasMVDataMap(oldCarbonTable) ||
oldCarbonTable.isChildTable) {
+ throw new MalformedCarbonCommandException(
+ "alter rename is not supported for datamap table or for tables which
have child datamap")
}
var timeStamp = 0L
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7d449b5..91d7675 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable
import org.apache.spark.sql.hive.{CarbonRelation,
CreateCarbonSourceTableAsSelectCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.StructField
-import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils,
SparkUtil}
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -322,7 +322,7 @@ class DDLStrategy(sparkSession: SparkSession) extends
SparkStrategy {
throw new MalformedCarbonCommandException(
"Streaming property value is incorrect")
}
- if (CarbonTable.hasMVDataMap(carbonTable)) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
throw new MalformedCarbonCommandException(
"The table which has MV datamap does not support set streaming
property")
}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index d875fdf..96b6000 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
@@ -30,12 +29,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import
org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex,
FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
-import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, SparkUtil}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
@@ -71,7 +69,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession)
extends Rule[Logica
"Update operation is not supported for pre-aggregate table")
}
val indexSchemas =
DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- if (CarbonTable.hasMVDataMap(carbonTable)) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null !=
dataMapSchema.getRelationIdentifier &&
@@ -214,7 +212,7 @@ case class CarbonIUDAnalysisRule(sparkSession:
SparkSession) extends Rule[Logica
"Delete operation is not supported for pre-aggregate table")
}
val indexSchemas =
DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- if (CarbonTable.hasMVDataMap(carbonTable)) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null !=
dataMapSchema.getRelationIdentifier &&
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
new file mode 100644
index 0000000..24e28ad
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.command.{DataMapField, Field}
+
+import
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
DataMapSchema}
+
+/**
+ * Utility methods shared by the pre-aggregate and MV datamap implementations.
+ */
+object DataMapUtil {
+
+  /**
+   * Inherits table properties from the parent (main) table into the properties of the
+   * datamap table that is being created.
+   *
+   * Entries written into `tableProperties`, in this order:
+   *  - SORT_COLUMNS: datamap columns that map directly onto a parent sort column, following
+   *    the parent's sort column order
+   *  - sort_scope, TABLE_BLOCKSIZE and FLAT_FOLDER: copied from the parent (or defaulted)
+   *  - LONG_STRING_COLUMNS: the value already present in `tableProperties` (dm properties)
+   *    or, if absent, the parent's long string columns that the datamap selects directly;
+   *    in both cases each name is prefixed with the lower-cased parent table name
+   *  - LOCAL_DICTIONARY_ENABLE / _THRESHOLD: copied from the parent (or defaulted)
+   *  - LOCAL_DICTIONARY_INCLUDE / _EXCLUDE: the parent lists translated to the datamap
+   *    columns that map directly onto them; written only when non-empty
+   *
+   * @param parentTable      parent table whose properties are inherited
+   * @param fields           fields of the datamap table
+   * @param fieldRelationMap relation of each datamap field to parent columns; it is looked
+   *                         up for every element of `fields`
+   * @param tableProperties  datamap table properties, updated in place
+   * @throws MalformedDataMapCommandException if a long string column is not a datamap column
+   */
+  def inheritTablePropertiesFromMainTable(parentTable: CarbonTable,
+      fields: Seq[Field],
+      fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField],
+      tableProperties: mutable.Map[String, String]): Unit = {
+    val parentProperties = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+
+    // (datamap field, parent column) for every datamap field that maps directly onto a
+    // parent column, in the order of `fields`.
+    val directColumns: Seq[(Field, String)] = fields.flatMap { field =>
+      directParentColumnName(fieldRelationMap(field)).map(parentColumn => (field, parentColumn))
+    }
+
+    // Datamap columns mapped onto the given parent columns, preserving the parent order.
+    // Sort columns and local dictionary include/exclude all use this same rule.
+    def datamapColumnsFor(parentColumns: Seq[String]): Seq[String] =
+      parentColumns.flatMap { parentColumn =>
+        directColumns.collect { case (field, `parentColumn`) => field.column }
+      }
+
+    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS,
+      datamapColumnsFor(parentTable.getSortColumns(parentTable.getTableName).asScala)
+        .mkString(","))
+    tableProperties.put("sort_scope",
+      parentProperties.getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE,
+      parentTable.getBlockSizeInMB.toString)
+    tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
+      parentProperties.getOrElse(CarbonCommonConstants.FLAT_FOLDER,
+        CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
+
+    // Datamap table name and columns are automatically prefixed with the parent table name
+    // in carbon. For convenience, users can type column names as in the select statement
+    // when configuring dmproperties, and the names are prefixed here. If long_string_columns
+    // is not present in the dm properties, the parent's long string columns that the datamap
+    // selects directly are used instead.
+    val longStringColumns = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+      .orElse {
+        val parentLongStringColumns = parentProperties
+          .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
+        val inherited = fieldRelationMap.values
+          .flatMap(dataMapField => directParentColumnName(dataMapField))
+          .filter(column => parentLongStringColumns.contains(column))
+        if (inherited.isEmpty) None else Some(inherited.mkString(","))
+      }
+
+    longStringColumns.foreach { columns =>
+      val fieldNames = fields.map(_.column)
+      val prefixedColumns = columns.split(",").map(_.trim).map { colName =>
+        val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
+        if (!fieldNames.contains(newColName)) {
+          throw new MalformedDataMapCommandException(
+            CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName
+            + " does not in datamap")
+        }
+        newColName
+      }
+      tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
+        prefixedColumns.mkString(","))
+    }
+
+    // Inherit the local dictionary properties of the parent table.
+    tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+      parentProperties.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
+    tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+      parentProperties.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
+
+    // Trim entries so that lists written as "a, b" still match parent column names.
+    val newDictInclude = datamapColumnsFor(parentProperties
+      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",").map(_.trim))
+    val newDictExclude = datamapColumnsFor(parentProperties
+      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",").map(_.trim))
+    if (newDictInclude.nonEmpty) {
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE,
+        newDictInclude.mkString(","))
+    }
+    if (newDictExclude.nonEmpty) {
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE,
+        newDictExclude.mkString(","))
+    }
+  }
+
+  /**
+   * Returns the parent column name when the datamap field maps directly onto a parent
+   * column: it has no aggregate function and `columnTableRelationList.size == 1`. The list
+   * is accessed as an Option of relations (`.get`), so the size check means "present".
+   * The first relation's parent column is returned. Returns None otherwise, including when
+   * the relation list is empty.
+   */
+  private def directParentColumnName(dataMapField: DataMapField): Option[String] = {
+    val relations = dataMapField.columnTableRelationList
+    if (dataMapField.aggregateFunction.isEmpty && relations.size == 1) {
+      relations.get.headOption.map(_.parentColumnName)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Returns true if any datamap schema of the specified table has the MV provider name
+   * (compared case-insensitively). A schema with a null provider name does not match.
+   *
+   * @param carbonTable table whose datamap schemas are inspected
+   * @throws IOException as declared; presumably propagated from reading the datamap schemas
+   */
+  @throws[IOException]
+  def hasMVDataMap(carbonTable: CarbonTable): Boolean = {
+    DataMapStoreManager.getInstance
+      .getDataMapSchemasOfTable(carbonTable)
+      .asScala
+      .exists(dataMapSchema => MV.toString.equalsIgnoreCase(dataMapSchema.getProviderName))
+  }
+}