This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d17de6  [CARBONDATA-4147] Fix re-arrange schema in logical relation 
on MV partition table having sort column
8d17de6 is described below

commit 8d17de64513a24bc856210b58d303edc19ab70ce
Author: Indhumathi27 <[email protected]>
AuthorDate: Fri Mar 12 16:06:27 2021 +0530

    [CARBONDATA-4147] Fix re-arrange schema in logical relation on MV partition 
table having sort column
    
    Why is this PR needed?
    After PR-3615, we have avoided rearranging catalog table schema if already 
re-arranged.
    For an MV on a partition table, we always move the partition column to the 
end of the MV table schema.
    Catalog table will also have the column schema in same order(partition 
column at last). Hence, in
    this case, we do not re-arrange logical relation in a catalog table again.
    
    But, if there is a sort column present in MV table, then selected column 
schema and catalog table
    schema will not be in same order. In that case, we have to re-arrange the 
catalog table schema.
    Currently, we are using rearrangedIndex to re-arrange the catalog table 
logical relation, but
    rearrangedIndex will keep the index of partition column at the end, 
whereas, catalog table has
    partition column already at the end. Hence, we are re-arranging the 
partition column index
    again in catalog table relation, which leads to insertion failure.
    
    Example:
    Create MV on columns: c1, c2 (partition), c3(sort_column), c4
    Problem:
    Create order: c1,c2,c3,c4
    Create order index: 0,1,2,3
    
    Rearranged Index:
    Existing Catalog table schema order: c1, c3, c4, c2 (for MV, partition 
column will be moved to Last)
    Rearrange index: 2,0,3,1
    After Re-arrange catalog table order: c4,c1,c2,c3 (which is wrong)
    
    Solution:
    Change MV create order as below
    New Create order: c1,c3,c4,c2
    Create order index: 0,1,2,3
    
    Rearranged Index:
    Existing Catalog table schema order: c1, c3, c4, c2 (for MV, partition 
column will be moved to Last)
    Rearrange index: 1,0,2,3
    After Re-arrange catalog table order: c3,c1,c4,c2
    
    What changes were proposed in this PR?
    In MV case, if there is any column schema order change apart from partition 
column, then re-arrange
    index of only those columns and use the same to re-arrange catalog table 
logical relation.
    
    This closes #4106
---
 .../management/CarbonInsertIntoCommand.scala       | 40 +++++++++++++++++++---
 .../view/rewrite/TestPartitionWithMV.scala         | 16 +++++++++
 2 files changed, 52 insertions(+), 4 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index bd643ed..f59802c 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -163,7 +163,9 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
         null
       }
     val convertedStaticPartition = 
getConvertedStaticPartitionMap(partitionColumnSchema)
-    val (reArrangedIndex, selectedColumnSchema) = 
getReArrangedIndexAndSelectedSchema(tableInfo,
+    val (reArrangedIndex, reArrangedMVIndex, selectedColumnSchema) =
+      getReArrangedIndexAndSelectedSchema(
+      tableInfo,
       partitionColumnSchema,
       carbonLoadModel)
     val newLogicalPlan = getReArrangedLogicalPlan(
@@ -181,7 +183,19 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
       if (isNotReArranged) {
         // Re-arrange the catalog table schema and output for partition 
relation
         logicalPartitionRelation =
-          getReArrangedSchemaLogicalRelation(reArrangedIndex, 
logicalPartitionRelation)
+          if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV) {
+            // Re-arrange non-partition columns in the catalog table schema 
based on rearranged
+            // mv index order. Example: MV columns: 
c1,c2(partition_column),c3(sort_column),c4.
+            // Based on this order, rearranged index will be like (2,0,3,1). 
Catalog table schema
+            // order will be (c1,c3,c4,c2) where the partition column will be 
always at last. If we
+            // rearrange the logical relation based on above order, catalog 
table schema will be
+            // changed to (c4,c1,c2,c3), which will be wrong. Hence, Reorder 
MV create column
+            // order to (c1,c3,c4,c2) and use rearranged mv index (1,0,2,3) to 
rearrange
+            // logical relation schema.
+            getReArrangedSchemaLogicalRelation(reArrangedMVIndex, 
logicalPartitionRelation)
+          } else {
+            getReArrangedSchemaLogicalRelation(reArrangedIndex, 
logicalPartitionRelation)
+          }
       }
     }
     var isUpdateTableStatusRequired = false
@@ -488,10 +502,12 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
   def getReArrangedIndexAndSelectedSchema(
       tableInfo: TableInfo,
       partitionColumnSchema: mutable.Buffer[ColumnSchema],
-      carbonLoadModel: CarbonLoadModel): (Seq[Int], Seq[ColumnSchema]) = {
+      carbonLoadModel: CarbonLoadModel): (Seq[Int], Seq[Int], 
Seq[ColumnSchema]) = {
     var reArrangedIndex: Seq[Int] = Seq()
+    var reArrangedMVIndex: Seq[Int] = Seq()
     var selectedColumnSchema: Seq[ColumnSchema] = Seq()
     var partitionIndex: Seq[Int] = Seq()
+    var partitionMVIndex: Seq[Int] = Seq()
     val properties = tableInfo.getFactTable.getTableProperties.asScala
     val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     // internal order ColumnSchema (non-flat structure)
@@ -503,7 +519,10 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
       null
     }
     var createOrderColumns = table.getCreateOrderColumn.asScala
+    var createOrderMVColumns = table.getCreateOrderColumn.asScala
     val createOrderMap = mutable.Map[String, Int]()
+    val createOrderMVTableMap = mutable.Map[String, Int]()
+    val isMV = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV
     if (partitionColumnNames != null && 
isAlteredSchema(tableInfo.getFactTable)) {
       // For alter table drop/add column scenarios, partition column may not 
be in the end.
       // Need to keep it in the end.
@@ -511,10 +530,20 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
         partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++
                            createOrderColumns.filter(col =>
                              
partitionColumnNames.contains(col.getColumnSchema.getColumnName))
+    } else if (partitionColumnNames != null && isMV) {
+      // For mv as partition table scenarios, partition column may not be in 
the end.
+      // Need to keep it in the end.
+      createOrderMVColumns = createOrderMVColumns.filterNot(col =>
+        partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++
+                             createOrderMVColumns.filter(col =>
+                               
partitionColumnNames.contains(col.getColumnSchema.getColumnName))
     }
     createOrderColumns.zipWithIndex.map {
       case (col, index) => createOrderMap.put(col.getColName, index)
     }
+    createOrderMVColumns.zipWithIndex.map {
+      case (col, index) => createOrderMVTableMap.put(col.getColName, index)
+    }
     columnSchema.foreach {
       col =>
         if (spatialProperty.isDefined &&
@@ -525,9 +554,11 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
         if (partitionColumnNames != null &&
             partitionColumnNames.contains(col.getColumnName)) {
           partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
+          partitionMVIndex = partitionMVIndex :+ 
createOrderMVTableMap(col.getColumnName)
           skipPartitionColumn = true
         } else {
           reArrangedIndex = reArrangedIndex :+ 
createOrderMap(col.getColumnName)
+          reArrangedMVIndex = reArrangedMVIndex :+ 
createOrderMVTableMap(col.getColumnName)
         }
         if (!skipPartitionColumn) {
           selectedColumnSchema = selectedColumnSchema :+ col
@@ -540,8 +571,9 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
     if (partitionIndex.nonEmpty) {
       // keep partition columns in the end and in the original create order
       reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+      reArrangedMVIndex = reArrangedMVIndex ++ partitionMVIndex.sortBy(x => x)
     }
-    (reArrangedIndex, selectedColumnSchema)
+    (reArrangedIndex, reArrangedMVIndex, selectedColumnSchema)
   }
 
 }
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
index 229d6df..e5b6542 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
@@ -750,6 +750,22 @@ class TestPartitionWithMV extends QueryTest with 
BeforeAndAfterAll with BeforeAn
     sql("drop table if exists partitionone")
   }
 
+  test("test partition on MV with sort column") {
+    sql("drop table if exists partitionone")
+    sql("create table if not exists partitionone (ts timestamp, " +
+        "metric STRING, tags_id STRING, value DOUBLE) partitioned by (ts1 
timestamp,ts2 timestamp) stored as carbondata TBLPROPERTIES 
('SORT_COLUMNS'='metric,ts2')")
+    sql("insert into partitionone values ('2020-09-25 
05:38:00','abc','xyz-e01',392.235,'2020-09-25 05:30:00','2020-09-28 05:38:00')")
+    val mvQuery = "select tags_id ,metric ,ts1, ts2, 
timeseries(ts,'thirty_minute') as ts,sum(value),avg(value)," +
+                  "min(value),max(value) from partitionone group by metric, 
tags_id, timeseries(ts,'thirty_minute') ,ts1, ts2"
+    val result = sql(mvQuery)
+    sql("drop materialized view if exists dm1")
+    sql(s"create materialized view dm1  as $mvQuery")
+    val df = sql(mvQuery)
+    assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "dm1"))
+    checkAnswer(result, df)
+    sql("drop table if exists partitionone")
+  }
+
   test("test partition on timeseries column") {
     sql("drop table if exists partitionone")
     sql("create table partitionone(a int,b int) partitioned by (c timestamp,d 
timestamp) STORED AS carbondata")

Reply via email to