This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 88566e0 [CARBONDATA-4076] Fix MV having Subquery alias used in query
projection #4038
88566e0 is described below
commit 88566e076ad98c524009a58e03ecc0e134e27111
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu Dec 3 17:44:50 2020 +0530
[CARBONDATA-4076] Fix MV having Subquery alias used in query projection
#4038
Why is this PR needed?
If an alias is defined in a subquery and that alias is used in the main query
projection, then the query does not hit the MV after the MV is created.
For example:
select c, sum(b) from (select c, d b from t1) group by c
This is because the matcher currently does not compare the child node of the
subquery alias when matching the query against the MV.
What changes were proposed in this PR?
1. If the outputList contains an Alias, compare the alias child of the
subsumee/subsumer.
2. In GroupbyNoselect, also compare the alias.sql of the subsumee/subsumer.
3. Added logs for MV rewrite and matching.
This closes #4038
---
.../org/apache/carbondata/core/view/MVManager.java | 6 +-
.../org/apache/spark/sql/optimizer/MVMatcher.scala | 103 +++++++++----
.../org/apache/spark/sql/optimizer/MVRewrite.scala | 167 +++++++++++++--------
.../apache/spark/sql/optimizer/MVRewriteRule.scala | 19 ++-
.../carbondata/view/rewrite/MVCreateTestCase.scala | 34 +++++
5 files changed, 233 insertions(+), 96 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index bc02207..62f0583 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -177,7 +177,11 @@ public abstract class MVManager {
catalog.registerSchema(schema);
} catch (Exception e) {
// Ignore the schema
- LOGGER.error("Error while registering schema", e);
+ LOGGER.error("Error while registering schema for mv: " +
schema.getIdentifier()
+ .getTableName());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(e.getMessage());
+ }
}
}
this.catalog = catalog;
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
index d562b9d..83fb83e 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.optimizer
+import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
@@ -25,6 +26,7 @@ import
org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable,
ModularPlan, _}
import org.apache.carbondata.mv.plans.modular
import org.apache.carbondata.mv.plans.modular.Flags._
@@ -229,6 +231,34 @@ private abstract class MVMatchPattern extends Logging {
}
/**
+ * Returns true when every subsumerOutputList expression matches some expression in
+ * subsumeeOutputList; an Alias (or a Cast under an Alias) is compared by its child.
+ */
+ protected def compareOutputList(subsumerOutputList: Seq[NamedExpression],
+ subsumeeOutputList: Seq[NamedExpression]): Boolean = {
+ subsumerOutputList.forall { // every expression of the first list needs a match in the second
+ case a@Alias(cast: Cast, _) => // alias over a cast: compare the cast's child
+ subsumeeOutputList.exists {
+ case Alias(castExp: Cast, _) =>
castExp.child.semanticEquals(cast.child)
+ case alias: Alias => alias.child.semanticEquals(cast.child)
+ case exp => exp.semanticEquals(cast.child)
+ } || isExpressionMatches(a, subsumeeOutputList) // otherwise fall back to isExpressionMatches
+ case a@Alias(_, _) => // any other alias: compare the alias child
+ subsumeeOutputList.exists {
+ case Alias(cast: Cast, _) => cast.child.semanticEquals(a.child)
+ case alias: Alias => alias.child.semanticEquals(a.child) ||
+ alias.sql.equalsIgnoreCase(a.sql) // two aliases also match on case-insensitive SQL text
+ case exp => exp.semanticEquals(a.child)
+ } || isExpressionMatches(a, subsumeeOutputList)
+ case ex@exp => // non-alias expression: compare it directly, or against an alias child
+ subsumeeOutputList.exists {
+ case alias: Alias => alias.child.semanticEquals(exp)
+ case expr => expr.semanticEquals(exp)
+ } || isExpressionMatches(ex, subsumeeOutputList)
+ }
+ }
+
+ /**
* Check if expr1 and expr2 matches TimeSeriesUDF function. If both
expressions are
* timeseries udf functions, then check it's children are same irrespective
of case.
*/
@@ -592,6 +622,9 @@ private abstract class MVMatchPattern extends Logging {
*/
private object SelectSelectNoChildDelta extends MVMatchPattern with
PredicateHelper {
+
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
private def isDerivable(
exprE: Expression,
exprListR: Seq[Expression],
@@ -647,6 +680,10 @@ private object SelectSelectNoChildDelta extends
MVMatchPattern with PredicateHel
) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+ LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the
plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }. ")
+
// assume children (including harmonized relation) of subsumer and
subsumee
// are 1-1 correspondence.
// Change the following two conditions to more complicated ones if we
want to
@@ -723,12 +760,8 @@ private object SelectSelectNoChildDelta extends
MVMatchPattern with PredicateHel
val isPredicateEmR = sel_1q.predicateList.forall(expr =>
sel_1a.predicateList.exists(_.semanticEquals(expr)) ||
isExpressionMatches(expr, sel_1a.predicateList))
- val isOutputEmR = sel_1q.outputList.forall(expr =>
- sel_1a.outputList.exists(_.semanticEquals(expr)) ||
- isExpressionMatches(expr, sel_1a.outputList))
- val isOutputRmE = sel_1a.outputList.forall(expr =>
- sel_1q.outputList.exists(_.semanticEquals(expr)) ||
- isExpressionMatches(expr, sel_1q.outputList))
+ val isOutputEmR = compareOutputList(sel_1q.outputList,
sel_1a.outputList)
+ val isOutputRmE = compareOutputList(sel_1a.outputList,
sel_1q.outputList)
val isLOEmLOR = !(isLeftJoinView(sel_1a) &&
sel_1q.joinEdges.head.joinType == Inner)
if (r2eJoinsMatch) {
@@ -803,6 +836,9 @@ private object SelectSelectNoChildDelta extends
MVMatchPattern with PredicateHel
sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
+ LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the
plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }")
val isPredicateRmE = sel_3a.predicateList.isEmpty ||
sel_3a.predicateList.forall(expr =>
sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
@@ -856,6 +892,9 @@ private object SelectSelectNoChildDelta extends
MVMatchPattern with PredicateHel
}
private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
+
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
def apply(
subsumer: ModularPlan,
subsumee: ModularPlan,
@@ -866,31 +905,16 @@ private object GroupbyGroupbyNoChildDelta extends
MVMatchPattern {
gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
None) =>
+ LOGGER.debug(s"Applying pattern: {GroupbyGroupbyNoChildDelta} for the
plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }")
val isGroupingEmR = gb_2q.predicateList.forall(expr =>
gb_2a.predicateList.exists(_.semanticEquals(expr)) ||
isExpressionMatches(expr, gb_2a.predicateList))
val isGroupingRmE = gb_2a.predicateList.forall(expr =>
gb_2q.predicateList.exists(_.semanticEquals(expr)) ||
isExpressionMatches(expr, gb_2q.predicateList))
- val isOutputEmR = gb_2q.outputList.forall {
- case Alias(cast: Cast, _) =>
- gb_2a.outputList.exists {
- case Alias(castExp: Cast, _) =>
castExp.child.semanticEquals(cast.child)
- case alias: Alias => alias.child.semanticEquals(cast.child)
- case exp => exp.semanticEquals(cast.child)
- }
- case a @ Alias(_, _) =>
- gb_2a.outputList.exists {
- case Alias(cast: Cast, _) => cast.child.semanticEquals(a.child)
- case alias: Alias => alias.child.semanticEquals(a.child)
- case exp => exp.semanticEquals(a.child)
- }
- case exp =>
- gb_2a.outputList.exists {
- case alias: Alias => alias.child.semanticEquals(exp)
- case expr => expr.semanticEquals(exp)
- }
- }
+ val isOutputEmR = compareOutputList(gb_2q.outputList, gb_2a.outputList)
if (isGroupingEmR && isGroupingRmE) {
if (isOutputEmR) {
// Mappings of output of two plans by checking semantic equals.
@@ -898,7 +922,8 @@ private object GroupbyGroupbyNoChildDelta extends
MVMatchPattern {
(exp, gb_2q.outputList.find {
case a: Alias if exp.isInstanceOf[Alias] =>
a.child.semanticEquals(exp.children.head) ||
- isExpressionMatches(a.child, exp.children.head)
+ isExpressionMatches(a.child, exp.children.head) ||
+ a.sql.equalsIgnoreCase(exp.sql)
case a: Alias => a.child.semanticEquals(exp)
case other => exp match {
case alias: Alias =>
@@ -971,6 +996,9 @@ private object GroupbyGroupbyNoChildDelta extends
MVMatchPattern {
private object GroupbyGroupbySelectOnlyChildDelta
extends MVMatchPattern with PredicateHelper {
+
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
private def isDerivable(
exprE: Expression,
exprListR: Seq[Expression],
@@ -1083,6 +1111,11 @@ private object GroupbyGroupbySelectOnlyChildDelta
true)
if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
+ LOGGER.debug(s"Applying pattern: {GroupbyGroupbySelectOnlyChildDelta}
for the plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
+ s"Compensation: { ${ sel_1c1.toString().trim } }")
+
val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
val isGroupingEdR = gb_2q.predicateList.forall(expr =>
isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q,
gb_2a, compensation))
@@ -1169,6 +1202,9 @@ private object GroupbyGroupbySelectOnlyChildDelta
}
private object GroupbyGroupbyGroupbyChildDelta extends MVMatchPattern {
+
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
def apply(
subsumer: ModularPlan,
subsumee: ModularPlan,
@@ -1181,6 +1217,9 @@ private object GroupbyGroupbyGroupbyChildDelta extends
MVMatchPattern {
Select(_, _, _, _, _, _, _, _, _, _),
Select(_, _, _, _, _, _, _, _, _, _),
true) =>
+ LOGGER.debug(s"Applying pattern: {GroupbyGroupbyGroupbyChildDelta} for
the plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }")
// TODO: implement me
Nil
@@ -1191,6 +1230,9 @@ private object GroupbyGroupbyGroupbyChildDelta extends
MVMatchPattern {
private object SelectSelectSelectChildDelta extends MVMatchPattern {
+
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
def apply(
subsumer: ModularPlan,
subsumee: ModularPlan,
@@ -1206,6 +1248,9 @@ private object SelectSelectSelectChildDelta extends
MVMatchPattern {
modular.Select(_, _, _, _, _, _, _, _, _, _),
modular.Select(_, _, _, _, _, _, _, _, _, _),
true) =>
+ LOGGER.debug(s"Applying pattern: {SelectSelectSelectChildDelta} for
the plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }")
// TODO: implement me
Nil
case _ => Nil
@@ -1216,6 +1261,8 @@ private object SelectSelectSelectChildDelta extends
MVMatchPattern {
private object SelectSelectGroupbyChildDelta
extends MVMatchPattern with PredicateHelper {
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
private def isDerivable(
exprE: Expression,
exprListR: Seq[Expression],
@@ -1422,6 +1469,10 @@ private object SelectSelectGroupbyChildDelta
Some([email protected](_, _, _, _, _, _, _, _)),
_ :: Nil,
_ :: Nil) =>
+ LOGGER.debug(s"Applying pattern: {SelectSelectGroupbyChildDelta} for
the plan: " +
+ s"{ ${ subsumee.toString().trim } }. " +
+ s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
+ s"Compensation: { ${ gb_2c.toString().trim } }")
val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode =>
tbl }
val distinctSelOList = getDistinctOutputList(sel_3q_dup.outputList)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
index 7818149..e6d17ae 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks.{break, breakable}
+import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, AttributeReference, Expression, Literal, NamedExpression,
ScalaUDF, SortOrder}
@@ -33,6 +34,7 @@ import
org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum
import org.apache.carbondata.mv.expressions.modular.{ModularSubquery,
ScalarModularSubquery}
import org.apache.carbondata.mv.plans.modular.{ExpressionHelper, GroupBy,
HarmonizedRelation, LeafNode, Matchable, ModularPlan, ModularRelation, Select,
SimpleModularizer}
@@ -49,6 +51,8 @@ import org.apache.carbondata.view.{MVCatalogInSpark,
MVPlanWrapper, MVTimeGranul
class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
session: SparkSession) {
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
private def getAliasName(expression: NamedExpression): String = {
expression match {
case Alias(_, name) => name
@@ -226,6 +230,8 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan:
LogicalPlan,
*/
private def rewrite(modularPlan: ModularPlan): ModularPlan = {
if (modularPlan.find(_.rewritten).isDefined) {
+ LOGGER.debug(s"Getting updated plan for the rewritten modular plan: " +
+ s"{ ${ modularPlan.toString().trim } }")
var updatedPlan = modularPlan transform {
case select: Select =>
updatePlan(select)
@@ -397,6 +403,7 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan:
LogicalPlan,
if (plan.rewritten || !plan.isSPJGH) {
plan
} else {
+ LOGGER.info("Query matching has been initiated with available mv
schema's")
val rewrittenPlans =
for {schemaWrapper <- catalog.lookupFeasibleSchemas(plan).toStream
subsumer <- SimpleModularizer.modularize(
@@ -788,82 +795,110 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan:
LogicalPlan,
val plan = planWrapper.modularPlan.asInstanceOf[Select]
val updatedPlanOutputList = getUpdatedOutputList(plan.outputList,
groupBy.modularPlan)
val outputListMapping = groupBy.outputList zip updatedPlanOutputList
- val outputList = for ((output1, output2) <- outputListMapping) yield {
- output1 match {
- case Alias(aggregate@AggregateExpression(function@Sum(_), _, _,
_), _) =>
- val uFun = function.copy(child = output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(function@Max(_), _, _,
_), _) =>
- val uFun = function.copy(child = output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(function@Min(_), _, _,
_), _) =>
- val uFun = function.copy(child = output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@Count(Seq(_)), _, _,
_), _) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(agg@AggregateExpression(_@Corr(_, _), _, _, _), _) =>
- val uFun = Sum(output2)
- Alias(agg.copy(aggregateFunction = uFun), output1.name)(exprId =
output1.exprId)
- case Alias(aggregate@AggregateExpression(_@VariancePop(_), _, _,
_), _) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@VarianceSamp(_), _, _,
_), _) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@StddevSamp(_), _, _,
_), _) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@StddevPop(_), _, _, _),
_) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@CovPopulation(_, _), _,
_, _), _) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@CovSample(_, _), _, _,
_), _) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@Skewness(_), _, _, _),
_) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case Alias(aggregate@AggregateExpression(_@Kurtosis(_), _, _, _),
_) =>
- val uFun = Sum(output2)
- Alias(aggregate.copy(aggregateFunction = uFun),
output1.name)(exprId = output1.exprId)
- case _ =>
- if (output1.name != output2.name) {
- Alias(output2, output1.name)(exprId = output1.exprId)
- } else {
- output2
- }
- }
- }
- val updatedPredicates = groupBy.predicateList.map {
- predicate =>
- outputListMapping.find {
- case (output1, _) =>
- output1 match {
- case alias: Alias if predicate.isInstanceOf[Alias] =>
- alias.child.semanticEquals(predicate.children.head)
- case alias: Alias =>
- alias.child.semanticEquals(predicate)
- case other =>
- other.semanticEquals(predicate)
- }
- } match {
- case Some((_, output2)) => output2
- case _ => predicate
- }
- }
+ val (outputList: Seq[NamedExpression], updatedPredicates:
Seq[Expression]) =
+ getUpdatedOutputAndPredicateList(
+ groupBy,
+ outputListMapping)
groupBy.copy(
outputList = outputList,
inputList = plan.outputList,
predicateList = updatedPredicates,
child = plan,
modularPlan = None).setRewritten()
+ case groupBy: GroupBy if groupBy.predicateList.nonEmpty => groupBy.child
match {
+ case select: Select if select.modularPlan.isDefined =>
+ val planWrapper = select.modularPlan.get.asInstanceOf[MVPlanWrapper]
+ val plan = planWrapper.modularPlan.asInstanceOf[Select]
+ val updatedPlanOutputList = getUpdatedOutputList(plan.outputList,
select.modularPlan)
+ val outputListMapping = groupBy.outputList zip updatedPlanOutputList
+ val (outputList: Seq[NamedExpression], updatedPredicates:
Seq[Expression]) =
+ getUpdatedOutputAndPredicateList(
+ groupBy,
+ outputListMapping)
+ groupBy.copy(
+ outputList = outputList,
+ inputList = plan.outputList,
+ predicateList = updatedPredicates,
+ child = select,
+ modularPlan = None)
+ case _ => groupBy
+ }
case other => other
}
}
+ private def getUpdatedOutputAndPredicateList(groupBy: GroupBy, // rebuilds groupBy's outputs and predicates from the (output1, output2) pairs
+ outputListMapping: Seq[(NamedExpression, NamedExpression)]):
+ (Seq[NamedExpression], Seq[Expression]) = {
+ val outputList = for ((output1, output2) <- outputListMapping) yield { // one rewritten output per pair
+ output1 match {
+ case Alias(aggregate@AggregateExpression(function@Sum(_), _, _, _), _)
=>
+ val uFun = function.copy(child = output2) // Sum/Max/Min keep their function, now applied to output2
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(function@Max(_), _, _, _), _)
=>
+ val uFun = function.copy(child = output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(function@Min(_), _, _, _), _)
=>
+ val uFun = function.copy(child = output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@Count(Seq(_)), _, _, _), _)
=>
+ val uFun = Sum(output2) // this and every aggregate case below substitute Sum(output2) for the original function
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(agg@AggregateExpression(_@Corr(_, _), _, _, _), _) =>
+ val uFun = Sum(output2)
+ Alias(agg.copy(aggregateFunction = uFun), output1.name)(exprId =
output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@VariancePop(_), _, _, _),
_) =>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@VarianceSamp(_), _, _, _),
_) =>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@StddevSamp(_), _, _, _), _)
=>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@StddevPop(_), _, _, _), _)
=>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@CovPopulation(_, _), _, _,
_), _) =>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@CovSample(_, _), _, _, _),
_) =>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@Skewness(_), _, _, _), _) =>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case Alias(aggregate@AggregateExpression(_@Kurtosis(_), _, _, _), _) =>
+ val uFun = Sum(output2)
+ Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId
= output1.exprId)
+ case _ => // any other output: use output2, re-aliased to output1's name and exprId if the names differ
+ if (output1.name != output2.name) {
+ Alias(output2, output1.name)(exprId = output1.exprId)
+ } else {
+ output2
+ }
+ }
+ }
+ val updatedPredicates = groupBy.predicateList.map { // swap each predicate for the output2 paired with a semantically equal output1
+ predicate =>
+ outputListMapping.find {
+ case (output1, _) =>
+ output1 match {
+ case alias: Alias if predicate.isInstanceOf[Alias] =>
+ alias.child.semanticEquals(predicate.children.head) // both are aliases: compare their children
+ case alias: Alias =>
+ alias.child.semanticEquals(predicate)
+ case other =>
+ other.semanticEquals(predicate)
+ }
+ } match {
+ case Some((_, output2)) => output2
+ case _ => predicate // no matching output: keep the predicate unchanged
+ }
+ }
+ (outputList, updatedPredicates)
+ }
+
/**
* Updates the flagspec of given select plan with attributes of relation
select plan
*/
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
index c288d40..10b793b 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.optimizer
import scala.collection.JavaConverters._
+import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias,
UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
@@ -29,7 +30,6 @@ import
org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties,
ThreadLocalSessionInfo}
-import org.apache.carbondata.core.view.{MVCatalog, MVCatalogFactory}
import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
import org.apache.carbondata.view.{MVCatalogInSpark, MVManagerInSpark,
MVSchemaWrapper}
import org.apache.carbondata.view.MVFunctions.DUMMY_FUNCTION
@@ -39,6 +39,8 @@ import org.apache.carbondata.view.MVFunctions.DUMMY_FUNCTION
*/
class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
+ val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
override def apply(logicalPlan: LogicalPlan): LogicalPlan = {
// only query need to check this rule
logicalPlan match {
@@ -51,7 +53,11 @@ class MVRewriteRule(session: SparkSession) extends
Rule[LogicalPlan] {
} catch {
case e =>
// if exception is thrown while rewriting the query, will fallback to
original query plan.
- MVRewriteRule.LOGGER.warn("Failed to rewrite plan with mv: " +
e.getMessage)
+ MVRewriteRule.LOGGER
+ .warn("Failed to rewrite plan with mv. Enable debug log to check the
Exception")
+ if (MVRewriteRule.LOGGER.isDebugEnabled) {
+ MVRewriteRule.LOGGER.debug(e.getMessage)
+ }
logicalPlan
}
}
@@ -88,6 +94,9 @@ class MVRewriteRule(session: SparkSession) extends
Rule[LogicalPlan] {
canApply = false
}
Aggregate(groupBy, aggregations, child)
+ case localRelation@LocalRelation(_, _, _) =>
+ canApply = false
+ localRelation
}
if (!canApply) {
return logicalPlan
@@ -97,6 +106,8 @@ class MVRewriteRule(session: SparkSession) extends
Rule[LogicalPlan] {
}
val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
if (viewCatalog != null && hasSuitableMV(logicalPlan, viewCatalog)) {
+ LOGGER.debug(s"Query Rewrite has been initiated for the plan: " +
+ s"${ logicalPlan.toString().trim }")
val viewRewrite = new MVRewrite(viewCatalog, logicalPlan, session)
val rewrittenPlan = viewRewrite.rewrittenPlan
if (rewrittenPlan.find(_.rewritten).isDefined) {
@@ -131,7 +142,7 @@ class MVRewriteRule(session: SparkSession) extends
Rule[LogicalPlan] {
*/
private def rewriteFunctionWithQualifierName(modularPlan: ModularPlan):
String = {
val compactSQL = modularPlan.asCompactSQL
- modularPlan match {
+ val finalCompactSQL = modularPlan match {
case select: Select =>
var outputColumn = ""
select.outputList.collect {
@@ -154,6 +165,8 @@ class MVRewriteRule(session: SparkSession) extends
Rule[LogicalPlan] {
case _ =>
compactSQL
}
+ LOGGER.debug(s"Rewritten Query: { ${finalCompactSQL.trim} }")
+ finalCompactSQL
}
/**
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
index 891e36d..a4aa1c5 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
@@ -414,6 +414,40 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
sql(s"drop materialized view mv32")
}
+ test("test create materialized view having sub-query alias used in
projection") {
+ sql("drop materialized view if exists mv_sub") // remove any existing mv_sub before creating it
+ val subQuery = "select empname, sum(result) sum_ut from " +
+ "(select empname, utilization result from fact_table1)
fact_table1 " +
+ "group by empname"
+ sql("create materialized view mv_sub as " + subQuery) // the MV is defined by the very query that is run next
+ val frame = sql(subQuery)
+ assert(TestUtil.verifyMVHit(frame.queryExecution.optimizedPlan, "mv_sub")) // optimized plan must hit mv_sub
+ checkAnswer(frame, // expected rows come from the same query shape run against fact_table2
+ sql("select empname, sum(result) ut from (select empname, utilization
result from " +
+ "fact_table2) fact_table2 group by empname"))
+ sql(s"drop materialized view mv_sub")
+ }
+
+ test("test create materialized view used as sub-query in actual query") {
+ sql("drop materialized view if exists mv_sub") // remove any existing mv_sub before creating it
+ sql("create materialized view mv_sub as select empname, utilization result
from fact_table1")
+ val df1 = sql("select empname, sum(result) sum_ut from " +
+ "(select empname, utilization result from fact_table1)
fact_table1 " +
+ "group by empname")
+ val df2 = sql("select emp, sum(result) sum_ut from " +
+ "(select empname emp, utilization result from fact_table1)
fact_table1 " +
+ "group by emp")
+ assert(TestUtil.verifyMVHit(df1.queryExecution.optimizedPlan, "mv_sub")) // sub-query identical to the MV definition
+ assert(TestUtil.verifyMVHit(df2.queryExecution.optimizedPlan, "mv_sub")) // sub-query that additionally aliases empname as emp
+ checkAnswer(df1, // expected rows come from the same query shape run against fact_table2
+ sql("select empname, sum(result) ut from (select empname, utilization
result from " +
+ "fact_table2) fact_table2 group by empname"))
+ checkAnswer(df2,
+ sql("select emp, sum(result) ut from (select empname emp, utilization
result from " +
+ "fact_table2) fact_table2 group by emp"))
+ sql(s"drop materialized view mv_sub")
+ }
+
test("test create materialized view with simple and sub group by query with
filter on materialized view") {
sql("create materialized view mv15 as select empname, sum(utilization)
from fact_table1 where empname='shivani' group by empname")
val frame = sql(