This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 32f5b50 [CARBONDATA-3403]Fix MV is not working for like and filter
AND and OR queries
32f5b50 is described below
commit 32f5b505509731ea1f7ff0fde2c7e25aea4925b4
Author: akashrn5 <[email protected]>
AuthorDate: Tue May 28 11:55:13 2019 +0530
[CARBONDATA-3403]Fix MV is not working for like and filter AND and OR
queries
Problem:
MV table is not hit during query for like and filter AND and OR queries,
When we have like or filter queries, the queries will have literals which
will be case sensitive to fetch the data.
But dring MV modular plan generation, we register the schema for datamap
where we convert the complete datamap query to lower case, which will even
convert the literals.
So after modular plan generation of user query, during matching pahse of
modular plan of datamap and user query, the semantic equals fails for literals,
that is Attribute reference type.
Solution: Do not convert the query to lower case when registering schema,
that is when adding the preagg fun to query. So it will be handled for MV
For preaggregate, instead converting complete query to lowercase, convert
to lower case during ColumnTableRelation generation and createField for
preaggregate generation, so it will be handled for preaggregate.
This closes #3242
---
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 20 ++++++++++++++++++++
.../timeseries/TestTimeSeriesCreateTable.scala | 2 +-
.../command/preaaggregate/PreAggregateUtil.scala | 5 +++--
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 4 ++--
4 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 5e12ad3..25d2542 100644
---
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1022,6 +1022,25 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
sql("drop table if exists all_table")
}
+ test(" test MV with like queries and filter queries") {
+ sql("drop table if exists mv_like")
+ sql(
+ "create table mv_like(name string, age int, address string, Country
string, id int) stored by 'carbondata'")
+ sql(
+ "create datamap mvlikedm1 using 'mv' as select name,address from mv_like
where Country NOT LIKE 'US' group by name,address")
+ sql(
+ "create datamap mvlikedm2 using 'mv' as select name,address,Country from
mv_like where Country = 'US' or Country = 'China' group by
name,address,Country")
+ sql("insert into mv_like select 'chandler', 32, 'newYork', 'US', 5")
+ val df1 = sql(
+ "select name,address from mv_like where Country NOT LIKE 'US' group by
name,address")
+ val analyzed1 = df1.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed1, "mvlikedm1"))
+ val df2 = sql(
+ "select name,address,Country from mv_like where Country = 'US' or
Country = 'China' group by name,address,Country")
+ val analyzed2 = df2.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed2, "mvlikedm2"))
+ }
+
def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean
= {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
@@ -1040,6 +1059,7 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
sql("drop table IF EXISTS fact_streaming_table2")
sql("drop table IF EXISTS fact_table_parquet")
sql("drop table if exists limit_fail")
+ sql("drop table IF EXISTS mv_like")
}
override def afterAll {
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index d68195c..eabe0f5 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -517,7 +517,7 @@ class TestTimeSeriesCreateTable extends QueryTest with
BeforeAndAfterAll with Be
|GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.contains("Table or view not found: maintableno"))
+ assert(e.getMessage.contains("Table or view not found: mainTableNo"))
}
test("test timeseries create table 33: support event_time and granularity
key with space") {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 0314dd8..db52361 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -196,7 +196,7 @@ object PreAggregateUtil {
parentDatabaseName: String,
carbonTable: CarbonTable) : ColumnTableRelation = {
val parentColumnId = carbonTable.getColumnByName(parentTableName,
parentColumnName).getColumnId
- val columnTableRelation = ColumnTableRelation(parentColumnName =
parentColumnName,
+ val columnTableRelation = ColumnTableRelation(parentColumnName =
parentColumnName.toLowerCase(),
parentColumnId = parentColumnId,
parentTableName = parentTableName,
parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
@@ -386,13 +386,14 @@ object PreAggregateUtil {
aggregateType: String = "",
parentTableName: String,
columnTableRelationList: Seq[ColumnTableRelation]): (Field,
DataMapField) = {
- val actualColumnName = if (aggregateType.equals("")) {
+ var actualColumnName = if (aggregateType.equals("")) {
parentTableName + '_' + columnName
} else {
parentTableName + '_' + columnName + '_' + aggregateType
}
val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
val dataMapField = DataMapField(aggregateType,
Some(columnTableRelationList))
+ actualColumnName = actualColumnName.toLowerCase()
if (dataType.typeName.startsWith("decimal")) {
val (precision, scale) =
CommonUtil.getScaleAndPrecision(dataType.catalogString)
(Field(column = actualColumnName,
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 1c3b7cf..d2fdb08 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -691,7 +691,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
def addPreAggFunction(sql: String): String = {
- addPreAgg(new lexical.Scanner(sql.toLowerCase)) match {
+ addPreAgg(new lexical.Scanner(sql)) match {
case Success(query, _) => query
case _ =>
throw new MalformedCarbonCommandException(s"Unsupported query")
@@ -699,7 +699,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
def addPreAggLoadFunction(sql: String): String = {
- addPreAggLoad(new lexical.Scanner(sql.toLowerCase)) match {
+ addPreAggLoad(new lexical.Scanner(sql)) match {
case Success(query, _) => query
case _ =>
throw new MalformedCarbonCommandException(s"Unsupported query")