This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 581591a  [CARBONDATA-3433]Fix MV issues related to duplicate columns, 
limit and constant columns
581591a is described below

commit 581591a383d3bf7216c6978315ec1377ac790d8a
Author: akashrn5 <[email protected]>
AuthorDate: Thu Jun 13 13:42:19 2019 +0530

    [CARBONDATA-3433]Fix MV issues related to duplicate columns, limit and 
constant columns
    
    Problem:
    MV has the following issues:
    1. When the select query has duplicate columns, MV creation fails, even 
though the select is a valid query.
    2. When a constant column is used in the CTAS query for datamap creation, it fails.
    3. When limit is used in the CTAS query for datamap creation, it fails.
    
    Solution:
    1. Since duplicate columns in a query are valid, MV should support them, so when 
creating the fields, take only the distinct columns.
    2. Handle getting the field relation map when the query has a constant column.
    3. Block MV creation for a CTAS query with limit, as it is not a valid use case for an MV 
datamap.
    
    This closes #3285
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 18 ++++++++++--
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  |  9 ++++--
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 33 ++++++++++++++--------
 .../mv/rewrite/MVExceptionTestCase.scala           |  9 +++++-
 4 files changed, 52 insertions(+), 17 deletions(-)

diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 57082d7..4d43088 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, 
SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, 
LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, PartitionerField, 
TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, 
CarbonDropTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -65,6 +65,13 @@ object MVHelper {
     val updatedQuery = new 
CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
+    // if there is limit in MV ctas query string, throw exception, as its not 
a valid usecase
+    logicalPlan match {
+      case Limit(_, _) =>
+        throw new MalformedCarbonCommandException("MV datamap does not support 
the query with " +
+                                                  "limit")
+      case _ =>
+    }
     val selectTables = getTables(logicalPlan)
     if (selectTables.isEmpty) {
       throw new MalformedCarbonCommandException(
@@ -72,6 +79,8 @@ object MVHelper {
     }
     val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
+    // the ctas query can have duplicate columns, so we should take distinct 
and create fields,
+    // so that it won't fail during create mv table
     val fields = logicalPlan.output.map { attr =>
       if (attr.dataType.isInstanceOf[ArrayType] || 
attr.dataType.isInstanceOf[StructType] ||
           attr.dataType.isInstanceOf[MapType]) {
@@ -96,7 +105,8 @@ object MVHelper {
           children = None,
           rawSchema = rawSchema)
       }
-    }
+    }.distinct
+
     val tableProperties = mutable.Map[String, String]()
     val parentTables = new util.ArrayList[String]()
     val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
@@ -403,7 +413,9 @@ object MVHelper {
 
   def getAttributeMap(subsumer: Seq[NamedExpression],
       subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
-    if (subsumer.length == subsume.length) {
+    // when datamap is created with duplicate columns like select 
sum(age),sum(age) from table,
+    // the subsumee will have duplicate, so handle that case here
+    if (subsumer.length == subsume.groupBy(_.name).size) {
       subsume.zip(subsumer).flatMap { case (left, right) =>
         var tuples = left collect {
           case attr: AttributeReference =>
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index b267203..8cb2f1f 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -129,7 +129,12 @@ object MVUtil {
       val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new 
ArrayBuffer[ColumnTableRelation]()
       agg.collect {
         case Alias(attr: AggregateExpression, name) =>
-          if (attr.aggregateFunction.isInstanceOf[Count]) {
+          var isLiteralPresent = false
+          attr.aggregateFunction.collect {
+            case l@Literal(_, _) =>
+              isLiteralPresent = true
+          }
+          if (isLiteralPresent) {
             fieldToDataMapFieldMap +=
             getFieldToDataMapFields(name,
               attr.aggregateFunction.dataType,
@@ -137,7 +142,7 @@ object MVUtil {
               attr.aggregateFunction.nodeName,
               arrayBuffer,
               "")
-            aggregateType = "count"
+            aggregateType = attr.aggregateFunction.nodeName
           } else {
             aggregateType = attr.aggregateFunction.nodeName
           }
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index a8abdc3..d136b27 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -19,8 +19,6 @@ package org.apache.carbondata.mv.rewrite
 import java.io.File
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -992,7 +990,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
 
     var frame = sql(querySQL)
     var analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
     assert(2 == frame.collect().size)
     frame.collect().foreach { each =>
       if (1 == each.get(0)) {
@@ -1008,7 +1006,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
 
     frame = sql(querySQL2)
     analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
     assert(1 == frame.collect().size)
     frame.collect().foreach { each =>
       if (2 == each.get(0)) {
@@ -1034,11 +1032,11 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val df1 = sql(
       "select name,address from mv_like where Country NOT LIKE 'US' group by 
name,address")
     val analyzed1 = df1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "mvlikedm1"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "mvlikedm1"))
     val df2 = sql(
       "select name,address,Country from mv_like where Country = 'US' or 
Country = 'China' group by name,address,Country")
     val analyzed2 = df2.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed2, "mvlikedm2"))
+    assert(TestUtil.verifyMVDataMap(analyzed2, "mvlikedm2"))
   }
 
   test("test distinct, count, sum on MV with single projection column") {
@@ -1074,11 +1072,24 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists mvtable1")
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
+  test("test mv with duplicate columns in query and constant column") {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, age int, add string) stored by 
'carbondata'")
+    sql("create datamap dupli_mv using 'mv' as select name, sum(age),sum(age) 
from maintable group by name")
+    sql("create datamap constant_mv using 'mv' as select name, sum(1) ex1 from 
maintable group by name")
+    sql("insert into maintable select 'pheobe',31,'NY'")
+    val df1 = sql("select sum(age),name from maintable group by name")
+    val df2 = sql("select sum(age),sum(age),name from maintable group by name")
+    val df3 = sql("select name, sum(1) ex1 from maintable group by name")
+    val df4 = sql("select sum(1) ex1 from maintable group by name")
+    val analyzed1 = df1.queryExecution.analyzed
+    val analyzed2 = df2.queryExecution.analyzed
+    val analyzed3 = df3.queryExecution.analyzed
+    val analyzed4 = df4.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed1, "dupli_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed2, "dupli_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed3, "constant_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed4, "constant_mv"))
   }
 
   def drop(): Unit = {
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
index 7823d46..b2e6376 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.mv.rewrite
 
-import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -42,6 +42,13 @@ class MVExceptionTestCase  extends QueryTest with 
BeforeAndAfterAll {
     assertResult("DataMap with name main_table_mv1 already exists in 
storage")(ex.getMessage)
   }
 
+  test("test mv creation with limit in query") {
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql("create datamap maintable_mv2 on table main_table using 'mv' as 
select sum(age),name from main_table group by name limit 10")
+    }
+    assertResult("MV datamap does not support the query with 
limit")(ex.getMessage)
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS main_table")
     sql("drop table if exists main_table_error")

Reply via email to