This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 01fd120 [CARBONDATA-4186] Fixed insert failure when partition column
present in local sort scope
01fd120 is described below
commit 01fd12042e3300a5d18eef15a634df66b29580ee
Author: Nihal ojha <[email protected]>
AuthorDate: Wed May 12 12:26:42 2021 +0530
[CARBONDATA-4186] Fixed insert failure when partition column present in
local sort scope
Why is this PR needed?
Currently, when we create a table with a partition column and also use the
same column as a sort column
with local sort scope, the insert query fails with an
ArrayIndexOutOfBoundsException.
What changes were proposed in this PR?
Handle the ArrayIndexOutOfBoundsException. Earlier, the array size was not
increasing because the data
was inconsistent and in the wrong order for sortColumn and
isDimNoDictFlags.
This closes #4132
---
.../carbondata/core/datastore/TableSpec.java | 21 +++-
.../StandardPartitionTableLoadingTestCase.scala | 18 ++++
.../loading/sort/unsafe/UnsafeCarbonRowPage.java | 17 ++-
.../processing/sort/sortdata/SortParameters.java | 119 ++++++++++++++++++---
.../processing/sort/sortdata/TableFieldStat.java | 28 +++--
.../processing/util/CarbonDataProcessorUtil.java | 2 +-
6 files changed, 174 insertions(+), 31 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 6cbf07c..68363aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -62,17 +62,28 @@ public class TableSpec {
List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
List<CarbonMeasure> measures = carbonTable.getVisibleMeasures();
if (keepPartitionColumnsToEnd && carbonTable.getPartitionInfo() != null) {
- // keep the partition columns in the end
+ // keep the partition columns in the end if that is not present in sort
columns
List<CarbonDimension> reArrangedDimensions = new ArrayList<>();
List<CarbonMeasure> reArrangedMeasures = new ArrayList<>();
List<CarbonDimension> partitionDimensions = new ArrayList<>();
List<CarbonMeasure> partitionMeasures = new ArrayList<>();
List<ColumnSchema> columnSchemaList =
carbonTable.getPartitionInfo().getColumnSchemaList();
+ String[] sortColumns = carbonTable.getTableInfo()
+ .getFactTable().getTableProperties().getOrDefault("sort_columns",
"").split(",");
+ for (String col : sortColumns) {
+ for (CarbonDimension dim : dimensions) {
+ if (dim.getColName().equalsIgnoreCase(col)) {
+ reArrangedDimensions.add(dim);
+ }
+ }
+ }
for (CarbonDimension dim : dimensions) {
- if (columnSchemaList.contains(dim.getColumnSchema())) {
- partitionDimensions.add(dim);
- } else {
- reArrangedDimensions.add(dim);
+ if (!dim.isSortColumn()) {
+ if (columnSchemaList.contains(dim.getColumnSchema())) {
+ partitionDimensions.add(dim);
+ } else {
+ reArrangedDimensions.add(dim);
+ }
}
}
if (partitionDimensions.size() != 0) {
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 3a143b6..bdeacc1 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -517,6 +517,24 @@ class StandardPartitionTableLoadingTestCase extends
QueryTest with BeforeAndAfte
assert(result.get(0).get(6).equals(dataAndIndexSize._2))
}
+ test("test partition column with different sort scope") {
+ verifySortWithPartition("global_sort")
+ verifySortWithPartition("no_sort")
+ verifySortWithPartition("local_sort")
+ }
+
+ def verifySortWithPartition(scope: String): Unit = {
+ sql("drop table if exists carbon_partition")
+ sql(s"create table carbon_partition(id int, name string, salary double) " +
+ "partitioned by(country string, id1 int)" +
+ s"stored as carbondata
tblproperties('sort_scope'='$scope','sort_columns'='country, id')")
+ sql("insert into carbon_partition select 1, 'Ram',3500,'India', 20")
+ checkAnswer(
+ sql("SELECT * FROM carbon_partition"),
+ Seq(Row(1, "Ram", 3500.0, "India", 20))
+ )
+ }
+
test("test partition with all sort scope") {
sql("drop table if exists origin_csv")
sql(
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index e00467b..91317d1 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -55,6 +55,8 @@ public class UnsafeCarbonRowPage {
private boolean isSaveToDisk;
+ private int[] changedDataFieldOrder;
+
public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock
memoryBlock,
String taskId, boolean isSaveToDisk) {
this.tableFieldStat = tableFieldStat;
@@ -67,17 +69,30 @@ public class UnsafeCarbonRowPage {
this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
this.isSaveToDisk = isSaveToDisk;
+ this.changedDataFieldOrder = tableFieldStat.getChangedDataFieldOrder();
}
public int addRow(Object[] row,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream)
throws MemoryException, IOException {
- int size = addRow(row, dataBlock.getBaseOffset() + lastSize,
reUsableByteArrayDataOutputStream);
+ int size = addRow(getConvertedRow(row), dataBlock
+ .getBaseOffset() + lastSize, reUsableByteArrayDataOutputStream);
buffer.set(lastSize);
lastSize = lastSize + size;
return size;
}
+ private Object[] getConvertedRow(Object[] row) {
+ if (changedDataFieldOrder != null && changedDataFieldOrder.length ==
row.length) {
+ Object[] convertedRow = new Object[row.length];
+ for (int i = 0; i < row.length; i++) {
+ convertedRow[i] = row[changedDataFieldOrder[i]];
+ }
+ return convertedRow;
+ }
+ return row;
+ }
+
/**
* add raw row as intermidiate sort temp row to page
*
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 9339e5b..9de2df7 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -30,6 +30,7 @@ import
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -124,11 +125,6 @@ public class SortParameters implements Serializable {
*/
private String taskNo;
- /**
- * This will tell whether dimension is dictionary or not.
- */
- private boolean[] noDictionaryDimnesionColumn;
-
private boolean[] noDictionarySortColumn;
private boolean[] sortColumn;
@@ -175,6 +171,12 @@ public class SortParameters implements Serializable {
private boolean isInsertWithoutReArrangeFlow;
+ private int noDictSortDimCnt;
+
+ private int dictSortDimCnt;
+
+ private int[] changedOrderInDataField;
+
public SortParameters getCopy() {
SortParameters parameters = new SortParameters();
parameters.tempFileLocation = tempFileLocation;
@@ -199,7 +201,6 @@ public class SortParameters implements Serializable {
parameters.partitionID = partitionID;
parameters.segmentId = segmentId;
parameters.taskNo = taskNo;
- parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
parameters.sortColumn = sortColumn;
parameters.isVarcharDimensionColumn = isVarcharDimensionColumn;
parameters.noDictionarySortColumn = noDictionarySortColumn;
@@ -217,6 +218,9 @@ public class SortParameters implements Serializable {
parameters.noDictSchemaDataType = noDictSchemaDataType;
parameters.noDictSortColIdxSchemaOrderMapping =
noDictSortColIdxSchemaOrderMapping;
parameters.dictSortColIdxSchemaOrderMapping =
dictSortColIdxSchemaOrderMapping;
+ parameters.noDictSortDimCnt = noDictSortDimCnt;
+ parameters.dictSortDimCnt = dictSortDimCnt;
+ parameters.changedOrderInDataField = changedOrderInDataField;
return parameters;
}
@@ -364,12 +368,20 @@ public class SortParameters implements Serializable {
this.taskNo = taskNo;
}
- public boolean[] getNoDictionaryDimnesionColumn() {
- return noDictionaryDimnesionColumn;
+ public void setNoDictSortDimCnt(int noDictSortDimCnt) {
+ this.noDictSortDimCnt = noDictSortDimCnt;
}
- public void setNoDictionaryDimnesionColumn(boolean[]
noDictionaryDimnesionColumn) {
- this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+ public int getNoDictSortDimCnt() {
+ return this.noDictSortDimCnt;
+ }
+
+ public void setDictSortDimCnt(int dictSortDimCnt) {
+ this.dictSortDimCnt = dictSortDimCnt;
+ }
+
+ public int getDictSortDimCnt() {
+ return this.dictSortDimCnt;
}
public boolean[] getIsVarcharDimensionColumn() {
@@ -436,6 +448,26 @@ public class SortParameters implements Serializable {
isInsertWithoutReArrangeFlow = insertWithoutReArrangeFlow;
}
+ public void setSortDictAndNoDictDimCnt(DataField[] dataFields) {
+ int noDictSortDimCnt = this.getNoDictSortDimCnt();
+ int dictSortDimCnt = this.getDictSortDimCnt();
+ for (DataField field: dataFields) {
+ if (!field.isDateDataType() && field.getColumn().isDimension()
+ && field.getColumn().getColumnSchema().isSortColumn()) {
+ noDictSortDimCnt++;
+ } else if (field.getColumn().isDimension()
+ && field.getColumn().getColumnSchema().isSortColumn()) {
+ dictSortDimCnt++;
+ }
+ }
+ this.setNoDictSortDimCnt(noDictSortDimCnt);
+ this.setDictSortDimCnt(dictSortDimCnt);
+ }
+
+ public int[] getChangedOrderInDataField() {
+ return changedOrderInDataField;
+ }
+
public static SortParameters
createSortParameters(CarbonDataLoadConfiguration configuration) {
SortParameters parameters = new SortParameters();
CarbonTableIdentifier tableIdentifier =
@@ -454,8 +486,7 @@ public class SortParameters implements Serializable {
parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
parameters.setComplexDimColCount(configuration.getComplexDictionaryColumnCount()
+ configuration
.getComplexNonDictionaryColumnCount());
- parameters.setNoDictionaryDimnesionColumn(
-
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+ parameters.setSortDictAndNoDictDimCnt(configuration.getDataFields());
parameters.setIsVarcharDimensionColumn(
CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields()));
parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
@@ -525,10 +556,13 @@ public class SortParameters implements Serializable {
parameters.setNoDictSchemaDataType(CarbonDataProcessorUtil
.getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
+ DataField[] changeDataFields =
changeDataFieldForSortAndPartition(configuration);
+ parameters.changedOrderInDataField = getChangedSchemaOrder(configuration
+ .getDataFields(), changeDataFields);
parameters.setNoDictDataType(CarbonDataProcessorUtil
.getNoDictSortDataTypesAsDataFieldOrder(configuration.getDataFields()));
Map<String, DataType[]> noDictSortAndNoSortDataTypes =
CarbonDataProcessorUtil
-
.getNoDictSortAndNoSortDataTypesAsDataFieldOrder(configuration.getDataFields());
+ .getNoDictSortAndNoSortDataTypes(changeDataFields);
parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
// keep partition columns in the end for table spec by getting
rearranged tale spec
@@ -566,6 +600,48 @@ public class SortParameters implements Serializable {
return parameters;
}
+ private static DataField[] changeDataFieldForSortAndPartition(
+ CarbonDataLoadConfiguration configuration) {
+ DataField[] dataFields = configuration.getDataFields();
+ CarbonTable carbonTable = configuration.getTableSpec().getCarbonTable();
+ String[] sortColumns = carbonTable.getTableInfo()
+ .getFactTable().getTableProperties().getOrDefault("sort_columns",
"").split(",");
+ DataField[] changedDataField = new DataField[dataFields.length];
+ int i = 0;
+ for (String col : sortColumns) {
+ for (DataField dataField : dataFields) {
+ if (dataField.getColumn().getColName().equalsIgnoreCase(col)) {
+ changedDataField[i++] = dataField;
+ }
+ }
+ }
+ for (DataField dataField : dataFields) {
+ if (!dataField.getColumn().getColumnSchema().isSortColumn()) {
+ changedDataField[i++] = dataField;
+ }
+ }
+ return changedDataField;
+ }
+
+ private static int[] getChangedSchemaOrder(
+ DataField[] dataFields, DataField[] changedDataFields) {
+ int[] changedDataFieldOrder = new int[dataFields.length];
+ boolean isChanged = false;
+ for (int i = 0; i < changedDataFields.length; i++) {
+ for (int j = 0; j < dataFields.length; j++) {
+ if (dataFields[j].getColumn().getColName()
+ .equalsIgnoreCase(changedDataFields[i].getColumn().getColName())) {
+ changedDataFieldOrder[i] = j;
+ if (i != j) {
+ isChanged = true;
+ }
+ break;
+ }
+ }
+ }
+ return isChanged ? changedDataFieldOrder : null;
+ }
+
public int getRangeId() {
return rangeId;
}
@@ -593,10 +669,10 @@ public class SortParameters implements Serializable {
parameters.setNoDictionaryCount(noDictionaryCount);
parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
parameters.setComplexDimColCount(complexDimColCount);
- parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
parameters.setSortColumn(sortColumnMapping);
parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn);
parameters.setObserver(new SortObserver());
+ parameters.setSortDictAndNoDictDimCount(noDictionaryColMaping,
sortColumnMapping);
// get sort buffer size
parameters.setSortBufferSize(Integer.parseInt(carbonProperties
.getProperty(CarbonCommonConstants.SORT_SIZE,
@@ -664,6 +740,21 @@ public class SortParameters implements Serializable {
return parameters;
}
+ private void setSortDictAndNoDictDimCount(boolean[] noDictionaryColMaping,
+ boolean[] sortColumnMapping) {
+ int noDictSortDimensionCount = this.getNoDictSortDimCnt();
+ int dictSortDimensionCount = this.getDictSortDimCnt();
+ for (int i = 0; i < noDictionaryColMaping.length; i++) {
+ if (noDictionaryColMaping[i] && sortColumnMapping[i]) {
+ noDictSortDimensionCount++;
+ } else if (!noDictionaryColMaping[i] && sortColumnMapping[i]) {
+ dictSortDimensionCount++;
+ }
+ }
+ this.setNoDictSortDimCnt(noDictSortDimensionCount);
+ this.setDictSortDimCnt(dictSortDimensionCount);
+ }
+
public DataType[] getNoDictSortDataType() {
return noDictSortDataType;
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index e317d56..63bb2c4 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -92,21 +92,16 @@ public class TableFieldStat implements Serializable {
*/
private int[] dictSortColIdxSchemaOrderMapping;
+ private int[] changedOrderInDataField;
+
public TableFieldStat(SortParameters sortParameters) {
int noDictDimCnt = sortParameters.getNoDictionaryCount();
int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
this.complexDimCnt = sortParameters.getComplexDimColCount();
this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
- boolean[] isDimNoDictFlags =
sortParameters.getNoDictionaryDimnesionColumn();
- boolean[] sortColumn = sortParameters.getSortColumn();
- for (int i = 0; i < isDimNoDictFlags.length; i++) {
- if (isDimNoDictFlags[i] && sortColumn[i]) {
- noDictSortDimCnt++;
- } else if (!isDimNoDictFlags[i] && sortColumn[i]) {
- dictSortDimCnt++;
- }
- }
+ noDictSortDimCnt = sortParameters.getNoDictSortDimCnt();
+ dictSortDimCnt = sortParameters.getDictSortDimCnt();
this.measureCnt = sortParameters.getMeasureColCount();
this.measureDataType = sortParameters.getMeasureDataType();
this.noDictDataType = sortParameters.getNoDictDataType();
@@ -121,6 +116,7 @@ public class TableFieldStat implements Serializable {
varcharDimCnt++;
}
}
+ this.changedOrderInDataField = sortParameters.getChangedOrderInDataField();
// be careful that the default value is 0
this.dictSortDimIdx = new int[dictSortDimCnt];
@@ -210,6 +206,10 @@ public class TableFieldStat implements Serializable {
}
}
+ public int[] getChangedDataFieldOrder() {
+ return changedOrderInDataField;
+ }
+
public int getDictSortDimCnt() {
return dictSortDimCnt;
}
@@ -356,6 +356,10 @@ public class TableFieldStat implements Serializable {
for (CarbonDimension dim : visibleDimensions) {
if (!columnSchemaList.contains(dim.getColumnSchema())) {
otherCols.add(dim.getColumnSchema());
+ } else {
+ if (dim.isSortColumn()) {
+ otherCols.add(dim.getColumnSchema());
+ }
}
}
for (CarbonMeasure measure : visibleMeasures) {
@@ -363,7 +367,11 @@ public class TableFieldStat implements Serializable {
otherCols.add(measure.getColumnSchema());
}
}
- otherCols.addAll(columnSchemaList);
+ columnSchemaList.forEach(columnSchema -> {
+ if (!columnSchema.isSortColumn()) {
+ otherCols.add(columnSchema);
+ }
+ });
}
return otherCols;
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index dcf70b0..2be5729 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -655,7 +655,7 @@ public final class CarbonDataProcessorUtil {
* @param dataFields
* @return
*/
- public static Map<String, DataType[]>
getNoDictSortAndNoSortDataTypesAsDataFieldOrder(
+ public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(
DataField[] dataFields) {
List<DataType> noDictSortType = new ArrayList<>();
List<DataType> noDictNoSortType = new ArrayList<>();