This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 75417fe  [CARBONDATA-3552]Fix dataload failure for column added in 1.1 
which is a sort column in latest version
75417fe is described below

commit 75417fec2248a5be6af44ce186dadaf84842f9eb
Author: akashrn5 <[email protected]>
AuthorDate: Mon Oct 21 11:29:00 2019 +0530

    [CARBONDATA-3552]Fix dataload failure for column added in 1.1 which is a 
sort column in latest version
    
    Problem:
    
    When a dimension is added in an old version such as carbon1.1, it becomes 
a sort column by default. The sort step assumes that sort columns come at the 
beginning of the row, but the added column is at the end even though it is a 
sort column. Because the carbon row still holds the added column's data at the 
end, the comparator's column mapping does not match the actual data, so the 
data comparison fails and the data load fails.
    
    Solution:
    
    While building the data load configuration, rearrange the columns so that 
sort columns come first and non-sort columns come last, so that the carbonRow is 
laid out correctly. However, before the final merge step the sort row is 
updated based on the schema, which moves the added sort column back to the end. 
So, for the final merge step, a new comparator class is added that knows the 
actual index of each column's data in the carbon row, which solves this issue.
    
    This closes #3410
---
 .../processing/loading/DataLoadProcessBuilder.java | 24 +++++-
 .../unsafe/holder/UnsafeFinalMergePageHolder.java  | 13 +--
 .../sort/unsafe/holder/UnsafeInmemoryHolder.java   | 13 +--
 .../holder/UnsafeSortTempFileChunkHolder.java      | 15 +++-
 .../merger/UnsafeIntermediateFileMerger.java       |  2 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java    |  7 +-
 .../sort/sortdata/FileMergeSortComparator.java     | 96 ++++++++++++++++++++++
 .../processing/sort/sortdata/SortParameters.java   | 18 ++++
 .../sort/sortdata/SortTempFileChunkHolder.java     | 14 +++-
 .../processing/sort/sortdata/TableFieldStat.java   | 46 ++++++++++-
 .../processing/util/CarbonDataProcessorUtil.java   | 34 +++++++-
 11 files changed, 254 insertions(+), 28 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 520943e..233b4cd 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -302,7 +302,8 @@ public final class DataLoadProcessBuilder {
         dataFields.add(new DataField(column));
       }
     }
-    configuration.setDataFields(dataFields.toArray(new 
DataField[dataFields.size()]));
+    configuration.setDataFields(
+        updateDataFieldsBasedOnSortColumns(dataFields).toArray(new 
DataField[dataFields.size()]));
     
configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName()));
     // configuration for one pass load: dictionary server info
     configuration.setUseOnePass(loadModel.getUseOnePass());
@@ -399,4 +400,25 @@ public final class DataLoadProcessBuilder {
         CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_FIELD_DELIMITER);
     configuration.setSortColumnRangeInfo(sortColumnRangeInfo);
   }
+
+  /**
+   * This method rearrange the data fields where all the sort columns are 
added at first. Because
+   * if the column gets added in old version like carbon1.1, it will be added 
at last, so if it is
+   * sort column, bring it to first.
+   */
+  private static List<DataField> 
updateDataFieldsBasedOnSortColumns(List<DataField> dataFields) {
+    List<DataField> updatedDataFields = new ArrayList<>();
+    List<DataField> sortFields = new ArrayList<>();
+    List<DataField> nonSortFields = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+        sortFields.add(dataField);
+      } else {
+        nonSortFields.add(dataField);
+      }
+    }
+    updatedDataFields.addAll(sortFields);
+    updatedDataFields.addAll(nonSortFields);
+    return updatedDataFields;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 896af60..af0e431 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -17,12 +17,15 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
+import java.util.Comparator;
+
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import 
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.FileMergeSortComparator;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 import org.apache.log4j.Logger;
 
@@ -41,14 +44,14 @@ public class UnsafeFinalMergePageHolder implements 
SortTempChunkHolder {
 
   private UnsafeCarbonRowPage[] rowPages;
 
-  private IntermediateSortTempRowComparator comparator;
+  private Comparator<IntermediateSortTempRow> comparator;
 
   private IntermediateSortTempRow currentRow;
 
   private DataType[] noDictDataType;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger 
merger,
-      boolean[] noDictSortColumnMapping) {
+      TableFieldStat tableFieldStat) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
@@ -58,8 +61,8 @@ public class UnsafeFinalMergePageHolder implements 
SortTempChunkHolder {
     }
     this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
     LOGGER.info("Processing unsafe inmemory rows page with size : " + 
actualSize);
-    this.comparator =
-        new IntermediateSortTempRowComparator(noDictSortColumnMapping, 
noDictDataType);
+    this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+        tableFieldStat.getNoDictDataType(), 
tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
   }
 
   public boolean hasNext() {
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index e5680de..b8dd98f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -17,10 +17,12 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
+import java.util.Comparator;
+
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import 
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.FileMergeSortComparator;
 
 import org.apache.log4j.Logger;
 
@@ -39,15 +41,16 @@ public class UnsafeInmemoryHolder implements 
SortTempChunkHolder {
 
   private long address;
 
-  private IntermediateSortTempRowComparator comparator;
+  private Comparator<IntermediateSortTempRow> comparator;
 
   public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.info("Processing unsafe inmemory rows page with size : " + 
actualSize);
-    this.comparator = new IntermediateSortTempRowComparator(
-        rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
-        rowPage.getTableFieldStat().getNoDictDataType());
+    this.comparator =
+        new 
FileMergeSortComparator(rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
+            rowPage.getTableFieldStat().getNoDictDataType(),
+            
rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping());
     this.rowPage.setReadConvertedNoSortField();
   }
 
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 7fcfc0e..c3d9038 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -36,6 +36,7 @@ import 
org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.SortTempRowUpdater;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.FileMergeSortComparator;
 import 
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -105,18 +106,24 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
    * Constructor to initialize
    */
   public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters 
parameters,
-      boolean convertNoSortFields) {
+      boolean convertNoSortFields, TableFieldStat tableFieldStat) {
     // set temp file
     this.tempFile = tempFile;
     this.readBufferSize = parameters.getBufferSize();
     this.compressorName = parameters.getSortTempCompressorName();
-    this.tableFieldStat = new TableFieldStat(parameters);
+    this.tableFieldStat = tableFieldStat;
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.executorService = Executors.newFixedThreadPool(1);
-    comparator = new 
IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
-        parameters.getNoDictDataType());
     this.convertNoSortFields = convertNoSortFields;
     this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
+    if (!this.convertNoSortFields) {
+      comparator = new 
IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
+          parameters.getNoDictDataType());
+    } else {
+      this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+          tableFieldStat.getNoDictDataType(),
+          tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+    }
     initialize();
   }
 
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index f7e38b3..fcb69ea 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -211,7 +211,7 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
     for (File tempFile : intermediateFiles) {
       // create chunk holder
       sortTempFileChunkHolder =
-          new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false);
+          new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false, 
tableFieldStat);
 
       sortTempFileChunkHolder.readRow();
       this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index e7cadec..2d0fbd3 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -38,6 +38,7 @@ import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMe
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryHolder;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 import org.apache.log4j.Logger;
 
@@ -117,7 +118,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends 
CarbonIterator<Objec
 
       // create record holder heap
       createRecordHolderQueue();
-
+      TableFieldStat tableFieldStat = new TableFieldStat(parameters);
       // iterate over file list and create chunk holder and add to heap
       LOGGER.info("Started adding first record from each page");
       for (final UnsafeCarbonRowPage rowPage : rowPages) {
@@ -133,7 +134,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends 
CarbonIterator<Objec
       for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, 
parameters.getNoDictionarySortColumn());
+            new UnsafeFinalMergePageHolder(merger, tableFieldStat);
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -144,7 +145,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends 
CarbonIterator<Objec
       for (final File file : filesToMergeSort) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeSortTempFileChunkHolder(file, parameters, true);
+            new UnsafeSortTempFileChunkHolder(file, parameters, true, 
tableFieldStat);
 
         // initialize
         sortTempFileChunkHolder.readRow();
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
new file mode 100644
index 0000000..1724262
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+public class FileMergeSortComparator implements 
Comparator<IntermediateSortTempRow> {
+
+  private boolean[] isSortColumnNoDictionary;
+
+  private DataType[] noDicSortDataTypes;
+
+  /**
+   * Index of the no dict Sort columns in the carbonRow for final merge step 
of sorting.
+   */
+  private int[] noDictPrimitiveIndex;
+
+  /**
+   * Comparator for IntermediateSortTempRow for compatibility cases where 
column added in old
+   * version and it is sort column
+   * @param isSortColumnNoDictionary isSortColumnNoDictionary
+   */
+  public FileMergeSortComparator(boolean[] isSortColumnNoDictionary, 
DataType[] noDicSortDataTypes,
+      int[] columnIdBasedOnSchemaInRow) {
+    this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+    this.noDicSortDataTypes = noDicSortDataTypes;
+    this.noDictPrimitiveIndex = columnIdBasedOnSchemaInRow;
+  }
+
+  @Override public int compare(IntermediateSortTempRow rowA, 
IntermediateSortTempRow rowB) {
+    int diff = 0;
+    int dictIndex = 0;
+    int nonDictIndex = 0;
+    int noDicTypeIdx = 0;
+    int schemaRowIdx = 0;
+
+    for (boolean isNoDictionary : isSortColumnNoDictionary) {
+
+      if (isNoDictionary) {
+        if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) {
+          // use data types based comparator for the no dictionary measure 
columns
+          SerializableComparator comparator = 
org.apache.carbondata.core.util.comparator.Comparator
+              .getComparator(noDicSortDataTypes[noDicTypeIdx]);
+          int difference = comparator
+              
.compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
+                  
rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
+          schemaRowIdx++;
+          if (difference != 0) {
+            return difference;
+          }
+        } else {
+          byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
+          byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
+
+          int difference = 
ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+          if (difference != 0) {
+            return difference;
+          }
+        }
+        nonDictIndex++;
+        noDicTypeIdx++;
+      } else {
+        int dimFieldA = rowA.getDictSortDims()[dictIndex];
+        int dimFieldB = rowB.getDictSortDims()[dictIndex];
+        dictIndex++;
+
+        diff = dimFieldA - dimFieldB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+    return diff;
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index ffc7416..e7a0173 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -153,6 +153,11 @@ public class SortParameters implements Serializable {
 
   private int[] noDictActualPosition;
 
+  /**
+   * Index of the no dict Sort columns in the carbonRow for final merge step 
of sorting.
+   */
+  private int[] noDictSortColumnSchemaOrderMapping;
+
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
     parameters.tempFileLocation = tempFileLocation;
@@ -191,6 +196,7 @@ public class SortParameters implements Serializable {
     parameters.isUpdateNonDictDims = isUpdateNonDictDims;
     parameters.dictDimActualPosition = dictDimActualPosition;
     parameters.noDictActualPosition = noDictActualPosition;
+    parameters.noDictSortColumnSchemaOrderMapping = 
noDictSortColumnSchemaOrderMapping;
     return parameters;
   }
 
@@ -402,6 +408,14 @@ public class SortParameters implements Serializable {
     return carbonTable;
   }
 
+  int[] getNoDictSortColumnSchemaOrderMapping() {
+    return noDictSortColumnSchemaOrderMapping;
+  }
+
+  private void setNoDictSortColumnSchemaOrderMapping(int[] 
noDictSortColumnSchemaOrderMapping) {
+    this.noDictSortColumnSchemaOrderMapping = 
noDictSortColumnSchemaOrderMapping;
+  }
+
 
   public static SortParameters 
createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
@@ -431,6 +445,8 @@ public class SortParameters implements Serializable {
     
parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
         .getNoDictSortColMapping(parameters.getCarbonTable()));
+    parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+        .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
     parameters.setSortColumn(configuration.getSortColumnMapping());
     parameters.setObserver(new SortObserver());
     // get sort buffer size
@@ -573,6 +589,8 @@ public class SortParameters implements Serializable {
     
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
         .getNoDictSortColMapping(parameters.getCarbonTable()));
+    parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+        .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
     TableSpec tableSpec = new TableSpec(carbonTable);
     parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
     parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 9e9bac1..5dfce20 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -105,9 +105,8 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
 
   public SortTempFileChunkHolder(SortParameters sortParameters) {
     this.tableFieldStat = new TableFieldStat(sortParameters);
-    this.comparator =
-        new 
IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
-            tableFieldStat.getNoDictDataType());
+    this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+        tableFieldStat.getNoDictDataType(), 
tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
     this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
@@ -130,6 +129,15 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
         .newFixedThreadPool(1, new 
CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName,
                 true));
     this.convertToActualField = convertToActualField;
+    if (this.convertToActualField) {
+      this.comparator = new FileMergeSortComparator(
+          tableFieldStat.getIsSortColNoDictFlags(), 
tableFieldStat.getNoDictDataType(),
+          tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+    } else {
+      this.comparator =
+          new 
IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
+              tableFieldStat.getNoDictDataType());
+    }
   }
 
   /**
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 9553bc9..63d0525 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -71,6 +72,11 @@ public class TableFieldStat implements Serializable {
 
   private SortTempRowUpdater sortTempRowUpdater;
 
+  /**
+   * Index of the no dict Sort columns in the carbonRow for final merge step 
of sorting.
+   */
+  private int[] noDictSortColumnSchemaOrderMapping;
+
   public TableFieldStat(SortParameters sortParameters) {
     int noDictDimCnt = sortParameters.getNoDictionaryCount();
     int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
@@ -105,6 +111,8 @@ public class TableFieldStat implements Serializable {
     this.complexDimIdx = new int[complexDimCnt];
     this.varcharDimIdx = new int[varcharDimCnt];
     this.measureIdx = new int[measureCnt];
+    this.noDictSortColumnSchemaOrderMapping =
+        sortParameters.getNoDictSortColumnSchemaOrderMapping();
 
     int tmpNoDictSortCnt = 0;
     int tmpNoDictNoSortCnt = 0;
@@ -115,9 +123,9 @@ public class TableFieldStat implements Serializable {
 
     List<CarbonDimension> allDimensions =
         
sortParameters.getCarbonTable().getDimensionByTableName(sortParameters.getTableName());
-
-    for (int i = 0; i < allDimensions.size(); i++) {
-      CarbonDimension carbonDimension = allDimensions.get(i);
+    List<CarbonDimension> updatedDimensions = 
updateDimensionsBasedOnSortColumns(allDimensions);
+    for (int i = 0; i < updatedDimensions.size(); i++) {
+      CarbonDimension carbonDimension = updatedDimensions.get(i);
       if (carbonDimension.hasEncoding(Encoding.DICTIONARY) && 
!carbonDimension.isComplex()) {
         if (carbonDimension.isSortColumn()) {
           dictSortDimIdx[tmpDictSortCnt++] = i;
@@ -140,7 +148,7 @@ public class TableFieldStat implements Serializable {
     dictNoSortDimCnt = tmpDictNoSortCnt;
     noDictNoSortDimCnt = tmpNoDictNoSortCnt;
 
-    int base = allDimensions.size();
+    int base = updatedDimensions.size();
 
     // indices for measure columns
     for (int i = 0; i < measureCnt; i++) {
@@ -223,6 +231,10 @@ public class TableFieldStat implements Serializable {
     return measureIdx;
   }
 
+  public int[] getNoDictSortColumnSchemaOrderMapping() {
+    return noDictSortColumnSchemaOrderMapping;
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (!(o instanceof TableFieldStat)) return false;
@@ -257,4 +269,30 @@ public class TableFieldStat implements Serializable {
   public SortTempRowUpdater getSortTempRowUpdater() {
     return sortTempRowUpdater;
   }
+
+  private static List<CarbonDimension> updateDimensionsBasedOnSortColumns(
+      List<CarbonDimension> carbonDimensions) {
+    return getCarbonDimensions(carbonDimensions);
+  }
+
+  /**
+   * This method rearrange the dimensions where all the sort columns are added 
at first. Because
+   * if the column gets added in old version like carbon1.1, it will be added 
at last, so if it is
+   * sort column, this method will bring it to first.
+   */
+  private static List<CarbonDimension> 
getCarbonDimensions(List<CarbonDimension> carbonDimensions) {
+    List<CarbonDimension> updatedDataFields = new ArrayList<>();
+    List<CarbonDimension> sortFields = new ArrayList<>();
+    List<CarbonDimension> nonSortFields = new ArrayList<>();
+    for (CarbonDimension carbonDimension : carbonDimensions) {
+      if (carbonDimension.getColumnSchema().isSortColumn()) {
+        sortFields.add(carbonDimension);
+      } else {
+        nonSortFields.add(carbonDimension);
+      }
+    }
+    updatedDataFields.addAll(sortFields);
+    updatedDataFields.addAll(nonSortFields);
+    return updatedDataFields;
+  }
 }
\ No newline at end of file
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 588d4ac..17ff811 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -439,15 +439,45 @@ public final class CarbonDataProcessorUtil {
         }
       }
     }
-    Boolean[] mapping = noDicSortColMap.toArray(new 
Boolean[noDicSortColMap.size()]);
+    Boolean[] mapping = noDicSortColMap.toArray(new Boolean[0]);
     boolean[] noDicSortColMapping = new boolean[mapping.length];
     for (int i = 0; i < mapping.length; i++) {
-      noDicSortColMapping[i] = mapping[i].booleanValue();
+      noDicSortColMapping[i] = mapping[i];
     }
     return noDicSortColMapping;
   }
 
   /**
+   * If the dimension is added in older version 1.1, by default it will be 
sort column, So during
+   * initial sorting, carbonrow will be in order where added sort column is at 
the beginning, But
+   * before final merger of sort, the data should be in schema order
+   * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the 
carbonRow in schema
+   * order), so This method helps to find the index of no dictionary sort 
column in the carbonrow
+   * data.
+   */
+  public static int[] getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    List<Integer> noDicSortColMap = new ArrayList<>();
+    int counter = 0;
+    for (CarbonDimension dimension : dimensions) {
+      if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+        continue;
+      }
+      if (dimension.isSortColumn() && 
DataTypeUtil.isPrimitiveColumn(dimension.getDataType())) {
+        noDicSortColMap.add(counter);
+      }
+      counter++;
+    }
+    Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
+    int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      columnIdxBasedOnSchemaInRow[i] = mapping[i];
+    }
+    return columnIdxBasedOnSchemaInRow;
+  }
+
+  /**
    * Get the data types of the no dictionary sort columns
    *
    * @param carbonTable

Reply via email to