This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 75417fe [CARBONDATA-3552]Fix dataload failure for column added in 1.1
which is a sort column in latest version
75417fe is described below
commit 75417fec2248a5be6af44ce186dadaf84842f9eb
Author: akashrn5 <[email protected]>
AuthorDate: Mon Oct 21 11:29:00 2019 +0530
[CARBONDATA-3552]Fix dataload failure for column added in 1.1 which is a
sort column in latest version
Problem:
When a dimension is added in an old version such as carbon1.1, it is a sort
column by default. The sort step assumes that the sort columns come first in
the data, but the added column is at the end even though it is a sort column.
The comparator's column mapping therefore does not match the actual data,
because the carbon row still holds the added column's data at the end, so the
data comparison fails and the data load fails.
Solution:
While building the data load configuration, rearrange the columns so that the
sort columns come first and the non-sort columns last, which makes the carbon
row layout correct. However, before the final merge step the sort row is
updated based on the schema, which moves the sort column back to the end. So a
new comparator class is added for the final merge step. It knows the actual
index of each value in the carbon row, which solves this issue.
This closes #3410
---
.../processing/loading/DataLoadProcessBuilder.java | 24 +++++-
.../unsafe/holder/UnsafeFinalMergePageHolder.java | 13 +--
.../sort/unsafe/holder/UnsafeInmemoryHolder.java | 13 +--
.../holder/UnsafeSortTempFileChunkHolder.java | 15 +++-
.../merger/UnsafeIntermediateFileMerger.java | 2 +-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 7 +-
.../sort/sortdata/FileMergeSortComparator.java | 96 ++++++++++++++++++++++
.../processing/sort/sortdata/SortParameters.java | 18 ++++
.../sort/sortdata/SortTempFileChunkHolder.java | 14 +++-
.../processing/sort/sortdata/TableFieldStat.java | 46 ++++++++++-
.../processing/util/CarbonDataProcessorUtil.java | 34 +++++++-
11 files changed, 254 insertions(+), 28 deletions(-)
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 520943e..233b4cd 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -302,7 +302,8 @@ public final class DataLoadProcessBuilder {
dataFields.add(new DataField(column));
}
}
- configuration.setDataFields(dataFields.toArray(new
DataField[dataFields.size()]));
+ configuration.setDataFields(
+ updateDataFieldsBasedOnSortColumns(dataFields).toArray(new
DataField[dataFields.size()]));
configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName()));
// configuration for one pass load: dictionary server info
configuration.setUseOnePass(loadModel.getUseOnePass());
@@ -399,4 +400,25 @@ public final class DataLoadProcessBuilder {
CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_FIELD_DELIMITER);
configuration.setSortColumnRangeInfo(sortColumnRangeInfo);
}
+
+ /**
+ * Rearranges the data fields so that all sort columns come first, followed by
+ * the non-sort columns. The relative order within each group is preserved.
+ * A dimension added in an old version such as carbon 1.1 is appended at the
+ * end of the schema even when it is a sort column, so this brings it to the
+ * front for the sort step.
+ *
+ * @param dataFields data fields to rearrange; the list itself is not modified
+ * @return a new list containing the sort-column fields first, then the rest
+ */
+ private static List<DataField>
updateDataFieldsBasedOnSortColumns(List<DataField> dataFields) {
+ List<DataField> updatedDataFields = new ArrayList<>();
+ List<DataField> sortFields = new ArrayList<>();
+ List<DataField> nonSortFields = new ArrayList<>();
+ for (DataField dataField : dataFields) {
+ if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+ sortFields.add(dataField);
+ } else {
+ nonSortFields.add(dataField);
+ }
+ }
+ updatedDataFields.addAll(sortFields);
+ updatedDataFields.addAll(nonSortFields);
+ return updatedDataFields;
+ }
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 896af60..af0e431 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -17,12 +17,15 @@
package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+import java.util.Comparator;
+
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.FileMergeSortComparator;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
import org.apache.log4j.Logger;
@@ -41,14 +44,14 @@ public class UnsafeFinalMergePageHolder implements
SortTempChunkHolder {
private UnsafeCarbonRowPage[] rowPages;
- private IntermediateSortTempRowComparator comparator;
+ private Comparator<IntermediateSortTempRow> comparator;
private IntermediateSortTempRow currentRow;
private DataType[] noDictDataType;
public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger
merger,
- boolean[] noDictSortColumnMapping) {
+ TableFieldStat tableFieldStat) {
this.actualSize = merger.getEntryCount();
this.mergedAddresses = merger.getMergedAddresses();
this.rowPageIndexes = merger.getRowPageIndexes();
@@ -58,8 +61,8 @@ public class UnsafeFinalMergePageHolder implements
SortTempChunkHolder {
}
this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
LOGGER.info("Processing unsafe inmemory rows page with size : " +
actualSize);
- this.comparator =
- new IntermediateSortTempRowComparator(noDictSortColumnMapping,
noDictDataType);
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+ tableFieldStat.getNoDictDataType(),
tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
}
public boolean hasNext() {
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index e5680de..b8dd98f 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -17,10 +17,12 @@
package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+import java.util.Comparator;
+
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.FileMergeSortComparator;
import org.apache.log4j.Logger;
@@ -39,15 +41,16 @@ public class UnsafeInmemoryHolder implements
SortTempChunkHolder {
private long address;
- private IntermediateSortTempRowComparator comparator;
+ private Comparator<IntermediateSortTempRow> comparator;
public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
this.actualSize = rowPage.getBuffer().getActualSize();
this.rowPage = rowPage;
LOGGER.info("Processing unsafe inmemory rows page with size : " +
actualSize);
- this.comparator = new IntermediateSortTempRowComparator(
- rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
- rowPage.getTableFieldStat().getNoDictDataType());
+ this.comparator =
+ new
FileMergeSortComparator(rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
+ rowPage.getTableFieldStat().getNoDictDataType(),
+
rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping());
this.rowPage.setReadConvertedNoSortField();
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 7fcfc0e..c3d9038 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -36,6 +36,7 @@ import
org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.SortTempRowUpdater;
import
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.FileMergeSortComparator;
import
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -105,18 +106,24 @@ public class UnsafeSortTempFileChunkHolder implements
SortTempChunkHolder {
* Constructor to initialize
*/
public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters
parameters,
- boolean convertNoSortFields) {
+ boolean convertNoSortFields, TableFieldStat tableFieldStat) {
// set temp file
this.tempFile = tempFile;
this.readBufferSize = parameters.getBufferSize();
this.compressorName = parameters.getSortTempCompressorName();
- this.tableFieldStat = new TableFieldStat(parameters);
+ this.tableFieldStat = tableFieldStat;
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.executorService = Executors.newFixedThreadPool(1);
- comparator = new
IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
- parameters.getNoDictDataType());
this.convertNoSortFields = convertNoSortFields;
this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
+ if (!this.convertNoSortFields) {
+ comparator = new
IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
+ parameters.getNoDictDataType());
+ } else {
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+ tableFieldStat.getNoDictDataType(),
+ tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+ }
initialize();
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index f7e38b3..fcb69ea 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -211,7 +211,7 @@ public class UnsafeIntermediateFileMerger implements
Callable<Void> {
for (File tempFile : intermediateFiles) {
// create chunk holder
sortTempFileChunkHolder =
- new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false);
+ new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false,
tableFieldStat);
sortTempFileChunkHolder.readRow();
this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index e7cadec..2d0fbd3 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -38,6 +38,7 @@ import
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMe
import
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryHolder;
import
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
import org.apache.log4j.Logger;
@@ -117,7 +118,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends
CarbonIterator<Objec
// create record holder heap
createRecordHolderQueue();
-
+ TableFieldStat tableFieldStat = new TableFieldStat(parameters);
// iterate over file list and create chunk holder and add to heap
LOGGER.info("Started adding first record from each page");
for (final UnsafeCarbonRowPage rowPage : rowPages) {
@@ -133,7 +134,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends
CarbonIterator<Objec
for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeFinalMergePageHolder(merger,
parameters.getNoDictionarySortColumn());
+ new UnsafeFinalMergePageHolder(merger, tableFieldStat);
// initialize
sortTempFileChunkHolder.readRow();
@@ -144,7 +145,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends
CarbonIterator<Objec
for (final File file : filesToMergeSort) {
SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeSortTempFileChunkHolder(file, parameters, true);
+ new UnsafeSortTempFileChunkHolder(file, parameters, true,
tableFieldStat);
// initialize
sortTempFileChunkHolder.readRow();
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
new file mode 100644
index 0000000..1724262
--- /dev/null
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+/**
+ * Comparator for {@link IntermediateSortTempRow} used in the final merge step of sorting.
+ * Before the final merge the row is put back into schema order, so a sort column that sits
+ * at the end of the schema (for example a dimension added in carbon 1.1, which is a sort
+ * column by default) is not at the front. Primitive no-dictionary sort values are therefore
+ * read through an explicit index mapping instead of purely by position.
+ */
public class FileMergeSortComparator implements
Comparator<IntermediateSortTempRow> {
+ /**
+ * One flag per sort column, in comparison order: {@code true} for a no-dictionary
+ * column, {@code false} for a dictionary column.
+ */
+ private boolean[] isSortColumnNoDictionary;
+ /**
+ * Data types of the no-dictionary sort columns, indexed by position among the
+ * no-dictionary sort columns.
+ */
+ private DataType[] noDicSortDataTypes;
+
+ /**
+ * For each primitive-typed no-dictionary sort column, in order, the index at which its
+ * value is read from {@code IntermediateSortTempRow#getNoDictSortDims()} during the final
+ * merge step. NOTE(review): the mapping is built from a schema-order counter over all
+ * non-dictionary dimensions; confirm it always stays within the bounds of that array.
+ */
+ private int[] noDictPrimitiveIndex;
+
+ /**
+ * Comparator for IntermediateSortTempRow, for compatibility cases where a column was
+ * added in an old version and is a sort column. The arrays are stored without copying.
+ *
+ * @param isSortColumnNoDictionary per sort column, whether it is a no-dictionary column
+ * @param noDicSortDataTypes data types of the no-dictionary sort columns
+ * @param columnIdBasedOnSchemaInRow for each primitive no-dictionary sort column, the index
+ * of its value in the row's no-dictionary sort dims
+ */
+ public FileMergeSortComparator(boolean[] isSortColumnNoDictionary,
DataType[] noDicSortDataTypes,
+ int[] columnIdBasedOnSchemaInRow) {
+ this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+ this.noDicSortDataTypes = noDicSortDataTypes;
+ this.noDictPrimitiveIndex = columnIdBasedOnSchemaInRow;
+ }
+ /**
+ * Compares two rows sort column by sort column and returns the first non-zero difference.
+ * Primitive no-dictionary columns use the data-type specific comparator and are located
+ * through {@code noDictPrimitiveIndex}. Other no-dictionary columns are compared as byte
+ * arrays with ByteUtil.UnsafeComparer, by position. Dictionary columns are compared by
+ * their int values.
+ *
+ * @return negative, zero or positive as rowA orders before, equal to, or after rowB
+ */
+ @Override public int compare(IntermediateSortTempRow rowA,
IntermediateSortTempRow rowB) {
+ int diff = 0;
+ int dictIndex = 0; // position among dictionary sort columns
+ int nonDictIndex = 0; // position among no-dictionary sort columns
+ int noDicTypeIdx = 0; // always equal to nonDictIndex (both advance together)
+ int schemaRowIdx = 0; // position among primitive no-dictionary sort columns
+
+ for (boolean isNoDictionary : isSortColumnNoDictionary) {
+
+ if (isNoDictionary) {
+ if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) {
+ // use a data-type based comparator for primitive no-dictionary sort
columns
+ SerializableComparator comparator =
org.apache.carbondata.core.util.comparator.Comparator
+ .getComparator(noDicSortDataTypes[noDicTypeIdx]);
+ int difference = comparator
+
.compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
+
rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
+ schemaRowIdx++;
+ if (difference != 0) {
+ return difference;
+ }
+ } else { // NOTE(review): unlike the primitive branch, read by position and not remapped — confirm
+ byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
+ byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
+
+ int difference =
ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ }
+ nonDictIndex++;
+ noDicTypeIdx++;
+ } else {
+ int dimFieldA = rowA.getDictSortDims()[dictIndex];
+ int dimFieldB = rowB.getDictSortDims()[dictIndex];
+ dictIndex++;
+
+ diff = dimFieldA - dimFieldB; // NOTE(review): int subtraction overflows for operands of opposite sign and large magnitude; Integer.compare would be safe
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ }
+ return diff; // always 0 here: every non-zero difference is returned early
+ }
+}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index ffc7416..e7a0173 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -153,6 +153,11 @@ public class SortParameters implements Serializable {
private int[] noDictActualPosition;
+ /**
+ * Index of the no dict Sort columns in the carbonRow for final merge step
of sorting.
+ */
+ private int[] noDictSortColumnSchemaOrderMapping;
+
public SortParameters getCopy() {
SortParameters parameters = new SortParameters();
parameters.tempFileLocation = tempFileLocation;
@@ -191,6 +196,7 @@ public class SortParameters implements Serializable {
parameters.isUpdateNonDictDims = isUpdateNonDictDims;
parameters.dictDimActualPosition = dictDimActualPosition;
parameters.noDictActualPosition = noDictActualPosition;
+ parameters.noDictSortColumnSchemaOrderMapping =
noDictSortColumnSchemaOrderMapping;
return parameters;
}
@@ -402,6 +408,14 @@ public class SortParameters implements Serializable {
return carbonTable;
}
+ int[] getNoDictSortColumnSchemaOrderMapping() { // package-private; returns the field itself, not a copy
+ return noDictSortColumnSchemaOrderMapping;
+ }
+ /**
+ * Sets, for each primitive no-dictionary sort column, the index of its value in the
+ * carbon row's no-dictionary sort dims for the final merge step of sorting.
+ * Note that getCopy() shares this array reference rather than copying it.
+ */
+ private void setNoDictSortColumnSchemaOrderMapping(int[]
noDictSortColumnSchemaOrderMapping) {
+ this.noDictSortColumnSchemaOrderMapping =
noDictSortColumnSchemaOrderMapping;
+ }
+
public static SortParameters
createSortParameters(CarbonDataLoadConfiguration configuration) {
SortParameters parameters = new SortParameters();
@@ -431,6 +445,8 @@ public class SortParameters implements Serializable {
parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
.getNoDictSortColMapping(parameters.getCarbonTable()));
+ parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+ .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
parameters.setSortColumn(configuration.getSortColumnMapping());
parameters.setObserver(new SortObserver());
// get sort buffer size
@@ -573,6 +589,8 @@ public class SortParameters implements Serializable {
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
.getNoDictSortColMapping(parameters.getCarbonTable()));
+ parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+ .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
TableSpec tableSpec = new TableSpec(carbonTable);
parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 9e9bac1..5dfce20 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -105,9 +105,8 @@ public class SortTempFileChunkHolder implements
Comparable<SortTempFileChunkHold
public SortTempFileChunkHolder(SortParameters sortParameters) {
this.tableFieldStat = new TableFieldStat(sortParameters);
- this.comparator =
- new
IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
- tableFieldStat.getNoDictDataType());
+ this.comparator = new
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+ tableFieldStat.getNoDictDataType(),
tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
}
@@ -130,6 +129,15 @@ public class SortTempFileChunkHolder implements
Comparable<SortTempFileChunkHold
.newFixedThreadPool(1, new
CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName,
true));
this.convertToActualField = convertToActualField;
+ if (this.convertToActualField) {
+ this.comparator = new FileMergeSortComparator(
+ tableFieldStat.getIsSortColNoDictFlags(),
tableFieldStat.getNoDictDataType(),
+ tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+ } else {
+ this.comparator =
+ new
IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
+ tableFieldStat.getNoDictDataType());
+ }
}
/**
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 9553bc9..63d0525 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.processing.sort.sortdata;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -71,6 +72,11 @@ public class TableFieldStat implements Serializable {
private SortTempRowUpdater sortTempRowUpdater;
+ /**
+ * Index of the no dict Sort columns in the carbonRow for final merge step
of sorting.
+ */
+ private int[] noDictSortColumnSchemaOrderMapping;
+
public TableFieldStat(SortParameters sortParameters) {
int noDictDimCnt = sortParameters.getNoDictionaryCount();
int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
@@ -105,6 +111,8 @@ public class TableFieldStat implements Serializable {
this.complexDimIdx = new int[complexDimCnt];
this.varcharDimIdx = new int[varcharDimCnt];
this.measureIdx = new int[measureCnt];
+ this.noDictSortColumnSchemaOrderMapping =
+ sortParameters.getNoDictSortColumnSchemaOrderMapping();
int tmpNoDictSortCnt = 0;
int tmpNoDictNoSortCnt = 0;
@@ -115,9 +123,9 @@ public class TableFieldStat implements Serializable {
List<CarbonDimension> allDimensions =
sortParameters.getCarbonTable().getDimensionByTableName(sortParameters.getTableName());
-
- for (int i = 0; i < allDimensions.size(); i++) {
- CarbonDimension carbonDimension = allDimensions.get(i);
+ List<CarbonDimension> updatedDimensions =
updateDimensionsBasedOnSortColumns(allDimensions);
+ for (int i = 0; i < updatedDimensions.size(); i++) {
+ CarbonDimension carbonDimension = updatedDimensions.get(i);
if (carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.isComplex()) {
if (carbonDimension.isSortColumn()) {
dictSortDimIdx[tmpDictSortCnt++] = i;
@@ -140,7 +148,7 @@ public class TableFieldStat implements Serializable {
dictNoSortDimCnt = tmpDictNoSortCnt;
noDictNoSortDimCnt = tmpNoDictNoSortCnt;
- int base = allDimensions.size();
+ int base = updatedDimensions.size();
// indices for measure columns
for (int i = 0; i < measureCnt; i++) {
@@ -223,6 +231,10 @@ public class TableFieldStat implements Serializable {
return measureIdx;
}
+ public int[] getNoDictSortColumnSchemaOrderMapping() { // same array instance taken from SortParameters; not copied
+ return noDictSortColumnSchemaOrderMapping;
+ }
+
@Override public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TableFieldStat)) return false;
@@ -257,4 +269,30 @@ public class TableFieldStat implements Serializable {
public SortTempRowUpdater getSortTempRowUpdater() {
return sortTempRowUpdater;
}
+ /**
+ * Returns the dimensions reordered so that sort columns come first. This simply
+ * delegates to getCarbonDimensions.
+ *
+ * @param carbonDimensions dimensions to reorder; the list itself is not modified
+ * @return a new, reordered list
+ */
+ private static List<CarbonDimension> updateDimensionsBasedOnSortColumns(
+ List<CarbonDimension> carbonDimensions) {
+ return getCarbonDimensions(carbonDimensions);
+ }
+
+ /**
+ * Rearranges the dimensions so that all sort columns come first, followed by the
+ * non-sort columns. The relative order within each group is preserved. A dimension added
+ * in an old version such as carbon 1.1 is appended at the end of the schema even when it
+ * is a sort column, so this method brings it to the front.
+ *
+ * @param carbonDimensions dimensions to rearrange; the list itself is not modified
+ * @return a new list containing the sort-column dimensions first, then the rest
+ */
+ private static List<CarbonDimension>
getCarbonDimensions(List<CarbonDimension> carbonDimensions) {
+ List<CarbonDimension> updatedDataFields = new ArrayList<>();
+ List<CarbonDimension> sortFields = new ArrayList<>();
+ List<CarbonDimension> nonSortFields = new ArrayList<>();
+ for (CarbonDimension carbonDimension : carbonDimensions) {
+ if (carbonDimension.getColumnSchema().isSortColumn()) {
+ sortFields.add(carbonDimension);
+ } else {
+ nonSortFields.add(carbonDimension);
+ }
+ }
+ updatedDataFields.addAll(sortFields);
+ updatedDataFields.addAll(nonSortFields);
+ return updatedDataFields;
+ }
}
\ No newline at end of file
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 588d4ac..17ff811 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -439,15 +439,45 @@ public final class CarbonDataProcessorUtil {
}
}
}
- Boolean[] mapping = noDicSortColMap.toArray(new
Boolean[noDicSortColMap.size()]);
+ Boolean[] mapping = noDicSortColMap.toArray(new Boolean[0]);
boolean[] noDicSortColMapping = new boolean[mapping.length];
for (int i = 0; i < mapping.length; i++) {
- noDicSortColMapping[i] = mapping[i].booleanValue();
+ noDicSortColMapping[i] = mapping[i];
}
return noDicSortColMapping;
}
 /**
+ * Returns, for each primitive-typed no-dictionary sort column, its position among the
+ * non-dictionary dimensions in schema order.
+ *
+ * A dimension added in an older version such as 1.1 is a sort column by default but sits
+ * at the end of the schema. During the initial sort the carbon row is ordered with the sort
+ * columns first. Before the final merge of the sort, however, the row is put back into schema
+ * order (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the carbon row),
+ * so this mapping is needed to locate the no-dictionary sort values in the row.
+ *
+ * NOTE(review): the counter advances for every dimension without the DICTIONARY encoding,
+ * including non-sort and non-primitive ones (and presumably complex dimensions). Confirm that
+ * this matches the layout of the row the comparator indexes into.
+ *
+ * @param carbonTable table whose dimensions are inspected
+ * @return schema-order positions, one per primitive no-dictionary sort column
+ */
+ public static int[] getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
+ List<Integer> noDicSortColMap = new ArrayList<>();
+ int counter = 0; // position among non-dictionary dimensions, in schema order
+ for (CarbonDimension dimension : dimensions) {
+ if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+ continue;
+ }
+ if (dimension.isSortColumn() &&
DataTypeUtil.isPrimitiveColumn(dimension.getDataType())) {
+ noDicSortColMap.add(counter);
+ }
+ counter++; // advances for every non-dictionary dimension, sort column or not
+ }
+ Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
+ int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
+ for (int i = 0; i < mapping.length; i++) {
+ columnIdxBasedOnSchemaInRow[i] = mapping[i];
+ }
+ return columnIdxBasedOnSchemaInRow;
+ }
+
+ /**
* Get the data types of the no dictionary sort columns
*
* @param carbonTable