This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 29ecd5f  [CARBONDATA-4077] Refactor and Fix Insert into partition 
issue with FileMergeSortComparator
29ecd5f is described below

commit 29ecd5fee7af97633b756bfd944251a18dc76c18
Author: Indhumathi27 <[email protected]>
AuthorDate: Sat Dec 5 19:35:58 2020 +0530

    [CARBONDATA-4077] Refactor and Fix Insert into partition issue with 
FileMergeSortComparator
    
    Why is this PR needed?
    From PR-3995 changes, insert into partition flow scenario is missed. Using 
Map for getting
    Dict/No-Dict sort column info during final sort task will affect load 
performance,
    if number of sort columns is more.
    
    What changes were proposed in this PR?
    Handled the insert into partition flow
    Refactored the code, to use list of only Dict/No-Dict sort column indexes 
instead of
    Map to fix performance issue.
    
    This closes #4039
---
 .../query/SecondaryIndexQueryResultProcessor.java  |   2 -
 .../unsafe/holder/UnsafeFinalMergePageHolder.java  |   6 +-
 .../sort/unsafe/holder/UnsafeInmemoryHolder.java   |   6 +-
 .../holder/UnsafeSortTempFileChunkHolder.java      |   6 +-
 .../merger/CompactionResultSortProcessor.java      |  11 +-
 .../sort/sortdata/FileMergeSortComparator.java     | 100 ++++++++--------
 .../processing/sort/sortdata/SortParameters.java   |  69 +++++++----
 .../sort/sortdata/SortTempFileChunkHolder.java     |  12 +-
 .../processing/sort/sortdata/TableFieldStat.java   |  27 +++--
 .../processing/util/CarbonDataProcessorUtil.java   | 127 ++++++++++++++-------
 .../sort/sortdata/FileMergeSortComparatorTest.java |  27 ++---
 11 files changed, 239 insertions(+), 154 deletions(-)

diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 4d045f0..41a5c43 100644
--- 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -528,8 +528,6 @@ public class SecondaryIndexQueryResultProcessor {
         CarbonCommonConstants.FILE_SEPARATOR, 
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     sortParameters.setNoDictionarySortColumn(
         CarbonDataProcessorUtil.getNoDictSortColMapping(indexTable));
-    sortParameters.setSortColumnSchemaOrderMap(
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(indexTable));
     finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation,
         indexTable.getTableName(), sortParameters);
   }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 717bb91..7abd3a9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -61,10 +61,10 @@ public class UnsafeFinalMergePageHolder implements 
SortTempChunkHolder {
     }
     this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
     LOGGER.info("Processing unsafe inmemory rows page with size : " + 
actualSize);
-    this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-        tableFieldStat.getNoDictSchemaDataType(),
+    this.comparator = new 
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
         tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-        tableFieldStat.getSortColSchemaOrderMap());
+        tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+        tableFieldStat.getDictSortColIdxSchemaOrderMapping());
   }
 
   public boolean hasNext() {
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index a46811f..a21e802 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -48,10 +48,10 @@ public class UnsafeInmemoryHolder implements 
SortTempChunkHolder {
     this.rowPage = rowPage;
     LOGGER.info("Processing unsafe inmemory rows page with size : " + 
actualSize);
     this.comparator =
-        new 
FileMergeSortComparator(rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
-            rowPage.getTableFieldStat().getNoDictSchemaDataType(),
+        new 
FileMergeSortComparator(rowPage.getTableFieldStat().getNoDictSchemaDataType(),
             
rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping(),
-            rowPage.getTableFieldStat().getSortColSchemaOrderMap());
+            
rowPage.getTableFieldStat().getNoDictSortColIdxSchemaOrderMapping(),
+            rowPage.getTableFieldStat().getDictSortColIdxSchemaOrderMapping());
     this.rowPage.setReadConvertedNoSortField();
   }
 
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 5144f57..77b19f4 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -120,10 +120,10 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
       comparator = new 
IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
           parameters.getNoDictDataType());
     } else {
-      this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-          tableFieldStat.getNoDictSchemaDataType(),
+      this.comparator = new 
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
           tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-          tableFieldStat.getSortColSchemaOrderMap());
+          tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+          tableFieldStat.getDictSortColIdxSchemaOrderMapping());
     }
     initialize();
   }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 9cfda1d..3f27eee 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -480,8 +481,14 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
     boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
         .getNoDictSortColMapping(carbonTable);
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
-    sortParameters.setSortColumnSchemaOrderMap(
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable));
+    Map<String, int[]> columnIdxMap = CarbonDataProcessorUtil
+        .getColumnIdxBasedOnSchemaInRow(carbonTable);
+    sortParameters.setNoDictSortColumnSchemaOrderMapping(
+        columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+    sortParameters.setNoDictSortColIdxSchemaOrderMapping(
+        columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+    sortParameters.setDictSortColIdxSchemaOrderMapping(
+        columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
     String[] sortTempFileLocation = 
CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, 
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
index 687af7b..337bf73 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
@@ -18,8 +18,6 @@
 package org.apache.carbondata.processing.sort.sortdata;
 
 import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -29,8 +27,6 @@ import 
org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 
 public class FileMergeSortComparator implements 
Comparator<IntermediateSortTempRow> {
 
-  private boolean[] isSortColumnNoDictionary;
-
   /**
    * Datatype of all the no-dictionary columns in the table in schema order
    */
@@ -42,21 +38,25 @@ public class FileMergeSortComparator implements 
Comparator<IntermediateSortTempR
   private int[] noDictPrimitiveIndex;
 
   /**
-   * Sort and Dictionary info of all the columns in schema order
+   * Index of the No-Dict Sort columns in schema order for final merge step of 
sorting.
+   */
+  private int[] noDictSortColIdxSchemaOrderMapping;
+
+  /**
+   * Index of the dict Sort columns in schema order for final merge step of 
sorting.
    */
-  private final Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
+  private int[] dictSortColIdxSchemaOrderMapping;
 
   /**
    * Comparator for IntermediateSortTempRow for compatibility cases where 
column added in old
    * version and it is sort column
-   * @param isSortColumnNoDictionary isSortColumnNoDictionary
    */
-  public FileMergeSortComparator(boolean[] isSortColumnNoDictionary, 
DataType[] noDictDataTypes,
-      int[] columnIdBasedOnSchemaInRow, Map<Integer, List<Boolean>> 
sortColumnSchemaOrderMap) {
-    this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+  public FileMergeSortComparator(DataType[] noDictDataTypes, int[] 
columnIdBasedOnSchemaInRow,
+      int[] noDictSortColIdxSchemaOrderMapping, int[] 
dictSortColIdxSchemaOrderMapping) {
     this.noDictDataTypes = noDictDataTypes;
     this.noDictPrimitiveIndex = columnIdBasedOnSchemaInRow;
-    this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+    this.noDictSortColIdxSchemaOrderMapping = 
noDictSortColIdxSchemaOrderMapping;
+    this.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
   }
 
   /**
@@ -68,8 +68,8 @@ public class FileMergeSortComparator implements 
Comparator<IntermediateSortTempR
    * 1. only Dictionary sort column data
    * 2. dictionary sort and dictionary no-sort column data
    * On this temp data row, we have to identify the sort column data and only 
compare those.
-   * From the sortColumnSchemaOrderMap, get the sort column and dict info. If 
the column
-   * is sort column, then perform the comparison, else increment the 
respective index.
+   * Use noDictSortColIdxSchemaOrderMapping and 
dictSortColIdxSchemaOrderMapping to get
+   * the indexes of sort column in schema order
    */
   @Override
   public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow 
rowB) {
@@ -78,51 +78,47 @@ public class FileMergeSortComparator implements 
Comparator<IntermediateSortTempR
     int nonDictIndex = 0;
     int noDicTypeIdx = 0;
     int schemaRowIdx = 0;
-    int sortIndex = 0;
-
-    for (Map.Entry<Integer, List<Boolean>> schemaEntry : 
sortColumnSchemaOrderMap.entrySet()) {
-      boolean isSortColumn = schemaEntry.getValue().get(0);
-      boolean isDictColumn = schemaEntry.getValue().get(1);
-      if (isSortColumn) {
-        if (isSortColumnNoDictionary[sortIndex++]) {
-          if (DataTypeUtil.isPrimitiveColumn(noDictDataTypes[noDicTypeIdx])) {
-            // use data types based comparator for the no dictionary measure 
columns
-            SerializableComparator comparator =
-                org.apache.carbondata.core.util.comparator.Comparator
-                    .getComparator(noDictDataTypes[noDicTypeIdx]);
-            int difference = comparator
-                
.compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
-                    
rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
-            schemaRowIdx++;
-            if (difference != 0) {
-              return difference;
-            }
-          } else {
-            byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
-            byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
 
-            int difference = 
ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-            if (difference != 0) {
-              return difference;
-            }
-          }
-        } else {
-          int dimFieldA = rowA.getDictSortDims()[dictIndex];
-          int dimFieldB = rowB.getDictSortDims()[dictIndex];
+    // compare no-Dict sort columns
+    for (int i = 0; i < noDictSortColIdxSchemaOrderMapping.length; i++) {
+      if (DataTypeUtil
+          
.isPrimitiveColumn(noDictDataTypes[noDictSortColIdxSchemaOrderMapping[noDicTypeIdx]]))
 {
+        // use data types based comparator for the no dictionary measure 
columns
+        SerializableComparator comparator = 
org.apache.carbondata.core.util.comparator.Comparator
+            
.getComparator(noDictDataTypes[noDictSortColIdxSchemaOrderMapping[noDicTypeIdx]]);
+        int difference = comparator
+            
.compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
+                rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
+        schemaRowIdx++;
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        byte[] byteArr1 =
+            (byte[]) 
rowA.getNoDictSortDims()[noDictSortColIdxSchemaOrderMapping[nonDictIndex]];
+        byte[] byteArr2 =
+            (byte[]) 
rowB.getNoDictSortDims()[noDictSortColIdxSchemaOrderMapping[nonDictIndex]];
 
-          diff = dimFieldA - dimFieldB;
-          if (diff != 0) {
-            return diff;
-          }
+        int difference = ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, 
byteArr2);
+        if (difference != 0) {
+          return difference;
         }
       }
-      if (isDictColumn) {
-        dictIndex++;
-      } else {
-        nonDictIndex++;
-        noDicTypeIdx++;
+      nonDictIndex++;
+      noDicTypeIdx++;
+    }
+    // compare dict sort columns
+    for (int i = 0; i < dictSortColIdxSchemaOrderMapping.length; i++) {
+      int dimFieldA = 
rowA.getDictSortDims()[dictSortColIdxSchemaOrderMapping[dictIndex]];
+      int dimFieldB = 
rowB.getDictSortDims()[dictSortColIdxSchemaOrderMapping[dictIndex]];
+      dictIndex++;
+
+      diff = dimFieldA - dimFieldB;
+      if (diff != 0) {
+        return diff;
       }
     }
+
     return diff;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index fc93d52..e8946b9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.File;
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -108,9 +107,6 @@ public class SortParameters implements Serializable {
   // used while performing final sort of intermediate files
   private DataType[] noDictSchemaDataType;
 
-  // Sort and Dictionary info of all the columns in schema order, used in 
final sort
-  private Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
-
   /**
    * To know how many columns are of high cardinality.
    */
@@ -167,6 +163,16 @@ public class SortParameters implements Serializable {
    */
   private int[] noDictSortColumnSchemaOrderMapping;
 
+  /**
+   * Index of the no dict Sort columns in schema order used for final merge 
step of sorting.
+   */
+  private int[] noDictSortColIdxSchemaOrderMapping;
+
+  /**
+   * Index of the dict Sort columns in schema order for final merge step of 
sorting.
+   */
+  private int[] dictSortColIdxSchemaOrderMapping;
+
   private boolean isInsertWithoutReArrangeFlow;
 
   public SortParameters getCopy() {
@@ -209,7 +215,8 @@ public class SortParameters implements Serializable {
     parameters.noDictSortColumnSchemaOrderMapping = 
noDictSortColumnSchemaOrderMapping;
     parameters.isInsertWithoutReArrangeFlow = isInsertWithoutReArrangeFlow;
     parameters.noDictSchemaDataType = noDictSchemaDataType;
-    parameters.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+    parameters.noDictSortColIdxSchemaOrderMapping = 
noDictSortColIdxSchemaOrderMapping;
+    parameters.dictSortColIdxSchemaOrderMapping = 
dictSortColIdxSchemaOrderMapping;
     return parameters;
   }
 
@@ -417,7 +424,7 @@ public class SortParameters implements Serializable {
     return noDictSortColumnSchemaOrderMapping;
   }
 
-  private void setNoDictSortColumnSchemaOrderMapping(int[] 
noDictSortColumnSchemaOrderMapping) {
+  public void setNoDictSortColumnSchemaOrderMapping(int[] 
noDictSortColumnSchemaOrderMapping) {
     this.noDictSortColumnSchemaOrderMapping = 
noDictSortColumnSchemaOrderMapping;
   }
 
@@ -507,8 +514,14 @@ public class SortParameters implements Serializable {
       parameters.setInsertWithoutReArrangeFlow(true);
       parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
           
.getNoDictSortColMappingAsDataFieldOrder(configuration.getDataFields()));
-      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-          
.getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields()));
+      Map<String, int[]> columnIdxMap = CarbonDataProcessorUtil
+          
.getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields());
+      parameters.setNoDictSortColumnSchemaOrderMapping(
+          columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+      parameters.setNoDictSortColIdxSchemaOrderMapping(
+          columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+      parameters.setDictSortColIdxSchemaOrderMapping(
+          columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
       
parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
       parameters.setNoDictDataType(CarbonDataProcessorUtil
           .getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
@@ -525,13 +538,17 @@ public class SortParameters implements Serializable {
     } else {
       parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
           .getNoDictSortColMapping(parameters.getCarbonTable()));
-      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-          .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+      Map<String, int[]> columnIdxMap =
+          
CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable());
+      parameters
+          
.setNoDictSortColumnSchemaOrderMapping(columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+      parameters.setNoDictSortColIdxSchemaOrderMapping(
+          columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+      parameters
+          
.setDictSortColIdxSchemaOrderMapping(columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
       parameters.setMeasureDataType(configuration.getMeasureDataType());
       parameters.setNoDictDataType(CarbonDataProcessorUtil
           
.getNoDictSortDataTypes(configuration.getTableSpec().getCarbonTable()));
-      parameters.setSortColumnSchemaOrderMap(
-          
CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
       parameters.setNoDictSchemaDataType(
           CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
       Map<String, DataType[]> noDictSortAndNoSortDataTypes = 
CarbonDataProcessorUtil
@@ -627,12 +644,16 @@ public class SortParameters implements Serializable {
     
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
         .getNoDictSortColMapping(parameters.getCarbonTable()));
-    parameters.setSortColumnSchemaOrderMap(
-        
CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
     parameters.setNoDictSchemaDataType(
         CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
-    parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-        .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+    Map<String, int[]> columnIdxMap =
+        
CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable());
+    parameters
+        
.setNoDictSortColumnSchemaOrderMapping(columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+    parameters
+        
.setNoDictSortColIdxSchemaOrderMapping(columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+    parameters
+        
.setDictSortColIdxSchemaOrderMapping(columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
     TableSpec tableSpec = new TableSpec(carbonTable, false);
     parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
     parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
@@ -713,11 +734,19 @@ public class SortParameters implements Serializable {
     this.noDictSchemaDataType = noDictSchemaDataType;
   }
 
-  public void setSortColumnSchemaOrderMap(Map<Integer, List<Boolean>> 
sortColumnSchemaOrderMap) {
-    this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+  public int[] getNoDictSortColIdxSchemaOrderMapping() {
+    return noDictSortColIdxSchemaOrderMapping;
+  }
+
+  public void setNoDictSortColIdxSchemaOrderMapping(int[] 
noDictSortColIdxSchemaOrderMapping) {
+    this.noDictSortColIdxSchemaOrderMapping = 
noDictSortColIdxSchemaOrderMapping;
+  }
+
+  public int[] getDictSortColIdxSchemaOrderMapping() {
+    return dictSortColIdxSchemaOrderMapping;
   }
 
-  public Map<Integer, List<Boolean>> getSortColumnSchemaOrderMap() {
-    return sortColumnSchemaOrderMap;
+  public void setDictSortColIdxSchemaOrderMapping(int[] 
dictSortColIdxSchemaOrderMapping) {
+    this.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 84fb4d0..f4cfe69 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -105,10 +105,10 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
 
   public SortTempFileChunkHolder(SortParameters sortParameters) {
     this.tableFieldStat = new TableFieldStat(sortParameters);
-    this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-        tableFieldStat.getNoDictSchemaDataType(),
+    this.comparator = new 
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
         tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-        tableFieldStat.getSortColSchemaOrderMap());
+        tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+        tableFieldStat.getDictSortColIdxSchemaOrderMapping());
     this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
@@ -132,10 +132,10 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
                 true));
     this.convertToActualField = convertToActualField;
     if (this.convertToActualField) {
-      this.comparator = new 
FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-          tableFieldStat.getNoDictSchemaDataType(),
+      this.comparator = new 
FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
           tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-          tableFieldStat.getSortColSchemaOrderMap());
+          tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+          tableFieldStat.getDictSortColIdxSchemaOrderMapping());
     } else {
       this.comparator =
           new 
IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index df7b873..e317d56 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -83,7 +82,15 @@ public class TableFieldStat implements Serializable {
 
   private DataType[] noDictSchemaDataType;
 
-  private Map<Integer, List<Boolean>> sortColSchemaOrderMap;
+  /**
+   * Index of the no dict Sort columns in schema order used for final merge 
step of sorting.
+   */
+  private int[] noDictSortColIdxSchemaOrderMapping;
+
+  /**
+   * Index of the dict Sort columns in schema order for final merge step of 
sorting.
+   */
+  private int[] dictSortColIdxSchemaOrderMapping;
 
   public TableFieldStat(SortParameters sortParameters) {
     int noDictDimCnt = sortParameters.getNoDictionaryCount();
@@ -106,7 +113,9 @@ public class TableFieldStat implements Serializable {
     this.noDictSortDataType = sortParameters.getNoDictSortDataType();
     this.noDictNoSortDataType = sortParameters.getNoDictNoSortDataType();
     this.noDictSchemaDataType = sortParameters.getNoDictSchemaDataType();
-    this.sortColSchemaOrderMap = sortParameters.getSortColumnSchemaOrderMap();
+    this.noDictSortColIdxSchemaOrderMapping =
+        sortParameters.getNoDictSortColIdxSchemaOrderMapping();
+    this.dictSortColIdxSchemaOrderMapping = 
sortParameters.getDictSortColIdxSchemaOrderMapping();
     for (boolean flag : isVarcharDimFlags) {
       if (flag) {
         varcharDimCnt++;
@@ -359,11 +368,15 @@ public class TableFieldStat implements Serializable {
     return otherCols;
   }
 
-  public Map<Integer, List<Boolean>> getSortColSchemaOrderMap() {
-    return sortColSchemaOrderMap;
-  }
-
   public DataType[] getNoDictSchemaDataType() {
     return noDictSchemaDataType;
   }
+
+  public int[] getNoDictSortColIdxSchemaOrderMapping() {
+    return noDictSortColIdxSchemaOrderMapping;
+  }
+
+  public int[] getDictSortColIdxSchemaOrderMapping() {
+    return dictSortColIdxSchemaOrderMapping;
+  }
 }
\ No newline at end of file
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ceaa05c..d8f8673 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -441,38 +441,6 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * Get the sort/no_sort column map based on schema order.
-   * This will be used in the final sort step to find the index of sort 
column, to compare the
-   * intermediate row data based on schema.
-   */
-  public static Map<Integer, List<Boolean>> 
getSortColSchemaOrderMapping(CarbonTable carbonTable) {
-    List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
-    Map<Integer, List<Boolean>> sortColSchemaOrderMap = new HashMap<>();
-    for (CarbonDimension dimension : dimensions) {
-      List<Boolean> sortDictOrNoDictMap = new ArrayList<>();
-      // check if the dimension is sort column or not and add to first index 
of sortDictOrNoDictMap
-      // check if the dimension is dict column or not and add to second index 
of sortDictOrNoDictMap
-      if (dimension.isSortColumn()) {
-        sortDictOrNoDictMap.add(true);
-        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
-          sortDictOrNoDictMap.add(true);
-        } else {
-          sortDictOrNoDictMap.add(false);
-        }
-      } else {
-        sortDictOrNoDictMap.add(false);
-        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
-          sortDictOrNoDictMap.add(true);
-        } else {
-          sortDictOrNoDictMap.add(false);
-        }
-      }
-      sortColSchemaOrderMap.put(dimension.getOrdinal(), sortDictOrNoDictMap);
-    }
-    return sortColSchemaOrderMap;
-  }
-
-  /**
    * get mapping based on data fields order
    *
    * @param dataFields
@@ -504,14 +472,26 @@ public final class CarbonDataProcessorUtil {
    * initial sorting, carbonrow will be in order where added sort column is at 
the beginning, But
    * before final merger of sort, the data should be in schema order
    * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the 
carbonRow in schema
-   * order), so This method helps to find the index of no dictionary sort 
column in the carbonrow
-   * data.
+   * order), so This method helps to find the index of no dictionary/ 
dictionary sort column in
+   * the carbonrow data.
    */
-  public static int[] getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
+  public static Map<String, int[]> getColumnIdxBasedOnSchemaInRow(CarbonTable 
carbonTable) {
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
     List<Integer> noDicSortColMap = new ArrayList<>();
+    // get no-dict / dict sort dimensions
+    List<CarbonDimension> noDictSortDimensions = new ArrayList<>();
+    List<CarbonDimension> dictSortDimensions = new ArrayList<>();
+
+    List<Integer> noDictSortColIdx = new ArrayList<>();
+    List<Integer> dictSortColIdx = new ArrayList<>();
+
     int counter = 0;
     for (CarbonDimension dimension : dimensions) {
+      if (dimension.hasEncoding(Encoding.DICTIONARY) || 
dimension.getDataType() == DataTypes.DATE) {
+        dictSortDimensions.add(dimension);
+      } else {
+        noDictSortDimensions.add(dimension);
+      }
       if (dimension.getDataType() == DataTypes.DATE) {
         continue;
       }
@@ -520,12 +500,39 @@ public final class CarbonDataProcessorUtil {
       }
       counter++;
     }
+    // add no-Dict sort column index
+    for (int i = 0; i < noDictSortDimensions.size(); i++) {
+      if (noDictSortDimensions.get(i).isSortColumn()) {
+        noDictSortColIdx.add(i);
+      }
+    }
+    // add dict sort column index
+    for (int i = 0; i < dictSortDimensions.size(); i++) {
+      if (dictSortDimensions.get(i).isSortColumn()) {
+        dictSortColIdx.add(i);
+      }
+    }
     Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
     int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
     for (int i = 0; i < mapping.length; i++) {
       columnIdxBasedOnSchemaInRow[i] = mapping[i];
     }
-    return columnIdxBasedOnSchemaInRow;
+    Integer[] noDictSortIdx = noDictSortColIdx.toArray(new Integer[0]);
+    int[] noDictSortIdxBasedOnSchemaInRow = new int[noDictSortIdx.length];
+    for (int i = 0; i < noDictSortIdx.length; i++) {
+      noDictSortIdxBasedOnSchemaInRow[i] = noDictSortIdx[i];
+    }
+    Integer[] dictSortIdx = dictSortColIdx.toArray(new Integer[0]);
+    int[] dictSortIdxBasedOnSchemaInRow = new int[dictSortIdx.length];
+    for (int i = 0; i < dictSortIdx.length; i++) {
+      dictSortIdxBasedOnSchemaInRow[i] = dictSortIdx[i];
+    }
+
+    Map<String, int[]> dictOrNoDictSortInfoMap = new HashMap<>();
+    dictOrNoDictSortInfoMap.put("columnIdxBasedOnSchemaInRow", 
columnIdxBasedOnSchemaInRow);
+    dictOrNoDictSortInfoMap.put("noDictSortIdxBasedOnSchemaInRow", 
noDictSortIdxBasedOnSchemaInRow);
+    dictOrNoDictSortInfoMap.put("dictSortIdxBasedOnSchemaInRow", 
dictSortIdxBasedOnSchemaInRow);
+    return dictOrNoDictSortInfoMap;
   }
 
   /**
@@ -533,14 +540,27 @@ public final class CarbonDataProcessorUtil {
    * initial sorting, carbonrow will be in order where added sort column is at 
the beginning, But
    * before final merger of sort, the data should be in schema order
    * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the 
carbonRow in schema
-   * order), so This method helps to find the index of no dictionary sort 
column in the carbonrow
-   * data.
+   * order), so This method helps to find the index of no dictionary/ 
dictionary sort column in
+   * the carbonrow data.
    */
-  public static int[] 
getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(DataField[] dataFields) {
+  public static Map<String, int[]> 
getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(
+      DataField[] dataFields) {
     List<Integer> noDicSortColMap = new ArrayList<>();
     int counter = 0;
+    // get no-dict / dict sort column schema
+    List<CarbonColumn> noDictSortColumns = new ArrayList<>();
+    List<CarbonColumn> dictSortColumns = new ArrayList<>();
+
+    List<Integer> noDictSortColIdx = new ArrayList<>();
+    List<Integer> dictSortColIdx = new ArrayList<>();
     for (DataField dataField : dataFields) {
       if (!dataField.getColumn().isInvisible() && 
dataField.getColumn().isDimension()) {
+        if 
(dataField.getColumn().getColumnSchema().hasEncoding(Encoding.DICTIONARY)
+            || dataField.getColumn().getColumnSchema().getDataType() == 
DataTypes.DATE) {
+          dictSortColumns.add(dataField.getColumn());
+        } else {
+          noDictSortColumns.add(dataField.getColumn());
+        }
         if (dataField.getColumn().getColumnSchema().getDataType() == 
DataTypes.DATE) {
           continue;
         }
@@ -551,12 +571,39 @@ public final class CarbonDataProcessorUtil {
         counter++;
       }
     }
+    // add no-Dict sort column index
+    for (int i = 0; i < noDictSortColumns.size(); i++) {
+      if (noDictSortColumns.get(i).getColumnSchema().isSortColumn()) {
+        noDictSortColIdx.add(i);
+      }
+    }
+    // add dict sort column index
+    for (int i = 0; i < dictSortColumns.size(); i++) {
+      if (dictSortColumns.get(i).getColumnSchema().isSortColumn()) {
+        dictSortColIdx.add(i);
+      }
+    }
     Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
     int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
     for (int i = 0; i < mapping.length; i++) {
       columnIdxBasedOnSchemaInRow[i] = mapping[i];
     }
-    return columnIdxBasedOnSchemaInRow;
+    Integer[] noDictSortIdx = noDictSortColIdx.toArray(new Integer[0]);
+    int[] noDictSortIdxBasedOnSchemaInRow = new int[noDictSortIdx.length];
+    for (int i = 0; i < noDictSortIdx.length; i++) {
+      noDictSortIdxBasedOnSchemaInRow[i] = noDictSortIdx[i];
+    }
+    Integer[] dictSortIdx = dictSortColIdx.toArray(new Integer[0]);
+    int[] dictSortIdxBasedOnSchemaInRow = new int[dictSortIdx.length];
+    for (int i = 0; i < dictSortIdx.length; i++) {
+      dictSortIdxBasedOnSchemaInRow[i] = dictSortIdx[i];
+    }
+
+    Map<String, int[]> dictOrNoSortInfoMap = new HashMap<>();
+    dictOrNoSortInfoMap.put("columnIdxBasedOnSchemaInRow", 
columnIdxBasedOnSchemaInRow);
+    dictOrNoSortInfoMap.put("noDictSortIdxBasedOnSchemaInRow", 
noDictSortIdxBasedOnSchemaInRow);
+    dictOrNoSortInfoMap.put("dictSortIdxBasedOnSchemaInRow", 
dictSortIdxBasedOnSchemaInRow);
+    return dictOrNoSortInfoMap;
   }
 
   /**
diff --git 
a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
 
b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
index 46c7e7b..f1d4e17 100644
--- 
a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
+++ 
b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
@@ -86,25 +86,20 @@ public class FileMergeSortComparatorTest {
         && noDictDataTypes[1].equals(DataTypes.STRING) && noDictDataTypes[2]
         .equals(DataTypes.LONG));
 
-    // test getSortColSchemaOrderMapping
-    Map<Integer, List<Boolean>> sortColSchemaOrderMapping =
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable);
-    assert (sortColSchemaOrderMapping.get(0).get(0).equals(false) && 
sortColSchemaOrderMapping
-        .get(0).get(1).equals(false));
-    assert (sortColSchemaOrderMapping.get(1).get(0).equals(true) && 
sortColSchemaOrderMapping.get(1)
-        .get(1).equals(false));
-    assert (sortColSchemaOrderMapping.get(2).get(0).equals(true) && 
sortColSchemaOrderMapping
-        .get(2).get(1).equals(false));
-    assert (sortColSchemaOrderMapping.get(3).get(0).equals(true) && 
sortColSchemaOrderMapping.get(3)
-        .get(1).equals(true));
-
     // test comparator
-    boolean[] isSortColNoDict = { true, false };
-    int[] columnIdxBasedOnSchemaInRow =
+    Map<String, int[]> columnIdxMap =
         CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(carbonTable);
+    int[] columnIdxBasedOnSchemaInRows = 
columnIdxMap.get("columnIdxBasedOnSchemaInRow");
+    int[] noDictSortIdxBasedOnSchemaInRows = 
columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow");
+    int[] dictSortIdxBasedOnSchemaInRows = 
columnIdxMap.get("dictSortIdxBasedOnSchemaInRow");
+ 
+    assert (noDictSortIdxBasedOnSchemaInRows.length == 2 && 
noDictSortIdxBasedOnSchemaInRows[0] == 1
+        && noDictSortIdxBasedOnSchemaInRows[1] == 2);
+    assert (dictSortIdxBasedOnSchemaInRows.length == 1 && 
dictSortIdxBasedOnSchemaInRows[0] == 0);
+
     FileMergeSortComparator comparator =
-        new FileMergeSortComparator(isSortColNoDict, noDictDataTypes, 
columnIdxBasedOnSchemaInRow,
-            sortColSchemaOrderMapping);
+        new FileMergeSortComparator(noDictDataTypes, 
columnIdxBasedOnSchemaInRows,
+            noDictSortIdxBasedOnSchemaInRows, dictSortIdxBasedOnSchemaInRows);
 
     // prepare data for final sort
     int[] dictSortDims1 = { 1 };

Reply via email to