This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff1e1b [CARBONDATA-3734] Fix insert failure on partition table when partition column is in sort column
3ff1e1b is described below
commit 3ff1e1beeabfc165f9c6ab818969cb4fb023f7a6
Author: ajantha-bhat <[email protected]>
AuthorDate: Mon Mar 2 19:44:45 2020 +0530
[CARBONDATA-3734] Fix insert failure on partition table when partition column is in sort column
Why is this PR needed?
When a partition column is also a sort column:
a) The sort columns are no longer guaranteed to be at the head of the attributes, because partition columns are placed at the end for global sort, so the sort-column attributes must be resolved accordingly.
b) While rearranging the data fields at the executor, the partition column must stay at the end even though it is a sort column.
What changes were proposed in this PR?
Prepare the sort columns for global sort by looking them up from the table's sort columns (sketched right below).
Keep the partition column at the end of the data field attributes even though it is a sort column (see the second sketch below).
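For illustration, a minimal, self-contained Scala sketch of the first change. The Attr case class and the resolveSortAttributes name are hypothetical stand-ins; the actual code operates on Spark AttributeReference inside CommonLoadUtils.scala, as shown in the diff below. Only the lookup logic mirrors the patch:

    object SortColumnLookupSketch {
      // Hypothetical stand-in for Spark's AttributeReference; only the column name matters here.
      case class Attr(name: String)

      // For a Hive-partitioned table the partition columns sit at the end of the attribute
      // list, so each sort column is resolved by a case-insensitive name lookup instead of
      // simply taking the first N attributes (which is what the non-partitioned path does).
      def resolveSortAttributes(
          attributes: Seq[Attr],
          sortColumns: Seq[String],
          isHivePartitionTable: Boolean): Seq[Attr] = {
        if (isHivePartitionTable) {
          // .get mirrors the patch: every table sort column is expected to exist in attributes
          sortColumns.map(col => attributes.find(_.name.equalsIgnoreCase(col)).get)
        } else {
          attributes.take(sortColumns.size)
        }
      }

      def main(args: Array[String]): Unit = {
        // "country" is both the partition column and a sort column, and appears last.
        val attributes = Seq(Attr("id"), Attr("name"), Attr("country"))
        val sorted = resolveSortAttributes(attributes, Seq("country"), isHivePartitionTable = true)
        println(sorted.map(_.name)) // List(country)
      }
    }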
Does this PR introduce any user interface change?
No
Is any new testcase added?
No (this scenario is already covered by an existing MV test case)
This closes #3654
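Likewise, a minimal Scala sketch of the second change. The real change is in the Java class DataLoadProcessBuilder and operates on DataField; the Field case class and the rearrange name here are hypothetical. The ordering idea is the same: sort columns first, then no-sort columns, then complex columns, with partition fields collected separately so they end up last:

    object DataFieldOrderSketch {
      // Hypothetical stand-in for CarbonData's DataField; only the flags matter here.
      case class Field(
          name: String,
          isSortColumn: Boolean = false,
          isComplex: Boolean = false,
          isPartition: Boolean = false)

      // Returns (rearranged data fields, partition fields kept aside so they stay at the end):
      // sort columns first, then no-sort columns, then complex columns.
      def rearrange(fields: Seq[Field]): (Seq[Field], Seq[Field]) = {
        val (partitionFields, others) = fields.partition(_.isPartition)
        val (complexFields, simpleFields) = others.partition(_.isComplex)
        val (sortFields, noSortFields) = simpleFields.partition(_.isSortColumn)
        (sortFields ++ noSortFields ++ complexFields, partitionFields)
      }

      def main(args: Array[String]): Unit = {
        val fields = Seq(
          Field("country", isSortColumn = true, isPartition = true), // partition + sort column
          Field("name", isSortColumn = true),
          Field("id"),
          Field("props", isComplex = true))
        val (dataFields, partitionFields) = rearrange(fields)
        println(dataFields.map(_.name))      // List(name, id, props)
        println(partitionFields.map(_.name)) // List(country)
      }
    }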
---
.../command/management/CommonLoadUtils.scala | 14 ++++++++++++-
.../mv/rewrite/TestPartitionWithMV.scala | 3 +++
.../processing/loading/DataLoadProcessBuilder.java | 24 ++++++++++++++++++----
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index a0b2676..40a732a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -504,7 +504,19 @@ object CommonLoadUtils {
if (numPartitions <= 0) {
numPartitions = partitionsLen
}
- val sortColumns = attributes.take(table.getSortColumns.size())
+ val sortColumns =
+ if (table.isHivePartitionTable) {
+ // In case of partition column as sort column, attribute will not have it in the front.
+ // so need to look up the attribute and prepare
+ val sortColsAttr: collection.mutable.ArrayBuffer[AttributeReference] = ArrayBuffer()
+ val sortCols = table.getSortColumns.asScala
+ for (sortColumn <- sortCols) {
+ sortColsAttr += attributes.find(x => x.name.equalsIgnoreCase(sortColumn)).get
+ }
+ sortColsAttr
+ } else {
+ attributes.take(table.getSortColumns.size())
+ }
val dataTypes = sortColumns.map(_.dataType)
val sortedRDD: RDD[InternalRow] =
GlobalSortHelper.sortBy(updatedRdd, numPartitions, dataTypes)
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
index c781598..bf12589 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
@@ -637,7 +637,10 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll {
sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',25")
sql("alter table updatetime_8 compact 'minor'")
sql("alter table updatetime_8 compact 'minor'")
+ val df = sql("select sum(hs_len) from updatetime_8 group by imex")
+ assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "ag"))
checkAnswer(sql("select sum(hs_len) from updatetime_8 group by imex"),Seq(Row(40),Row(42),Row(83)))
+ sql("drop table updatetime_8").show(200, false)
}
test("check partitioning for child tables with various combinations") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index ab41c40..966f828 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -252,9 +252,9 @@ public final class DataLoadProcessBuilder {
partitionColumns, dataFields);
} else {
getDataFields(loadModel, dimensions, measures, complexDataFields, dataFields);
+ dataFields = updateDataFieldsBasedOnSortColumns(dataFields);
}
- configuration.setDataFields(
- updateDataFieldsBasedOnSortColumns(dataFields).toArray(new DataField[dataFields.size()]));
+ configuration.setDataFields(dataFields.toArray(new DataField[0]));
configuration.setBucketingInfo(carbonTable.getBucketingInfo());
configuration.setBucketHashMethod(carbonTable.getBucketHashMethod());
configuration.setPreFetch(loadModel.isPreFetch());
@@ -326,6 +326,10 @@ public final class DataLoadProcessBuilder {
} else {
partitionColumnSchemaList = new ArrayList<>();
}
+ // 1.1 compatibility, dimensions will not have sort columns in the beginning in 1.1.
+ // Need to keep at the beginning now
+ List<DataField> sortDataFields = new ArrayList<>();
+ List<DataField> noSortDataFields = new ArrayList<>();
for (CarbonColumn column : dimensions) {
DataField dataField = new DataField(column);
if (column.isComplex()) {
@@ -356,11 +360,23 @@ public final class DataLoadProcessBuilder {
.contains(column.getColumnSchema())) {
partitionColumns.add(dataField);
} else {
- dataFields.add(dataField);
+ if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+ sortDataFields.add(dataField);
+ } else {
+ noSortDataFields.add(dataField);
+ }
}
}
}
- dataFields.addAll(complexDataFields);
+ if (sortDataFields.size() != 0) {
+ dataFields.addAll(sortDataFields);
+ }
+ if (noSortDataFields.size() != 0) {
+ dataFields.addAll(noSortDataFields);
+ }
+ if (complexDataFields.size() != 0) {
+ dataFields.addAll(complexDataFields);
+ }
for (CarbonColumn column : measures) {
if (partitionColumnSchemaList.size() != 0 && partitionColumnSchemaList
.contains(column.getColumnSchema())) {