This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c31a809 [CARBONDATA-3715]Fix Timeseries Query Rollup failure for
timeseries column of Date type
c31a809 is described below
commit c31a809ba26b9f8cd7d84d50d220d750e3994c24
Author: Indhumathi27 <[email protected]>
AuthorDate: Fri Feb 21 11:29:32 2020 +0530
[CARBONDATA-3715]Fix Timeseries Query Rollup failure for timeseries column
of Date type
Why is this PR needed?
Issue 1:
A timeseries query with a timeseries column of date type throws a parsing
exception after rollup, because the wrong attribute name is used while forming
the SQL for the cast expression
Issue 2:
The to_date() function has case-sensitivity issues while rewriting the plan
What changes were proposed in this PR?
Issue 1:
If the query is rolled up for a date column, then use the attribute name when
forming the SQL for the cast expression.
Issue 2:
Convert the cast expression's child attribute name to lower case during rewrite
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3630
---
.../carbondata/mv/extension/MVDataMapProvider.scala | 2 --
.../carbondata/mv/rewrite/DefaultMatchMaker.scala | 4 ++++
.../org/apache/carbondata/mv/rewrite/Utils.scala | 17 +++++++++++++++++
.../carbondata/mv/rewrite/TestAllOperationsOnMV.scala | 6 ++++++
.../mv/timeseries/TestMVTimeSeriesQueryRollUp.scala | 19 +++++++++++++++++++
.../apache/carbondata/mv/plans/util/Printers.scala | 9 +++++++++
6 files changed, 55 insertions(+), 2 deletions(-)
diff --git
a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
index 5b400b2..5d5da68 100644
---
a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
+++
b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
@@ -23,8 +23,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonUtils, SparkSession}
import
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
-import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
-import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.annotations.InterfaceAudience
import
org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException
diff --git
a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 3646ae2..e72ce94 100644
---
a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++
b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -62,6 +62,8 @@ abstract class DefaultMatchPattern extends
MatchPattern[ModularPlan] {
a.child match {
case s: ScalaUDF if s.function.isInstanceOf[TimeSeriesFunction] =>
Utils.getTransformedTimeSeriesUDF(s) -> a.toAttribute
+ case cast: Cast if cast.child.isInstanceOf[AttributeReference] =>
+ Utils.getTransformedCastExpression(cast) -> a.toAttribute
case _ =>
a.child -> a.toAttribute
}
@@ -89,6 +91,8 @@ abstract class DefaultMatchPattern extends
MatchPattern[ModularPlan] {
val newExp = a transform {
case s: ScalaUDF if
s.function.isInstanceOf[TimeSeriesFunction] =>
Utils.getTransformedTimeSeriesUDF(s)
+ case cast: Cast if cast.child.isInstanceOf[AttributeReference]
=>
+ Utils.getTransformedCastExpression(cast)
}
attribute = aliasMapExp.get(newExp)
}
diff --git
a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
index 716afbc..fdbad3a 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -17,6 +17,7 @@
package org.apache.carbondata.mv.rewrite
+import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, AttributeReference, Cast, Divide, Expression, Literal, Multiply,
NamedExpression, PredicateHelper, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan,
Project}
@@ -591,6 +592,22 @@ object Utils extends PredicateHelper {
}
/**
+ * transform cast expression to change it's child attribute reference name
to lower case
+ */
+ def getTransformedCastExpression(cast: Cast): Expression = {
+ cast.transform {
+ case attr: AttributeReference =>
+ CarbonToSparkAdapter.createAttributeReference(
+ attr.name.toLowerCase,
+ attr.dataType,
+ attr.nullable,
+ attr.metadata,
+ attr.exprId,
+ attr.qualifier)
+ }
+ }
+
+ /**
* Check if expr1 and expr2 matches TimeSeriesUDF function. If both
expressions are
* timeseries udf functions, then check it's childrens are same irrespective
of case.
*/
diff --git
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index a8c9af5..66b0c38 100644
---
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -423,6 +423,12 @@ class TestAllOperationsOnMV extends QueryTest with
BeforeAndAfterEach {
sql("drop materialized view if exists dm ")
sql("create materialized view dm as select max(to_date(dob)) ,
min(to_date(dob)) from maintable where to_date(dob)='1975-06-11' or
to_date(dob)='1975-06-23'")
checkExistence(sql("select max(to_date(dob)) , min(to_date(dob)) from
maintable where to_date(dob)='1975-06-11' or to_date(dob)='1975-06-23'"), true,
"1975-06-11 1975-06-11")
+ sql("drop materialized view if exists dm2 ")
+ sql("create materialized view dm2 as select to_date(dob) from maintable
where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =120 or
DECIMAL_COLUMN1 = 4.34 or Double_COLUMN1 =12345 or INTEGER_COLUMN1 IS NULL")
+ checkExistence(sql("select to_date(DOB) from maintable where CUST_ID IS
NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =120 or DECIMAL_COLUMN1 = 4.34 or
Double_COLUMN1 =12345 or INTEGER_COLUMN1 IS NULL"), true, "1975-06-11")
+ val df = sql("select to_date(DOB) from maintable where CUST_ID IS NULL or
DOB IS NOT NULL or BIGINT_COLUMN1 =120 or DECIMAL_COLUMN1 = 4.34 or
Double_COLUMN1 =12345 or INTEGER_COLUMN1 IS NULL")
+ TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "dm2")
+ sql("drop table IF EXISTS maintable")
}
test("test preagg and mv") {
diff --git
a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
index 71c0286..5ea4ae3 100644
---
a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
+++
b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
@@ -261,6 +261,25 @@
sql("drop materialized view if exists datamap2")
}
+ test("test rollup for timeseries column of Date type") {
+ drop()
+ sql("CREATE TABLE maintable (empno int,empname string, projectcode int,
projectjoindate " +
+ "date,salary double) STORED AS CARBONDATA")
+ sql("insert into maintable select 11,'joey',2,'2016-02-23',300")
+ sql("insert into maintable select 13,'pheobe',1,'2016-02-23',450")
+ sql("insert into maintable select 22,'cathy',1,'2016-02-25',450.5")
+ sql("drop materialized view if exists datamap1")
+ val result = sql("select
timeseries(projectjoindate,'week'),sum(projectcode) from maintable group by
timeseries(projectjoindate,'week')")
+ sql("create materialized view datamap1 as select
timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by
timeseries(projectjoindate,'day')")
+ val dayDF= sql("select
timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by
timeseries(projectjoindate,'day')")
+ assert(TestUtil.verifyMVDataMap(dayDF.queryExecution.optimizedPlan,
"datamap1"))
+ val weekDF = sql("select
timeseries(projectjoindate,'week'),sum(projectcode) from maintable group by
timeseries(projectjoindate,'week')")
+ assert(TestUtil.verifyMVDataMap(weekDF.queryExecution.optimizedPlan,
"datamap1"))
+ checkAnswer(result, weekDF)
+ sql("drop materialized view if exists datamap1")
+ drop()
+ }
+
def drop(): Unit = {
sql("drop table if exists maintable")
}
diff --git
a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index 9112005..af8d184 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -362,6 +362,15 @@ trait Printers {
def formatExpressionsInUDF(exp: Seq[Expression]): String = {
val result = exp.map {
+ case cast: Cast =>
+ // for rolledUp queries of timeseries column with Date type, make
+ // Cast sql with attribute name
+ cast.child match {
+ case attr: AttributeReference if
attr.name.startsWith("gen_subsumer_") =>
+ s"CAST(${ attr.name } AS ${ cast.dataType.sql })"
+ case _ =>
+ cast.sql
+ }
case attr: AttributeReference =>
if (attr.name.startsWith("gen_subsumer_")) {
attr.name