This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8d17de6 [CARBONDATA-4147] Fix re-arrange schema in logical relation
on MV partition table having sort column
8d17de6 is described below
commit 8d17de64513a24bc856210b58d303edc19ab70ce
Author: Indhumathi27 <[email protected]>
AuthorDate: Fri Mar 12 16:06:27 2021 +0530
[CARBONDATA-4147] Fix re-arrange schema in logical relation on MV partition
table having sort column
Why is this PR needed?
After PR-3615, we have avoided rearranging catalog table schema if already
re-arranged.
For an MV created on a partition table, we always move the partition column to
the end of the MV schema.
Catalog table will also have the column schema in same order(partition
column at last). Hence, in
this case, we do not re-arrange logical relation in a catalog table again.
But, if there is a sort column present in MV table, then selected column
schema and catalog table
schema will not be in same order. In that case, we have to re-arrange the
catalog table schema.
Currently, we are using rearrangedIndex to re-arrange the catalog table
logical relation, but
rearrangedIndex will keep the index of partition column at the end,
whereas, catalog table has
partition column already at the end. Hence, we are re-arranging the
partition column index
again in catalog table relation, which leads to insertion failure.
Example:
Create MV on columns: c1, c2 (partition), c3(sort_column), c4
Problem:
Create order: c1,c2,c3,c4
Create order index: 0,1,2,3
Rearranged Index:
Existing Catalog table schema order: c1, c3, c4, c2 (for MV, partition
column will be moved to Last)
Rearrange index: 2,0,3,1
After Re-arrange catalog table order: c4,c1,c2,c3 (which is wrong)
Solution:
Change MV create order as below
New Create order: c1,c3,c4,c2
Create order index: 0,1,2,3
Rearranged Index:
Existing Catalog table schema order: c1, c3, c4, c2 (for MV, partition
column will be moved to Last)
Rearrange index: 1,0,2,3
After Re-arrange catalog table order: c3,c1,c4,c2
What changes were proposed in this PR?
In MV case, if there is any column schema order change apart from partition
column, then re-arrange
index of only those columns and use the same to re-arrange catalog table
logical relation.
This closes #4106
---
.../management/CarbonInsertIntoCommand.scala | 40 +++++++++++++++++++---
.../view/rewrite/TestPartitionWithMV.scala | 16 +++++++++
2 files changed, 52 insertions(+), 4 deletions(-)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index bd643ed..f59802c 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -163,7 +163,9 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
null
}
val convertedStaticPartition =
getConvertedStaticPartitionMap(partitionColumnSchema)
- val (reArrangedIndex, selectedColumnSchema) =
getReArrangedIndexAndSelectedSchema(tableInfo,
+ val (reArrangedIndex, reArrangedMVIndex, selectedColumnSchema) =
+ getReArrangedIndexAndSelectedSchema(
+ tableInfo,
partitionColumnSchema,
carbonLoadModel)
val newLogicalPlan = getReArrangedLogicalPlan(
@@ -181,7 +183,19 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
if (isNotReArranged) {
// Re-arrange the catalog table schema and output for partition
relation
logicalPartitionRelation =
- getReArrangedSchemaLogicalRelation(reArrangedIndex,
logicalPartitionRelation)
+ if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV) {
+ // Re-arrange non-partition columns in the catalog table schema
based on rearranged
+ // mv index order. Example: MV columns:
c1,c2(partition_column),c3(sort_column),c4.
+ // Based on this order, rearranged index will be like (2,0,3,1).
Catalog table schema
+ // order will be (c1,c3,c4,c2) where the partition column will be
always at last. If we
+ // rearrange the logical relation based on above order, catalog
table schema will be
+ // changed to (c4,c1,c2,c3), which will be wrong. Hence, Reorder
MV create column
+ // order to (c1,c3,c4,c2) and use rearranged mv index (1,0,2,3) to
rearrange
+ // logical relation schema.
+ getReArrangedSchemaLogicalRelation(reArrangedMVIndex,
logicalPartitionRelation)
+ } else {
+ getReArrangedSchemaLogicalRelation(reArrangedIndex,
logicalPartitionRelation)
+ }
}
}
var isUpdateTableStatusRequired = false
@@ -488,10 +502,12 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
def getReArrangedIndexAndSelectedSchema(
tableInfo: TableInfo,
partitionColumnSchema: mutable.Buffer[ColumnSchema],
- carbonLoadModel: CarbonLoadModel): (Seq[Int], Seq[ColumnSchema]) = {
+ carbonLoadModel: CarbonLoadModel): (Seq[Int], Seq[Int],
Seq[ColumnSchema]) = {
var reArrangedIndex: Seq[Int] = Seq()
+ var reArrangedMVIndex: Seq[Int] = Seq()
var selectedColumnSchema: Seq[ColumnSchema] = Seq()
var partitionIndex: Seq[Int] = Seq()
+ var partitionMVIndex: Seq[Int] = Seq()
val properties = tableInfo.getFactTable.getTableProperties.asScala
val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
// internal order ColumnSchema (non-flat structure)
@@ -503,7 +519,10 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
null
}
var createOrderColumns = table.getCreateOrderColumn.asScala
+ var createOrderMVColumns = table.getCreateOrderColumn.asScala
val createOrderMap = mutable.Map[String, Int]()
+ val createOrderMVTableMap = mutable.Map[String, Int]()
+ val isMV = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV
if (partitionColumnNames != null &&
isAlteredSchema(tableInfo.getFactTable)) {
// For alter table drop/add column scenarios, partition column may not
be in the end.
// Need to keep it in the end.
@@ -511,10 +530,20 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++
createOrderColumns.filter(col =>
partitionColumnNames.contains(col.getColumnSchema.getColumnName))
+ } else if (partitionColumnNames != null && isMV) {
+ // For mv as partition table scenarios, partition column may not be in
the end.
+ // Need to keep it in the end.
+ createOrderMVColumns = createOrderMVColumns.filterNot(col =>
+ partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++
+ createOrderMVColumns.filter(col =>
+
partitionColumnNames.contains(col.getColumnSchema.getColumnName))
}
createOrderColumns.zipWithIndex.map {
case (col, index) => createOrderMap.put(col.getColName, index)
}
+ createOrderMVColumns.zipWithIndex.map {
+ case (col, index) => createOrderMVTableMap.put(col.getColName, index)
+ }
columnSchema.foreach {
col =>
if (spatialProperty.isDefined &&
@@ -525,9 +554,11 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
if (partitionColumnNames != null &&
partitionColumnNames.contains(col.getColumnName)) {
partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
+ partitionMVIndex = partitionMVIndex :+
createOrderMVTableMap(col.getColumnName)
skipPartitionColumn = true
} else {
reArrangedIndex = reArrangedIndex :+
createOrderMap(col.getColumnName)
+ reArrangedMVIndex = reArrangedMVIndex :+
createOrderMVTableMap(col.getColumnName)
}
if (!skipPartitionColumn) {
selectedColumnSchema = selectedColumnSchema :+ col
@@ -540,8 +571,9 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
if (partitionIndex.nonEmpty) {
// keep partition columns in the end and in the original create order
reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+ reArrangedMVIndex = reArrangedMVIndex ++ partitionMVIndex.sortBy(x => x)
}
- (reArrangedIndex, selectedColumnSchema)
+ (reArrangedIndex, reArrangedMVIndex, selectedColumnSchema)
}
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
index 229d6df..e5b6542 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
@@ -750,6 +750,22 @@ class TestPartitionWithMV extends QueryTest with
BeforeAndAfterAll with BeforeAn
sql("drop table if exists partitionone")
}
+ test("test partition on MV with sort column") {
+ sql("drop table if exists partitionone")
+ sql("create table if not exists partitionone (ts timestamp, " +
+ "metric STRING, tags_id STRING, value DOUBLE) partitioned by (ts1
timestamp,ts2 timestamp) stored as carbondata TBLPROPERTIES
('SORT_COLUMNS'='metric,ts2')")
+ sql("insert into partitionone values ('2020-09-25
05:38:00','abc','xyz-e01',392.235,'2020-09-25 05:30:00','2020-09-28 05:38:00')")
+ val mvQuery = "select tags_id ,metric ,ts1, ts2,
timeseries(ts,'thirty_minute') as ts,sum(value),avg(value)," +
+ "min(value),max(value) from partitionone group by metric,
tags_id, timeseries(ts,'thirty_minute') ,ts1, ts2"
+ val result = sql(mvQuery)
+ sql("drop materialized view if exists dm1")
+ sql(s"create materialized view dm1 as $mvQuery")
+ val df = sql(mvQuery)
+ assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "dm1"))
+ checkAnswer(result, df)
+ sql("drop table if exists partitionone")
+ }
+
test("test partition on timeseries column") {
sql("drop table if exists partitionone")
sql("create table partitionone(a int,b int) partitioned by (c timestamp,d
timestamp) STORED AS carbondata")