This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 29ecd5f [CARBONDATA-4077] Refactor and Fix Insert into partition
issue with FileMergeSortComparator
29ecd5f is described below
commit 29ecd5fee7af97633b756bfd944251a18dc76c18
Author: Indhumathi27 <[email protected]>
AuthorDate: Sat Dec 5 19:35:58 2020 +0530
[CARBONDATA-4077] Refactor and Fix Insert into partition issue with
FileMergeSortComparator
Why is this PR needed?
The changes in PR-3995 missed the insert-into-partition flow. In addition,
using a Map to look up the Dict/No-Dict sort column info during the final
sort task degrades load performance when the number of sort columns is large.
What changes were proposed in this PR?
Handled the insert-into-partition flow.
Refactored the code to use a list of only the Dict/No-Dict sort column
indexes instead of a Map, to fix the performance issue.
This closes #4039
---
.../query/SecondaryIndexQueryResultProcessor.java | 2 -
.../unsafe/holder/UnsafeFinalMergePageHolder.java | 6 +-
.../sort/unsafe/holder/UnsafeInmemoryHolder.java | 6 +-
.../holder/UnsafeSortTempFileChunkHolder.java | 6 +-
.../merger/CompactionResultSortProcessor.java | 11 +-
.../sort/sortdata/FileMergeSortComparator.java | 100 ++++++++--------
.../processing/sort/sortdata/SortParameters.java | 69 +++++++----
.../sort/sortdata/SortTempFileChunkHolder.java | 12 +-
.../processing/sort/sortdata/TableFieldStat.java | 27 +++--
.../processing/util/CarbonDataProcessorUtil.java | 127 ++++++++++++++-------
.../sort/sortdata/FileMergeSortComparatorTest.java | 27 ++---
11 files changed, 239 insertions(+), 154 deletions(-)
diff --git
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 4d045f0..41a5c43 100644
---
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -528,8 +528,6 @@ public class SecondaryIndexQueryResultProcessor {
CarbonCommonConstants.FILE_SEPARATOR,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
sortParameters.setNoDictionarySortColumn(
CarbonDataProcessorUtil.getNoDictSortColMapping(indexTable));
- sortParameters.setSortColumnSchemaOrderMap(
- CarbonDataProcessorUtil.getSortColSchemaOrderMapping(indexTable));
finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation,
indexTable.getTableName(), sortParameters);
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 717bb91..7abd3a9 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -61,10 +61,10 @@ public class UnsafeFinalMergePageHolder implements
SortTempChunkHolder {
}
this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
LOGGER.info("Processing unsafe inmemory rows page with size : " +
actualSize);
- this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
- tableFieldStat.getNoDictSchemaDataType(),
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
- tableFieldStat.getSortColSchemaOrderMap());
+ tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+ tableFieldStat.getDictSortColIdxSchemaOrderMapping());
}
public boolean hasNext() {
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index a46811f..a21e802 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -48,10 +48,10 @@ public class UnsafeInmemoryHolder implements
SortTempChunkHolder {
this.rowPage = rowPage;
LOGGER.info("Processing unsafe inmemory rows page with size : " +
actualSize);
this.comparator =
- new
FileMergeSortComparator(rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
- rowPage.getTableFieldStat().getNoDictSchemaDataType(),
+ new
FileMergeSortComparator(rowPage.getTableFieldStat().getNoDictSchemaDataType(),
rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping(),
- rowPage.getTableFieldStat().getSortColSchemaOrderMap());
+
rowPage.getTableFieldStat().getNoDictSortColIdxSchemaOrderMapping(),
+ rowPage.getTableFieldStat().getDictSortColIdxSchemaOrderMapping());
this.rowPage.setReadConvertedNoSortField();
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 5144f57..77b19f4 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -120,10 +120,10 @@ public class UnsafeSortTempFileChunkHolder implements
SortTempChunkHolder {
comparator = new
IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
parameters.getNoDictDataType());
} else {
- this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
- tableFieldStat.getNoDictSchemaDataType(),
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
- tableFieldStat.getSortColSchemaOrderMap());
+ tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+ tableFieldStat.getDictSortColIdxSchemaOrderMapping());
}
initialize();
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 9cfda1d..3f27eee 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -480,8 +481,14 @@ public class CompactionResultSortProcessor extends
AbstractResultProcessor {
boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
.getNoDictSortColMapping(carbonTable);
sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
- sortParameters.setSortColumnSchemaOrderMap(
- CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable));
+ Map<String, int[]> columnIdxMap = CarbonDataProcessorUtil
+ .getColumnIdxBasedOnSchemaInRow(carbonTable);
+ sortParameters.setNoDictSortColumnSchemaOrderMapping(
+ columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+ sortParameters.setNoDictSortColIdxSchemaOrderMapping(
+ columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+ sortParameters.setDictSortColIdxSchemaOrderMapping(
+ columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
String[] sortTempFileLocation =
CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
finalMerger =
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
index 687af7b..337bf73 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
@@ -18,8 +18,6 @@
package org.apache.carbondata.processing.sort.sortdata;
import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil;
@@ -29,8 +27,6 @@ import
org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
public class FileMergeSortComparator implements
Comparator<IntermediateSortTempRow> {
- private boolean[] isSortColumnNoDictionary;
-
/**
* Datatype of all the no-dictionary columns in the table in schema order
*/
@@ -42,21 +38,25 @@ public class FileMergeSortComparator implements
Comparator<IntermediateSortTempR
private int[] noDictPrimitiveIndex;
/**
- * Sort and Dictionary info of all the columns in schema order
+ * Index of the No-Dict Sort columns in schema order for final merge step of
sorting.
+ */
+ private int[] noDictSortColIdxSchemaOrderMapping;
+
+ /**
+ * Index of the dict Sort columns in schema order for final merge step of
sorting.
*/
- private final Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
+ private int[] dictSortColIdxSchemaOrderMapping;
/**
* Comparator for IntermediateSortTempRow for compatibility cases where
column added in old
* version and it is sort column
- * @param isSortColumnNoDictionary isSortColumnNoDictionary
*/
- public FileMergeSortComparator(boolean[] isSortColumnNoDictionary,
DataType[] noDictDataTypes,
- int[] columnIdBasedOnSchemaInRow, Map<Integer, List<Boolean>>
sortColumnSchemaOrderMap) {
- this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+ public FileMergeSortComparator(DataType[] noDictDataTypes, int[]
columnIdBasedOnSchemaInRow,
+ int[] noDictSortColIdxSchemaOrderMapping, int[]
dictSortColIdxSchemaOrderMapping) {
this.noDictDataTypes = noDictDataTypes;
this.noDictPrimitiveIndex = columnIdBasedOnSchemaInRow;
- this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+ this.noDictSortColIdxSchemaOrderMapping =
noDictSortColIdxSchemaOrderMapping;
+ this.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
}
/**
@@ -68,8 +68,8 @@ public class FileMergeSortComparator implements
Comparator<IntermediateSortTempR
* 1. only Dictionary sort column data
* 2. dictionary sort and dictionary no-sort column data
* On this temp data row, we have to identify the sort column data and only
compare those.
- * From the sortColumnSchemaOrderMap, get the sort column and dict info. If
the column
- * is sort column, then perform the comparison, else increment the
respective index.
+ * Use noDictSortColIdxSchemaOrderMapping and
dictSortColIdxSchemaOrderMapping to get
+ * the indexes of sort column in schema order
*/
@Override
public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow
rowB) {
@@ -78,51 +78,47 @@ public class FileMergeSortComparator implements
Comparator<IntermediateSortTempR
int nonDictIndex = 0;
int noDicTypeIdx = 0;
int schemaRowIdx = 0;
- int sortIndex = 0;
-
- for (Map.Entry<Integer, List<Boolean>> schemaEntry :
sortColumnSchemaOrderMap.entrySet()) {
- boolean isSortColumn = schemaEntry.getValue().get(0);
- boolean isDictColumn = schemaEntry.getValue().get(1);
- if (isSortColumn) {
- if (isSortColumnNoDictionary[sortIndex++]) {
- if (DataTypeUtil.isPrimitiveColumn(noDictDataTypes[noDicTypeIdx])) {
- // use data types based comparator for the no dictionary measure
columns
- SerializableComparator comparator =
- org.apache.carbondata.core.util.comparator.Comparator
- .getComparator(noDictDataTypes[noDicTypeIdx]);
- int difference = comparator
-
.compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
-
rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
- schemaRowIdx++;
- if (difference != 0) {
- return difference;
- }
- } else {
- byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
- byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
- int difference =
ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
- if (difference != 0) {
- return difference;
- }
- }
- } else {
- int dimFieldA = rowA.getDictSortDims()[dictIndex];
- int dimFieldB = rowB.getDictSortDims()[dictIndex];
+ // compare no-Dict sort columns
+ for (int i = 0; i < noDictSortColIdxSchemaOrderMapping.length; i++) {
+ if (DataTypeUtil
+
.isPrimitiveColumn(noDictDataTypes[noDictSortColIdxSchemaOrderMapping[noDicTypeIdx]]))
{
+ // use data types based comparator for the no dictionary measure
columns
+ SerializableComparator comparator =
org.apache.carbondata.core.util.comparator.Comparator
+
.getComparator(noDictDataTypes[noDictSortColIdxSchemaOrderMapping[noDicTypeIdx]]);
+ int difference = comparator
+
.compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
+ rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
+ schemaRowIdx++;
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ byte[] byteArr1 =
+ (byte[])
rowA.getNoDictSortDims()[noDictSortColIdxSchemaOrderMapping[nonDictIndex]];
+ byte[] byteArr2 =
+ (byte[])
rowB.getNoDictSortDims()[noDictSortColIdxSchemaOrderMapping[nonDictIndex]];
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
+ int difference = ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1,
byteArr2);
+ if (difference != 0) {
+ return difference;
}
}
- if (isDictColumn) {
- dictIndex++;
- } else {
- nonDictIndex++;
- noDicTypeIdx++;
+ nonDictIndex++;
+ noDicTypeIdx++;
+ }
+ // compare dict sort columns
+ for (int i = 0; i < dictSortColIdxSchemaOrderMapping.length; i++) {
+ int dimFieldA =
rowA.getDictSortDims()[dictSortColIdxSchemaOrderMapping[dictIndex]];
+ int dimFieldB =
rowB.getDictSortDims()[dictSortColIdxSchemaOrderMapping[dictIndex]];
+ dictIndex++;
+
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
}
}
+
return diff;
}
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index fc93d52..e8946b9 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.processing.sort.sortdata;
import java.io.File;
import java.io.Serializable;
-import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -108,9 +107,6 @@ public class SortParameters implements Serializable {
// used while performing final sort of intermediate files
private DataType[] noDictSchemaDataType;
- // Sort and Dictionary info of all the columns in schema order, used in
final sort
- private Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
-
/**
* To know how many columns are of high cardinality.
*/
@@ -167,6 +163,16 @@ public class SortParameters implements Serializable {
*/
private int[] noDictSortColumnSchemaOrderMapping;
+ /**
+ * Index of the no dict Sort columns in schema order used for final merge
step of sorting.
+ */
+ private int[] noDictSortColIdxSchemaOrderMapping;
+
+ /**
+ * Index of the dict Sort columns in schema order for final merge step of
sorting.
+ */
+ private int[] dictSortColIdxSchemaOrderMapping;
+
private boolean isInsertWithoutReArrangeFlow;
public SortParameters getCopy() {
@@ -209,7 +215,8 @@ public class SortParameters implements Serializable {
parameters.noDictSortColumnSchemaOrderMapping =
noDictSortColumnSchemaOrderMapping;
parameters.isInsertWithoutReArrangeFlow = isInsertWithoutReArrangeFlow;
parameters.noDictSchemaDataType = noDictSchemaDataType;
- parameters.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+ parameters.noDictSortColIdxSchemaOrderMapping =
noDictSortColIdxSchemaOrderMapping;
+ parameters.dictSortColIdxSchemaOrderMapping =
dictSortColIdxSchemaOrderMapping;
return parameters;
}
@@ -417,7 +424,7 @@ public class SortParameters implements Serializable {
return noDictSortColumnSchemaOrderMapping;
}
- private void setNoDictSortColumnSchemaOrderMapping(int[]
noDictSortColumnSchemaOrderMapping) {
+ public void setNoDictSortColumnSchemaOrderMapping(int[]
noDictSortColumnSchemaOrderMapping) {
this.noDictSortColumnSchemaOrderMapping =
noDictSortColumnSchemaOrderMapping;
}
@@ -507,8 +514,14 @@ public class SortParameters implements Serializable {
parameters.setInsertWithoutReArrangeFlow(true);
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
.getNoDictSortColMappingAsDataFieldOrder(configuration.getDataFields()));
- parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-
.getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields()));
+ Map<String, int[]> columnIdxMap = CarbonDataProcessorUtil
+
.getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields());
+ parameters.setNoDictSortColumnSchemaOrderMapping(
+ columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+ parameters.setNoDictSortColIdxSchemaOrderMapping(
+ columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+ parameters.setDictSortColIdxSchemaOrderMapping(
+ columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
parameters.setNoDictDataType(CarbonDataProcessorUtil
.getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
@@ -525,13 +538,17 @@ public class SortParameters implements Serializable {
} else {
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
.getNoDictSortColMapping(parameters.getCarbonTable()));
- parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
- .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+ Map<String, int[]> columnIdxMap =
+
CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable());
+ parameters
+
.setNoDictSortColumnSchemaOrderMapping(columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+ parameters.setNoDictSortColIdxSchemaOrderMapping(
+ columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+ parameters
+
.setDictSortColIdxSchemaOrderMapping(columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
parameters.setMeasureDataType(configuration.getMeasureDataType());
parameters.setNoDictDataType(CarbonDataProcessorUtil
.getNoDictSortDataTypes(configuration.getTableSpec().getCarbonTable()));
- parameters.setSortColumnSchemaOrderMap(
-
CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
parameters.setNoDictSchemaDataType(
CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
Map<String, DataType[]> noDictSortAndNoSortDataTypes =
CarbonDataProcessorUtil
@@ -627,12 +644,16 @@ public class SortParameters implements Serializable {
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
.getNoDictSortColMapping(parameters.getCarbonTable()));
- parameters.setSortColumnSchemaOrderMap(
-
CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
parameters.setNoDictSchemaDataType(
CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
- parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
- .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+ Map<String, int[]> columnIdxMap =
+
CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable());
+ parameters
+
.setNoDictSortColumnSchemaOrderMapping(columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+ parameters
+
.setNoDictSortColIdxSchemaOrderMapping(columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+ parameters
+
.setDictSortColIdxSchemaOrderMapping(columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
TableSpec tableSpec = new TableSpec(carbonTable, false);
parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
@@ -713,11 +734,19 @@ public class SortParameters implements Serializable {
this.noDictSchemaDataType = noDictSchemaDataType;
}
- public void setSortColumnSchemaOrderMap(Map<Integer, List<Boolean>>
sortColumnSchemaOrderMap) {
- this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+ public int[] getNoDictSortColIdxSchemaOrderMapping() {
+ return noDictSortColIdxSchemaOrderMapping;
+ }
+
+ public void setNoDictSortColIdxSchemaOrderMapping(int[]
noDictSortColIdxSchemaOrderMapping) {
+ this.noDictSortColIdxSchemaOrderMapping =
noDictSortColIdxSchemaOrderMapping;
+ }
+
+ public int[] getDictSortColIdxSchemaOrderMapping() {
+ return dictSortColIdxSchemaOrderMapping;
}
- public Map<Integer, List<Boolean>> getSortColumnSchemaOrderMap() {
- return sortColumnSchemaOrderMap;
+ public void setDictSortColIdxSchemaOrderMapping(int[]
dictSortColIdxSchemaOrderMapping) {
+ this.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
}
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 84fb4d0..f4cfe69 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -105,10 +105,10 @@ public class SortTempFileChunkHolder implements
Comparable<SortTempFileChunkHold
public SortTempFileChunkHolder(SortParameters sortParameters) {
this.tableFieldStat = new TableFieldStat(sortParameters);
- this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
- tableFieldStat.getNoDictSchemaDataType(),
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
- tableFieldStat.getSortColSchemaOrderMap());
+ tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+ tableFieldStat.getDictSortColIdxSchemaOrderMapping());
this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
}
@@ -132,10 +132,10 @@ public class SortTempFileChunkHolder implements
Comparable<SortTempFileChunkHold
true));
this.convertToActualField = convertToActualField;
if (this.convertToActualField) {
- this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
- tableFieldStat.getNoDictSchemaDataType(),
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
- tableFieldStat.getSortColSchemaOrderMap());
+ tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+ tableFieldStat.getDictSortColIdxSchemaOrderMapping());
} else {
this.comparator =
new
IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index df7b873..e317d56 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.sort.sortdata;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -83,7 +82,15 @@ public class TableFieldStat implements Serializable {
private DataType[] noDictSchemaDataType;
- private Map<Integer, List<Boolean>> sortColSchemaOrderMap;
+ /**
+ * Index of the no dict Sort columns in schema order used for final merge
step of sorting.
+ */
+ private int[] noDictSortColIdxSchemaOrderMapping;
+
+ /**
+ * Index of the dict Sort columns in schema order for final merge step of
sorting.
+ */
+ private int[] dictSortColIdxSchemaOrderMapping;
public TableFieldStat(SortParameters sortParameters) {
int noDictDimCnt = sortParameters.getNoDictionaryCount();
@@ -106,7 +113,9 @@ public class TableFieldStat implements Serializable {
this.noDictSortDataType = sortParameters.getNoDictSortDataType();
this.noDictNoSortDataType = sortParameters.getNoDictNoSortDataType();
this.noDictSchemaDataType = sortParameters.getNoDictSchemaDataType();
- this.sortColSchemaOrderMap = sortParameters.getSortColumnSchemaOrderMap();
+ this.noDictSortColIdxSchemaOrderMapping =
+ sortParameters.getNoDictSortColIdxSchemaOrderMapping();
+ this.dictSortColIdxSchemaOrderMapping =
sortParameters.getDictSortColIdxSchemaOrderMapping();
for (boolean flag : isVarcharDimFlags) {
if (flag) {
varcharDimCnt++;
@@ -359,11 +368,15 @@ public class TableFieldStat implements Serializable {
return otherCols;
}
- public Map<Integer, List<Boolean>> getSortColSchemaOrderMap() {
- return sortColSchemaOrderMap;
- }
-
public DataType[] getNoDictSchemaDataType() {
return noDictSchemaDataType;
}
+
+ public int[] getNoDictSortColIdxSchemaOrderMapping() {
+ return noDictSortColIdxSchemaOrderMapping;
+ }
+
+ public int[] getDictSortColIdxSchemaOrderMapping() {
+ return dictSortColIdxSchemaOrderMapping;
+ }
}
\ No newline at end of file
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ceaa05c..d8f8673 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -441,38 +441,6 @@ public final class CarbonDataProcessorUtil {
}
/**
- * Get the sort/no_sort column map based on schema order.
- * This will be used in the final sort step to find the index of sort
column, to compare the
- * intermediate row data based on schema.
- */
- public static Map<Integer, List<Boolean>>
getSortColSchemaOrderMapping(CarbonTable carbonTable) {
- List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
- Map<Integer, List<Boolean>> sortColSchemaOrderMap = new HashMap<>();
- for (CarbonDimension dimension : dimensions) {
- List<Boolean> sortDictOrNoDictMap = new ArrayList<>();
- // check if the dimension is sort column or not and add to first index
of sortDictOrNoDictMap
- // check if the dimension is dict column or not and add to second index
of sortDictOrNoDictMap
- if (dimension.isSortColumn()) {
- sortDictOrNoDictMap.add(true);
- if (dimension.hasEncoding(Encoding.DICTIONARY)) {
- sortDictOrNoDictMap.add(true);
- } else {
- sortDictOrNoDictMap.add(false);
- }
- } else {
- sortDictOrNoDictMap.add(false);
- if (dimension.hasEncoding(Encoding.DICTIONARY)) {
- sortDictOrNoDictMap.add(true);
- } else {
- sortDictOrNoDictMap.add(false);
- }
- }
- sortColSchemaOrderMap.put(dimension.getOrdinal(), sortDictOrNoDictMap);
- }
- return sortColSchemaOrderMap;
- }
-
- /**
* get mapping based on data fields order
*
* @param dataFields
@@ -504,14 +472,26 @@ public final class CarbonDataProcessorUtil {
* initial sorting, carbonrow will be in order where added sort column is at
the beginning, But
* before final merger of sort, the data should be in schema order
* (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the
carbonRow in schema
- * order), so This method helps to find the index of no dictionary sort
column in the carbonrow
- * data.
+ * order), so This method helps to find the index of no dictionary/
dictionary sort column in
+ * the carbonrow data.
*/
- public static int[] getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
+ public static Map<String, int[]> getColumnIdxBasedOnSchemaInRow(CarbonTable
carbonTable) {
List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
List<Integer> noDicSortColMap = new ArrayList<>();
+ // get no-dict / dict sort dimensions
+ List<CarbonDimension> noDictSortDimensions = new ArrayList<>();
+ List<CarbonDimension> dictSortDimensions = new ArrayList<>();
+
+ List<Integer> noDictSortColIdx = new ArrayList<>();
+ List<Integer> dictSortColIdx = new ArrayList<>();
+
int counter = 0;
for (CarbonDimension dimension : dimensions) {
+ if (dimension.hasEncoding(Encoding.DICTIONARY) ||
dimension.getDataType() == DataTypes.DATE) {
+ dictSortDimensions.add(dimension);
+ } else {
+ noDictSortDimensions.add(dimension);
+ }
if (dimension.getDataType() == DataTypes.DATE) {
continue;
}
@@ -520,12 +500,39 @@ public final class CarbonDataProcessorUtil {
}
counter++;
}
+ // add no-Dict sort column index
+ for (int i = 0; i < noDictSortDimensions.size(); i++) {
+ if (noDictSortDimensions.get(i).isSortColumn()) {
+ noDictSortColIdx.add(i);
+ }
+ }
+ // add dict sort column index
+ for (int i = 0; i < dictSortDimensions.size(); i++) {
+ if (dictSortDimensions.get(i).isSortColumn()) {
+ dictSortColIdx.add(i);
+ }
+ }
Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
for (int i = 0; i < mapping.length; i++) {
columnIdxBasedOnSchemaInRow[i] = mapping[i];
}
- return columnIdxBasedOnSchemaInRow;
+ Integer[] noDictSortIdx = noDictSortColIdx.toArray(new Integer[0]);
+ int[] noDictSortIdxBasedOnSchemaInRow = new int[noDictSortIdx.length];
+ for (int i = 0; i < noDictSortIdx.length; i++) {
+ noDictSortIdxBasedOnSchemaInRow[i] = noDictSortIdx[i];
+ }
+ Integer[] dictSortIdx = dictSortColIdx.toArray(new Integer[0]);
+ int[] dictSortIdxBasedOnSchemaInRow = new int[dictSortIdx.length];
+ for (int i = 0; i < dictSortIdx.length; i++) {
+ dictSortIdxBasedOnSchemaInRow[i] = dictSortIdx[i];
+ }
+
+ Map<String, int[]> dictOrNoDictSortInfoMap = new HashMap<>();
+ dictOrNoDictSortInfoMap.put("columnIdxBasedOnSchemaInRow",
columnIdxBasedOnSchemaInRow);
+ dictOrNoDictSortInfoMap.put("noDictSortIdxBasedOnSchemaInRow",
noDictSortIdxBasedOnSchemaInRow);
+ dictOrNoDictSortInfoMap.put("dictSortIdxBasedOnSchemaInRow",
dictSortIdxBasedOnSchemaInRow);
+ return dictOrNoDictSortInfoMap;
}
/**
@@ -533,14 +540,27 @@ public final class CarbonDataProcessorUtil {
* initial sorting, carbonrow will be in order where added sort column is at
the beginning, But
* before final merger of sort, the data should be in schema order
* (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the
carbonRow in schema
- * order), so This method helps to find the index of no dictionary sort
column in the carbonrow
- * data.
+ * order), so This method helps to find the index of no dictionary/
dictionary sort column in
+ * the carbonrow data.
*/
- public static int[]
getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(DataField[] dataFields) {
+ public static Map<String, int[]>
getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(
+ DataField[] dataFields) {
List<Integer> noDicSortColMap = new ArrayList<>();
int counter = 0;
+ // get no-dict / dict sort column schema
+ List<CarbonColumn> noDictSortColumns = new ArrayList<>();
+ List<CarbonColumn> dictSortColumns = new ArrayList<>();
+
+ List<Integer> noDictSortColIdx = new ArrayList<>();
+ List<Integer> dictSortColIdx = new ArrayList<>();
for (DataField dataField : dataFields) {
if (!dataField.getColumn().isInvisible() &&
dataField.getColumn().isDimension()) {
+ if
(dataField.getColumn().getColumnSchema().hasEncoding(Encoding.DICTIONARY)
+ || dataField.getColumn().getColumnSchema().getDataType() ==
DataTypes.DATE) {
+ dictSortColumns.add(dataField.getColumn());
+ } else {
+ noDictSortColumns.add(dataField.getColumn());
+ }
if (dataField.getColumn().getColumnSchema().getDataType() ==
DataTypes.DATE) {
continue;
}
@@ -551,12 +571,39 @@ public final class CarbonDataProcessorUtil {
counter++;
}
}
+ // add no-Dict sort column index
+ for (int i = 0; i < noDictSortColumns.size(); i++) {
+ if (noDictSortColumns.get(i).getColumnSchema().isSortColumn()) {
+ noDictSortColIdx.add(i);
+ }
+ }
+ // add dict sort column index
+ for (int i = 0; i < dictSortColumns.size(); i++) {
+ if (dictSortColumns.get(i).getColumnSchema().isSortColumn()) {
+ dictSortColIdx.add(i);
+ }
+ }
Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
for (int i = 0; i < mapping.length; i++) {
columnIdxBasedOnSchemaInRow[i] = mapping[i];
}
- return columnIdxBasedOnSchemaInRow;
+ Integer[] noDictSortIdx = noDictSortColIdx.toArray(new Integer[0]);
+ int[] noDictSortIdxBasedOnSchemaInRow = new int[noDictSortIdx.length];
+ for (int i = 0; i < noDictSortIdx.length; i++) {
+ noDictSortIdxBasedOnSchemaInRow[i] = noDictSortIdx[i];
+ }
+ Integer[] dictSortIdx = dictSortColIdx.toArray(new Integer[0]);
+ int[] dictSortIdxBasedOnSchemaInRow = new int[dictSortIdx.length];
+ for (int i = 0; i < dictSortIdx.length; i++) {
+ dictSortIdxBasedOnSchemaInRow[i] = dictSortIdx[i];
+ }
+
+ Map<String, int[]> dictOrNoSortInfoMap = new HashMap<>();
+ dictOrNoSortInfoMap.put("columnIdxBasedOnSchemaInRow",
columnIdxBasedOnSchemaInRow);
+ dictOrNoSortInfoMap.put("noDictSortIdxBasedOnSchemaInRow",
noDictSortIdxBasedOnSchemaInRow);
+ dictOrNoSortInfoMap.put("dictSortIdxBasedOnSchemaInRow",
dictSortIdxBasedOnSchemaInRow);
+ return dictOrNoSortInfoMap;
}
/**
diff --git
a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
index 46c7e7b..f1d4e17 100644
---
a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
+++
b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
@@ -86,25 +86,20 @@ public class FileMergeSortComparatorTest {
&& noDictDataTypes[1].equals(DataTypes.STRING) && noDictDataTypes[2]
.equals(DataTypes.LONG));
- // test getSortColSchemaOrderMapping
- Map<Integer, List<Boolean>> sortColSchemaOrderMapping =
- CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable);
- assert (sortColSchemaOrderMapping.get(0).get(0).equals(false) &&
sortColSchemaOrderMapping
- .get(0).get(1).equals(false));
- assert (sortColSchemaOrderMapping.get(1).get(0).equals(true) &&
sortColSchemaOrderMapping.get(1)
- .get(1).equals(false));
- assert (sortColSchemaOrderMapping.get(2).get(0).equals(true) &&
sortColSchemaOrderMapping
- .get(2).get(1).equals(false));
- assert (sortColSchemaOrderMapping.get(3).get(0).equals(true) &&
sortColSchemaOrderMapping.get(3)
- .get(1).equals(true));
-
// test comparator
- boolean[] isSortColNoDict = { true, false };
- int[] columnIdxBasedOnSchemaInRow =
+ Map<String, int[]> columnIdxMap =
CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(carbonTable);
+ int[] columnIdxBasedOnSchemaInRows =
columnIdxMap.get("columnIdxBasedOnSchemaInRow");
+ int[] noDictSortIdxBasedOnSchemaInRows =
columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow");
+ int[] dictSortIdxBasedOnSchemaInRows =
columnIdxMap.get("dictSortIdxBasedOnSchemaInRow");
+
+ assert (noDictSortIdxBasedOnSchemaInRows.length == 2 &&
noDictSortIdxBasedOnSchemaInRows[0] == 1
+ && noDictSortIdxBasedOnSchemaInRows[1] == 2);
+ assert (dictSortIdxBasedOnSchemaInRows.length == 1 &&
dictSortIdxBasedOnSchemaInRows[0] == 0);
+
FileMergeSortComparator comparator =
- new FileMergeSortComparator(isSortColNoDict, noDictDataTypes,
columnIdxBasedOnSchemaInRow,
- sortColSchemaOrderMapping);
+ new FileMergeSortComparator(noDictDataTypes,
columnIdxBasedOnSchemaInRows,
+ noDictSortIdxBasedOnSchemaInRows, dictSortIdxBasedOnSchemaInRows);
// prepare data for final sort
int[] dictSortDims1 = { 1 };