This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f568d9b  [CARBONDATA-3730] Avoid data conversion and remove duplicate 
codes in BlockIndexerStorage
f568d9b is described below

commit f568d9bb8a6bfcb9554bea2fcbdc52a46a8eb9cd
Author: Manhua <kevin...@qq.com>
AuthorDate: Fri Mar 6 10:39:05 2020 +0800

    [CARBONDATA-3730] Avoid data conversion and remove duplicate codes in 
BlockIndexerStorage
    
    Why is this PR needed?
    
    The initial purpose of this PR was to remove the conversion between
    byte[][] and List when applying RLE to the data page in
    BlockIndexerStorageForNoInvertedIndexForShort, especially in the case
    where RLE gives no benefit. While checking whether the other subclasses
    of BlockIndexerStorage have a similar problem, we found a lot of
    duplicate code and cleaned that up as well.
    
    What changes were proposed in this PR?
    
    Avoid some conversions between byte[][] and List.
    Refactor the BlockIndexerStorage classes to remove duplicate code.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3649
---
 .../core/datastore/block/TableTaskInfo.java        | 143 ---------------
 .../datastore/columnar/BlockIndexerStorage.java    | 112 +++++++++---
 ...ckIndexerStorageForNoInvertedIndexForShort.java | 135 ---------------
 .../columnar/BlockIndexerStorageForShort.java      | 192 ---------------------
 .../columnar/ByteArrayBlockIndexerStorage.java     |  73 ++++++++
 ... ByteArrayBlockIndexerStorageWithoutRowId.java} |  47 +++--
 ...HighCard.java => ByteArrayColumnWithRowId.java} |  27 ++-
 .../core/datastore/columnar/ColumnWithRowId.java   |  66 -------
 .../columnar/ColumnarKeyStoreDataHolder.java       |  52 ------
 ...ry.java => ObjectArrayBlockIndexerStorage.java} |  70 ++------
 ...oDictionary.java => ObjectColumnWithRowId.java} |  45 ++---
 .../page/encoding/adaptive/AdaptiveCodec.java      |   4 +-
 .../legacy/ComplexDimensionIndexCodec.java         |   4 +-
 .../legacy/DirectDictDimensionIndexCodec.java      |   8 +-
 .../dimension/legacy/PlainDimensionIndexCodec.java |   8 +-
 .../core/datastore/block/TableTaskInfoTest.java    |  81 ---------
 .../ColumnarKeyStoreDataHolderUnitTest.java        |  92 ----------
 17 files changed, 252 insertions(+), 907 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
deleted file mode 100644
index adca63c..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.block;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * This class is responsible for maintaining the mapping of tasks of a node.
- */
-public class TableTaskInfo implements Distributable {
-
-  private final List<TableBlockInfo> tableBlockInfoList;
-  private final String taskId;
-  public String getTaskId() {
-    return taskId;
-  }
-
-  public List<TableBlockInfo> getTableBlockInfoList() {
-    return tableBlockInfoList;
-  }
-
-  public TableTaskInfo(String taskId, List<TableBlockInfo> tableBlockInfoList) 
{
-    this.taskId = taskId;
-    this.tableBlockInfoList = tableBlockInfoList;
-  }
-
-  @Override
-  public String[] getLocations() {
-    Set<String> locations = new HashSet<String>();
-    for (TableBlockInfo tableBlockInfo : tableBlockInfoList) {
-      locations.addAll(Arrays.asList(tableBlockInfo.getLocations()));
-    }
-    locations.toArray(new String[locations.size()]);
-    List<String> nodes =  TableTaskInfo.maxNoNodes(tableBlockInfoList);
-    return nodes.toArray(new String[nodes.size()]);
-  }
-
-  @Override
-  public int compareTo(Distributable o) {
-    return taskId.compareTo(((TableTaskInfo)o).getTaskId());
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if (null == obj) {
-      return false;
-    }
-
-    if (!(obj instanceof TableTaskInfo)) {
-      return false;
-    }
-
-    return 0 == taskId.compareTo(((TableTaskInfo)obj).getTaskId());
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((taskId == null) ? 0 : taskId.hashCode());
-    return result;
-  }
-
-  /**
-   * Finding which node has the maximum number of blocks for it.
-   * @param blockList
-   * @return
-   */
-  public static List<String> maxNoNodes(List<TableBlockInfo> blockList) {
-    boolean useIndex = true;
-    Integer maxOccurence = 0;
-    String maxNode = null;
-    Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>();
-
-    // populate the map of node and number of occurences of that node.
-    for (TableBlockInfo block : blockList) {
-      for (String node : block.getLocations()) {
-        Integer nodeOccurence = nodeAndOccurenceMapping.get(node);
-        if (null == nodeOccurence) {
-          nodeAndOccurenceMapping.put(node, 1);
-        } else {
-          nodeOccurence++;
-        }
-      }
-    }
-    Integer previousValueOccurence = null;
-
-    // check which node is occured maximum times.
-    for (Map.Entry<String, Integer> entry : 
nodeAndOccurenceMapping.entrySet()) {
-      // finding the maximum node.
-      if (entry.getValue() > maxOccurence) {
-        maxOccurence = entry.getValue();
-        maxNode = entry.getKey();
-      }
-      // first time scenario. initialzing the previous value.
-      if (null == previousValueOccurence) {
-        previousValueOccurence = entry.getValue();
-      } else {
-        // for the case where all the nodes have same number of blocks then
-        // we need to return complete list instead of max node.
-        if (!Objects.equals(previousValueOccurence, entry.getValue())) {
-          useIndex = false;
-        }
-      }
-    }
-
-    // if all the nodes have equal occurence then returning the complete key 
set.
-    if (useIndex) {
-      return new ArrayList<>(nodeAndOccurenceMapping.keySet());
-    }
-
-    // if any max node is found then returning the max node.
-    List<String> node =  new ArrayList<>(1);
-    node.add(maxNode);
-    return node;
-  }
-}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
index 44b3c12..559b671 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -18,27 +18,60 @@
 package org.apache.carbondata.core.datastore.columnar;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
 
 public abstract class BlockIndexerStorage<T> {
 
-  public abstract short[] getRowIdPage();
+  protected short[] rowIdPage;
 
-  public abstract int getRowIdPageLengthInBytes();
+  protected short[] rowIdRlePage;
 
-  public abstract short[] getRowIdRlePage();
+  protected T dataPage;
 
-  public abstract int getRowIdRlePageLengthInBytes();
+  protected short[] dataRlePage;
 
-  public abstract T getDataPage();
+  public short[] getRowIdPage() {
+    return rowIdPage;
+  }
 
-  public abstract short[] getDataRlePage();
+  public int getRowIdPageLengthInBytes() {
+    if (rowIdPage != null) {
+      return rowIdPage.length * CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    } else {
+      return 0;
+    }
+  }
 
-  public abstract int getDataRlePageLengthInBytes();
+  public short[] getRowIdRlePage() {
+    return rowIdRlePage;
+  }
+
+  public int getRowIdRlePageLengthInBytes() {
+    if (rowIdRlePage != null) {
+      return rowIdRlePage.length * CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    } else {
+      return 0;
+    }
+  }
+
+  public T getDataPage() {
+    return dataPage;
+  }
+
+  public short[] getDataRlePage() {
+    return dataRlePage;
+  }
+
+  public int getDataRlePageLengthInBytes() {
+    if (dataRlePage != null) {
+      return dataRlePage.length * CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    } else {
+      return 0;
+    }
+  }
 
   /**
    * It compresses depends up on the sequence numbers.
@@ -50,9 +83,7 @@ public abstract class BlockIndexerStorage<T> {
    *
    * @param rowIds
    */
-  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds) {
-    short[] rowIdPage;
-    short[] rowIdRlePage;
+  protected void encodeAndSetRowId(short[] rowIds) {
     List<Short> list = new 
ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     List<Short> map = new 
ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     int k = 0;
@@ -78,28 +109,61 @@ public abstract class BlockIndexerStorage<T> {
     } else {
       list.add(rowIds[i - 1]);
     }
-    int compressionPercentage = (((list.size() + map.size()) * 100) / 
rowIds.length);
-    if (compressionPercentage > 70) {
-      rowIdPage = rowIds;
+    if ((((list.size() + map.size()) * 100) / rowIds.length) > 70) {
+      this.rowIdPage = rowIds;
+      this.rowIdRlePage = new short[0];
     } else {
-      rowIdPage = convertToArray(list);
+      this.rowIdPage = convertToArray(list);
+      this.rowIdRlePage = convertToArray(map);
+    }
+  }
+
+  /**
+   * apply RLE(run-length encoding) on byte array data page
+   */
+  protected byte[][] rleEncodeOnData(byte[][] dataPage) {
+    List<Short> map = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    List<byte[]> list = new ArrayList<>(dataPage.length / 2);
+    list.add(dataPage[0]);
+    short counter = 1;
+    short startIdx = 0;
+    for (int i = 1; i < dataPage.length; i++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(dataPage[i - 1], 
dataPage[i]) != 0) {
+        list.add(dataPage[i]);
+        map.add(startIdx);
+        map.add(counter);
+        startIdx += counter;
+        counter = 1;
+        continue;
+      }
+      counter++;
     }
-    if (rowIds.length == rowIdPage.length) {
-      rowIdRlePage = new short[0];
+    map.add(startIdx);
+    map.add(counter);
+    // if rle is index size is more than 70% then rle wont give any benefit
+    // so better to avoid rle index and write data as it is
+    if ((((list.size() + map.size()) * 100) / dataPage.length) > 70) {
+      this.dataRlePage = new short[0];
+      return dataPage;
     } else {
-      rowIdRlePage = convertToArray(map);
+      this.dataRlePage = convertToArray(map);
+      return convertToDataPage(list);
     }
-    Map<String, short[]> rowIdAndRowRleIdPages = new HashMap<>(2);
-    rowIdAndRowRleIdPages.put("rowIdPage", rowIdPage);
-    rowIdAndRowRleIdPages.put("rowRlePage", rowIdRlePage);
-    return rowIdAndRowRleIdPages;
   }
 
-  protected short[] convertToArray(List<Short> list) {
+  private short[] convertToArray(List<Short> list) {
     short[] shortArray = new short[list.size()];
     for (int i = 0; i < shortArray.length; i++) {
       shortArray[i] = list.get(i);
     }
     return shortArray;
   }
+
+  private byte[][] convertToDataPage(List<byte[]> list) {
+    byte[][] shortArray = new byte[list.size()][];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
deleted file mode 100644
index 21ca421..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.ByteUtil;
-
-/**
- * Below class will be used to for no inverted index
- */
-public class BlockIndexerStorageForNoInvertedIndexForShort extends 
BlockIndexerStorage<byte[][]> {
-
-  /**
-   * column data
-   */
-  private byte[][] dataPage;
-
-  private short[] dataRlePage;
-
-  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage, 
boolean applyRLE) {
-    this.dataPage = dataPage;
-    if (applyRLE) {
-      List<byte[]> actualDataList = new ArrayList<>();
-      for (int i = 0; i < dataPage.length; i++) {
-        actualDataList.add(dataPage[i]);
-      }
-      rleEncodeOnData(actualDataList);
-    }
-  }
-
-  private void rleEncodeOnData(List<byte[]> actualDataList) {
-    byte[] prvKey = actualDataList.get(0);
-    List<byte[]> list = new ArrayList<>(actualDataList.size() / 2);
-    list.add(actualDataList.get(0));
-    short counter = 1;
-    short start = 0;
-    List<Short> map = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    for (int i = 1; i < actualDataList.size(); i++) {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, 
actualDataList.get(i)) != 0) {
-        prvKey = actualDataList.get(i);
-        list.add(actualDataList.get(i));
-        map.add(start);
-        map.add(counter);
-        start += counter;
-        counter = 1;
-        continue;
-      }
-      counter++;
-    }
-    map.add(start);
-    map.add(counter);
-    // if rle is index size is more than 70% then rle wont give any benefit
-    // so better to avoid rle index and write data as it is
-    boolean useRle = (((list.size() + map.size()) * 100) / 
actualDataList.size()) < 70;
-    if (useRle) {
-      this.dataPage = convertToDataPage(list);
-      dataRlePage = convertToArray(map);
-    } else {
-      this.dataPage = convertToDataPage(actualDataList);
-      dataRlePage = new short[0];
-    }
-  }
-
-  private byte[][] convertToDataPage(List<byte[]> list) {
-    byte[][] shortArray = new byte[list.size()][];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
-  public short[] getDataRlePage() {
-    return dataRlePage;
-  }
-
-  public int getDataRlePageLengthInBytes() {
-    if (dataRlePage != null) {
-      return dataRlePage.length * 2;
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * no use
-   *
-   * @return
-   */
-  public short[] getRowIdPage() {
-    return new short[0];
-  }
-
-  public int getRowIdPageLengthInBytes() {
-    return 0;
-  }
-
-  /**
-   * no use
-   *
-   * @return
-   */
-  public short[] getRowIdRlePage() {
-    return new short[0];
-  }
-
-  public int getRowIdRlePageLengthInBytes() {
-    return 0;
-  }
-
-  /**
-   * @return the dataPage
-   */
-  public byte[][] getDataPage() {
-    return dataPage;
-  }
-
-}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
deleted file mode 100644
index 6e68caf..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> 
{
-
-  private boolean alreadySorted;
-
-  private short[] rowIdPage;
-
-  private short[] rowIdRlePage;
-
-  private byte[][] dataPage;
-
-  private short[] dataRlePage;
-
-  public BlockIndexerStorageForShort(byte[][] dataPage, boolean rleOnData,
-      boolean isNoDictionary, boolean isSortRequired) {
-    ColumnWithRowId<Short>[] dataWithRowId = createColumnWithRowId(dataPage, 
isNoDictionary);
-    if (isSortRequired) {
-      Arrays.sort(dataWithRowId);
-    }
-    short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
-    rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
-    rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
-    if (rleOnData) {
-      rleEncodeOnData(dataWithRowId);
-    }
-  }
-
-  /**
-   * Create an object with each column array and respective rowId
-   *
-   * @return
-   */
-  private ColumnWithRowId<Short>[] createColumnWithRowId(byte[][] dataPage,
-      boolean isNoDictionary) {
-    ColumnWithRowId<Short>[] columnWithIndexs = new 
ColumnWithRowId[dataPage.length];
-    if (isNoDictionary) {
-      for (short i = 0; i < columnWithIndexs.length; i++) {
-        columnWithIndexs[i] = new ColumnWithRowIdForHighCard<>(dataPage[i], i);
-      }
-    } else {
-      for (short i = 0; i < columnWithIndexs.length; i++) {
-        columnWithIndexs[i] = new ColumnWithRowId<>(dataPage[i], i);
-      }
-    }
-    return columnWithIndexs;
-  }
-
-  private short[] extractDataAndReturnRowId(ColumnWithRowId<Short>[] 
dataWithRowId,
-      byte[][] dataPage) {
-    short[] indexes = new short[dataWithRowId.length];
-    for (int i = 0; i < indexes.length; i++) {
-      indexes[i] = dataWithRowId[i].getIndex();
-      dataPage[i] = dataWithRowId[i].getColumn();
-    }
-    this.dataPage = dataPage;
-    return indexes;
-  }
-
-  /**
-   * @return the alreadySorted
-   */
-  public boolean isAlreadySorted() {
-    return alreadySorted;
-  }
-
-  /**
-   * @return the rowIdPage
-   */
-  public short[] getRowIdPage() {
-    return rowIdPage;
-  }
-
-  public int getRowIdPageLengthInBytes() {
-    if (rowIdPage != null) {
-      return rowIdPage.length * 2;
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * @return the rowIdRlePage
-   */
-  public short[] getRowIdRlePage() {
-    return rowIdRlePage;
-  }
-
-  public int getRowIdRlePageLengthInBytes() {
-    if (rowIdRlePage != null) {
-      return rowIdRlePage.length * 2;
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * @return the dataPage
-   */
-  public byte[][] getDataPage() {
-    return dataPage;
-  }
-
-  private void rleEncodeOnData(ColumnWithRowId<Short>[] dataWithRowId) {
-    byte[] prvKey = dataWithRowId[0].getColumn();
-    List<ColumnWithRowId> list = new ArrayList<>(dataWithRowId.length / 2);
-    list.add(dataWithRowId[0]);
-    short counter = 1;
-    short start = 0;
-    List<Short> map = new 
ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    for (int i = 1; i < dataWithRowId.length; i++) {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, 
dataWithRowId[i].getColumn()) != 0) {
-        prvKey = dataWithRowId[i].getColumn();
-        list.add(dataWithRowId[i]);
-        map.add(start);
-        map.add(counter);
-        start += counter;
-        counter = 1;
-        continue;
-      }
-      counter++;
-    }
-    map.add(start);
-    map.add(counter);
-    // if rle is index size is more than 70% then rle wont give any benefit
-    // so better to avoid rle index and write data as it is
-    boolean useRle = (((list.size() + map.size()) * 100) / 
dataWithRowId.length) < 70;
-    if (useRle) {
-      this.dataPage = convertToDataPage(list);
-      dataRlePage = convertToArray(map);
-    } else {
-      this.dataPage = convertToDataPage(dataWithRowId);
-      dataRlePage = new short[0];
-    }
-  }
-
-  private byte[][] convertToDataPage(ColumnWithRowId[] indexes) {
-    byte[][] shortArray = new byte[indexes.length][];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = indexes[i].getColumn();
-    }
-    return shortArray;
-  }
-
-  private byte[][] convertToDataPage(List<ColumnWithRowId> list) {
-    byte[][] shortArray = new byte[list.size()][];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i).getColumn();
-    }
-    return shortArray;
-  }
-
-  @Override
-  public short[] getDataRlePage() {
-    return dataRlePage;
-  }
-
-  @Override
-  public int getDataRlePageLengthInBytes() {
-    if (dataRlePage != null) {
-      return dataRlePage.length * 2;
-    } else {
-      return 0;
-    }
-  }
-}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayBlockIndexerStorage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayBlockIndexerStorage.java
new file mode 100644
index 0000000..f5117cc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayBlockIndexerStorage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+/**
+ * Support to encode on both rowId and data
+ */
+public class ByteArrayBlockIndexerStorage extends 
BlockIndexerStorage<byte[][]> {
+
+  public ByteArrayBlockIndexerStorage(byte[][] dataPage, boolean rleOnData,
+                                      boolean isNoDictionary, boolean 
isSortRequired) {
+    ByteArrayColumnWithRowId[] dataWithRowId = createColumnWithRowId(dataPage, 
isNoDictionary);
+    if (isSortRequired) {
+      Arrays.sort(dataWithRowId);
+    }
+    short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
+    encodeAndSetRowId(rowIds);
+    if (rleOnData) {
+      this.dataPage = rleEncodeOnData(dataPage);
+    } else {
+      this.dataPage = dataPage;
+    }
+  }
+
+  /**
+   * Create an object with each column array and respective rowId
+   *
+   * @return
+   */
+  private ByteArrayColumnWithRowId[] createColumnWithRowId(byte[][] dataPage,
+      boolean isNoDictionary) {
+    ByteArrayColumnWithRowId[] columnWithIndexs = new 
ByteArrayColumnWithRowId[dataPage.length];
+    if (isNoDictionary) {
+      for (short i = 0; i < columnWithIndexs.length; i++) {
+        columnWithIndexs[i] = new ByteArrayColumnWithRowId(dataPage[i], i);
+      }
+    } else {
+      for (short i = 0; i < columnWithIndexs.length; i++) {
+        columnWithIndexs[i] = new ByteArrayColumnWithRowId(dataPage[i], i);
+      }
+    }
+    return columnWithIndexs;
+  }
+
+  private short[] extractDataAndReturnRowId(ByteArrayColumnWithRowId[] 
dataWithRowId,
+      byte[][] dataPage) {
+    short[] indexes = new short[dataWithRowId.length];
+    for (int i = 0; i < indexes.length; i++) {
+      indexes[i] = dataWithRowId[i].getRowId();
+      dataPage[i] = dataWithRowId[i].getColumn();
+    }
+    this.dataPage = dataPage;
+    return indexes;
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreMetadata.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayBlockIndexerStorageWithoutRowId.java
similarity index 56%
rename from 
core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreMetadata.java
rename to 
core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayBlockIndexerStorageWithoutRowId.java
index e2a9293..921ffc2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreMetadata.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayBlockIndexerStorageWithoutRowId.java
@@ -17,35 +17,44 @@
 
 package org.apache.carbondata.core.datastore.columnar;
 
-class ColumnarKeyStoreMetadata {
-
-  private int[] columnReverseIndex;
-
-  private int eachRowSize;
-
-  ColumnarKeyStoreMetadata(int eachRowSize) {
-    this.eachRowSize = eachRowSize;
+/**
+ * Below class will be used to for no inverted index
+ * Support to encode on data
+ */
+public class ByteArrayBlockIndexerStorageWithoutRowId extends 
BlockIndexerStorage<byte[][]> {
+
+  public ByteArrayBlockIndexerStorageWithoutRowId(byte[][] dataPage, boolean 
rleOnData) {
+    if (rleOnData) {
+      this.dataPage = rleEncodeOnData(dataPage);
+    } else {
+      this.dataPage = dataPage;
+    }
   }
 
   /**
-   * @return the eachRowSize
+   * no use
+   *
+   * @return
    */
-  int getEachRowSize() {
-    return eachRowSize;
+  public short[] getRowIdPage() {
+    return new short[0];
   }
 
-  /**
-   * @return the columnReverseIndex
-   */
-  int[] getColumnReverseIndex() {
-    return columnReverseIndex;
+  public int getRowIdPageLengthInBytes() {
+    return 0;
   }
 
   /**
-   * @param columnReverseIndex the columnReverseIndex to set
+   * no use
+   *
+   * @return
    */
-  void setColumnReverseIndex(int[] columnReverseIndex) {
-    this.columnReverseIndex = columnReverseIndex;
+  public short[] getRowIdRlePage() {
+    return new short[0];
+  }
+
+  public int getRowIdRlePageLengthInBytes() {
+    return 0;
   }
 
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForHighCard.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayColumnWithRowId.java
similarity index 68%
rename from 
core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForHighCard.java
rename to 
core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayColumnWithRowId.java
index 8697480..946aa6b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForHighCard.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ByteArrayColumnWithRowId.java
@@ -21,15 +21,26 @@ import java.util.Arrays;
 
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 
-public class ColumnWithRowIdForHighCard<T> extends ColumnWithRowId<T>
-    implements Comparable<ColumnWithRowId<T>> {
+public class ByteArrayColumnWithRowId implements 
Comparable<ByteArrayColumnWithRowId> {
+  private byte[] column;
 
-  ColumnWithRowIdForHighCard(byte[] column, T index) {
-    super(column, index);
+  private short rowId;
+
+  ByteArrayColumnWithRowId(byte[] column, short rowId) {
+    this.column = column;
+    this.rowId = rowId;
+  }
+
+  public byte[] getColumn() {
+    return column;
+  }
+
+  public short getRowId() {
+    return rowId;
   }
 
   @Override
-  public int compareTo(ColumnWithRowId o) {
+  public int compareTo(ByteArrayColumnWithRowId o) {
     return UnsafeComparer.INSTANCE
         .compareTo(column, 2, column.length - 2, o.column, 2, o.column.length 
- 2);
   }
@@ -39,12 +50,12 @@ public class ColumnWithRowIdForHighCard<T> extends 
ColumnWithRowId<T>
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    ColumnWithRowIdForHighCard o = (ColumnWithRowIdForHighCard)obj;
-    return Arrays.equals(column, o.column) && getIndex() == o.getIndex();
+    ByteArrayColumnWithRowId o = (ByteArrayColumnWithRowId)obj;
+    return Arrays.equals(column, o.column) && rowId == o.rowId;
   }
 
   @Override
   public int hashCode() {
-    return Arrays.hashCode(column) + getIndex().hashCode();
+    return Arrays.hashCode(column) + Short.hashCode(rowId);
   }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowId.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowId.java
deleted file mode 100644
index 3756219..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.Arrays;
-
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class ColumnWithRowId<T> implements Comparable<ColumnWithRowId<T>> {
-  protected byte[] column;
-
-  private T index;
-
-  ColumnWithRowId(byte[] column, T index) {
-    this.column = column;
-    this.index = index;
-  }
-
-  /**
-   * @return the column
-   */
-  public byte[] getColumn() {
-    return column;
-  }
-
-  /**
-   * @return the index
-   */
-  public T getIndex() {
-    return index;
-  }
-
-  @Override
-  public int compareTo(ColumnWithRowId o) {
-    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(column, o.column);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    ColumnWithRowId o = (ColumnWithRowId)obj;
-    return Arrays.equals(column, o.column) && index == o.index;
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(column) + index.hashCode();
-  }
-}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreDataHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreDataHolder.java
deleted file mode 100644
index 51389ff..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreDataHolder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.nio.ByteBuffer;
-
-public class ColumnarKeyStoreDataHolder {
-  private byte[] keyblockData;
-  private ColumnarKeyStoreMetadata columnarKeyStoreMetadata;
-
-  public ColumnarKeyStoreDataHolder(final byte[] keyblockData,
-      final ColumnarKeyStoreMetadata columnarKeyStoreMetadata) {
-    this.keyblockData = keyblockData;
-    this.columnarKeyStoreMetadata = columnarKeyStoreMetadata;
-  }
-
-  public ColumnarKeyStoreDataHolder(final ColumnarKeyStoreMetadata 
columnarKeyStoreMetadata) {
-    this.columnarKeyStoreMetadata = columnarKeyStoreMetadata;
-  }
-
-  public int getSurrogateKey(int columnIndex) {
-    byte[] actual = new byte[4];
-    int startIndex;
-    if (null != columnarKeyStoreMetadata.getColumnReverseIndex()) {
-      startIndex =
-          columnarKeyStoreMetadata.getColumnReverseIndex()[columnIndex] * 
columnarKeyStoreMetadata
-              .getEachRowSize();
-    } else {
-      startIndex = columnIndex * columnarKeyStoreMetadata.getEachRowSize();
-    }
-    int destPos = 4 - columnarKeyStoreMetadata.getEachRowSize();
-    System.arraycopy(keyblockData, startIndex, actual, destPos,
-        columnarKeyStoreMetadata.getEachRowSize());
-    return ByteBuffer.wrap(actual).getInt();
-  }
-
-}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ObjectArrayBlockIndexerStorage.java
similarity index 51%
rename from core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
rename to core/src/main/java/org/apache/carbondata/core/datastore/columnar/ObjectArrayBlockIndexerStorage.java
index 6eeadff..27c26a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ObjectArrayBlockIndexerStorage.java
@@ -18,31 +18,25 @@
 package org.apache.carbondata.core.datastore.columnar;
 
 import java.util.Arrays;
-import java.util.Map;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
-public class BlockIndexerStorageForNoDictionary extends 
BlockIndexerStorage<Object[]> {
-
-  private short[] rowIdPage;
-
-  private short[] rowIdRlePage;
-
-  private Object[] dataPage;
+/**
+ * Support to encode on rowId
+ */
+public class ObjectArrayBlockIndexerStorage extends BlockIndexerStorage<Object[]> {
 
   private DataType dataType;
 
-  public BlockIndexerStorageForNoDictionary(Object[] dataPage, DataType 
dataType,
-      boolean isSortRequired) {
+  public ObjectArrayBlockIndexerStorage(Object[] dataPage, DataType dataType,
+                                        boolean isSortRequired) {
     this.dataType = dataType;
-    ColumnWithRowIdForNoDictionary<Short>[] dataWithRowId = 
createColumnWithRowId(dataPage);
+    ObjectColumnWithRowId[] dataWithRowId = createColumnWithRowId(dataPage);
     if (isSortRequired) {
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
-    rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
-    rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
+    encodeAndSetRowId(rowIds);
   }
 
   /**
@@ -50,62 +44,26 @@ public class BlockIndexerStorageForNoDictionary extends 
BlockIndexerStorage<Obje
    *
    * @return
    */
-  private ColumnWithRowIdForNoDictionary<Short>[] 
createColumnWithRowId(Object[] dataPage) {
-    ColumnWithRowIdForNoDictionary<Short>[] columnWithIndexs =
-        new ColumnWithRowIdForNoDictionary[dataPage.length];
+  private ObjectColumnWithRowId[] createColumnWithRowId(Object[] dataPage) {
+    ObjectColumnWithRowId[] columnWithIndexs =
+        new ObjectColumnWithRowId[dataPage.length];
     for (short i = 0; i < columnWithIndexs.length; i++) {
-      columnWithIndexs[i] = new ColumnWithRowIdForNoDictionary<>(dataPage[i], 
i, dataType);
+      columnWithIndexs[i] = new ObjectColumnWithRowId(dataPage[i], i, dataType);
     }
     return columnWithIndexs;
   }
 
-  private short[] 
extractDataAndReturnRowId(ColumnWithRowIdForNoDictionary<Short>[] dataWithRowId,
+  private short[] extractDataAndReturnRowId(ObjectColumnWithRowId[] dataWithRowId,
       Object[] dataPage) {
     short[] indexes = new short[dataWithRowId.length];
     for (int i = 0; i < indexes.length; i++) {
-      indexes[i] = dataWithRowId[i].getIndex();
+      indexes[i] = dataWithRowId[i].getRowId();
       dataPage[i] = dataWithRowId[i].getColumn();
     }
     this.dataPage = dataPage;
     return indexes;
   }
 
-  /**
-   * @return the rowIdPage
-   */
-  @Override
-  public short[] getRowIdPage() {
-    return rowIdPage;
-  }
-
-  @Override
-  public int getRowIdPageLengthInBytes() {
-    if (rowIdPage != null) {
-      return rowIdPage.length * 2;
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public short[] getRowIdRlePage() {
-    return rowIdRlePage;
-  }
-
-  @Override
-  public int getRowIdRlePageLengthInBytes() {
-    if (rowIdRlePage != null) {
-      return rowIdRlePage.length * 2;
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public Object[] getDataPage() {
-    return dataPage;
-  }
-
   @Override
   public short[] getDataRlePage() {
     return new short[0];
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ObjectColumnWithRowId.java
similarity index 71%
rename from 
core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
rename to 
core/src/main/java/org/apache/carbondata/core/datastore/columnar/ObjectColumnWithRowId.java
index 90b33d7..a312d79 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ObjectColumnWithRowId.java
@@ -20,23 +20,29 @@ package org.apache.carbondata.core.datastore.columnar;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
-public class ColumnWithRowIdForNoDictionary<T>
-    implements Comparable<ColumnWithRowIdForNoDictionary<T>> {
+public class ObjectColumnWithRowId implements Comparable<ObjectColumnWithRowId> {
+  private Object column;
 
-  Object column;
+  private short rowId;
 
-  T index;
+  private DataType dataType;
 
-  DataType dataType;
-
-  ColumnWithRowIdForNoDictionary(Object column, T index, DataType dataType) {
+  ObjectColumnWithRowId(Object column, short rowId, DataType dataType) {
     this.column = column;
-    this.index = index;
+    this.rowId = rowId;
     this.dataType = dataType;
   }
 
+  public Object getColumn() {
+    return column;
+  }
+
+  public short getRowId() {
+    return rowId;
+  }
+
   @Override
-  public int compareTo(ColumnWithRowIdForNoDictionary o) {
+  public int compareTo(ObjectColumnWithRowId o) {
     // use the data type based comparator for the no dictionary encoded columns
     SerializableComparator comparator =
         
org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType);
@@ -48,27 +54,12 @@ public class ColumnWithRowIdForNoDictionary<T>
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    ColumnWithRowIdForNoDictionary o = (ColumnWithRowIdForNoDictionary)obj;
-    return column.equals(o.column) && getIndex() == o.getIndex();
+    ObjectColumnWithRowId o = (ObjectColumnWithRowId)obj;
+    return column.equals(o.column) && rowId == o.rowId;
   }
 
   @Override
   public int hashCode() {
-    return getColumn().hashCode() + getIndex().hashCode();
+    return column.hashCode() + Short.hashCode(rowId);
   }
-
-  /**
-   * @return the index
-   */
-  public T getIndex() {
-    return index;
-  }
-
-  /**
-   * @return the column
-   */
-  public Object getColumn() {
-    return column;
-  }
-
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index 17584e3..274fb82 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
-import 
org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoDictionary;
+import org.apache.carbondata.core.datastore.columnar.ObjectArrayBlockIndexerStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
@@ -235,7 +235,7 @@ public abstract class AdaptiveCodec implements 
ColumnPageCodec {
             input.getColumnPageEncoderMeta().getCompressorName()), 
input.getPageSize());
     if (isInvertedIndex) {
       indexStorage =
-          new BlockIndexerStorageForNoDictionary(getPageBasedOnDataType(input), input.getDataType(),
+          new ObjectArrayBlockIndexerStorage(getPageBasedOnDataType(input), input.getDataType(),
               isInvertedIndex);
     }
     ColumnPage columnPage = getSortedColumnPageIfRequired(input);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
index 5a6d3c0..55f0979 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
-import 
org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import 
org.apache.carbondata.core.datastore.columnar.ByteArrayBlockIndexerStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -48,7 +48,7 @@ public class ComplexDimensionIndexCodec extends 
IndexStorageCodec {
       @Override
       void encodeIndexStorage(ColumnPage inputPage) {
         BlockIndexerStorage<byte[][]> indexStorage =
-            new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), 
false, false, false);
+            new ByteArrayBlockIndexerStorage(inputPage.getByteArrayPage(), false, false, false);
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             inputPage.getColumnCompressorName());
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 15827f8..39d2337 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -22,8 +22,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
-import 
org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import 
org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import 
org.apache.carbondata.core.datastore.columnar.ByteArrayBlockIndexerStorage;
+import 
org.apache.carbondata.core.datastore.columnar.ByteArrayBlockIndexerStorageWithoutRowId;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -50,9 +50,9 @@ public class DirectDictDimensionIndexCodec extends 
IndexStorageCodec {
         BlockIndexerStorage<byte[][]> indexStorage;
         byte[][] data = inputPage.getByteArrayPage();
         if (isInvertedIndex) {
-          indexStorage = new BlockIndexerStorageForShort(data, false, false, 
isSort);
+          indexStorage = new ByteArrayBlockIndexerStorage(data, false, false, isSort);
         } else {
-          indexStorage = new 
BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+          indexStorage = new ByteArrayBlockIndexerStorageWithoutRowId(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
index e50a599..5e5ad61 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
@@ -23,8 +23,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
-import 
org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import 
org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import 
org.apache.carbondata.core.datastore.columnar.ByteArrayBlockIndexerStorage;
+import 
org.apache.carbondata.core.datastore.columnar.ByteArrayBlockIndexerStorageWithoutRowId;
 import org.apache.carbondata.core.datastore.columnar.DummyBlockIndexerStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
@@ -63,8 +63,8 @@ public class PlainDimensionIndexCodec extends 
IndexStorageCodec {
         if (isInvertedIndex || isDictionary) {
           byte[][] byteArray = input.getByteArrayPage();
           indexStorage = isInvertedIndex ?
-            new BlockIndexerStorageForShort(byteArray, isDictionary, 
!isDictionary, isSort) :
-            new BlockIndexerStorageForNoInvertedIndexForShort(byteArray, true);
+            new ByteArrayBlockIndexerStorage(byteArray, isDictionary, !isDictionary, isSort) :
+            new ByteArrayBlockIndexerStorageWithoutRowId(byteArray, true);
           byte[] compressInput = ByteUtil.flatten(indexStorage.getDataPage());
           super.compressedDataPage = compressor.compressByte(compressInput);
         } else {
diff --git 
a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
 
b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
deleted file mode 100644
index 6501e98..0000000
--- 
a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.block;
-
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class TableTaskInfoTest {
-
-  static TableTaskInfo tableTaskInfo;
-  static List<TableBlockInfo> tableBlockInfoList;
-
-  @BeforeClass public static void setup() {
-    tableBlockInfoList = new ArrayList<>(5);
-
-    String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", 
locations, 6, ColumnarFormatVersion.V1, null));
-
-    String[] locs = { "loc4", "loc5" };
-    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", 
locs, 6, ColumnarFormatVersion.V1, null));
-
-    tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
-  }
-
-  @Test public void getLocationsTest() {
-    String locations[] = { "loc1", "loc2", "loc3", "loc4", "loc5" };
-    String res[] = tableTaskInfo.getLocations();
-    Assert.assertTrue(Arrays.equals(locations, res));
-  }
-
-  @Test public void maxNoNodesTest() {
-    List<String> locs = new ArrayList<String>();
-    locs.add("loc1");
-    locs.add("loc2");
-    locs.add("loc3");
-    locs.add("loc4");
-    locs.add("loc5");
-
-    List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoList);
-    Assert.assertTrue(res.equals(locs));
-  }
-
-  @Test public void maxNoNodesTestForElse() {
-    List<String> locs = new ArrayList<String>();
-    locs.add("loc1");
-    locs.add("loc2");
-    locs.add("loc3");
-    List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
-
-    String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, 
"segmentID", locations, 6, ColumnarFormatVersion.V1, null));
-
-    String[] locations1 = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, 
"segmentID", locations1, 6, ColumnarFormatVersion.V1, null));
-
-    List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
-    Assert.assertTrue(res.equals(locs));
-  }
-}
diff --git 
a/core/src/test/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreDataHolderUnitTest.java
 
b/core/src/test/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreDataHolderUnitTest.java
deleted file mode 100644
index 7959c4b..0000000
--- 
a/core/src/test/java/org/apache/carbondata/core/datastore/columnar/ColumnarKeyStoreDataHolderUnitTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.List;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class ColumnarKeyStoreDataHolderUnitTest {
-
-  private static ColumnarKeyStoreDataHolder columnarKeyStoreDataHolder;
-  private static ColumnarKeyStoreMetadata columnarKeyStoreMetadata;
-
-  @BeforeClass public static void setup() {
-    byte[] keyBlockData = new byte[] { 16, 8, 32, 40, 8, 8, 8 };
-    int eachRowSize = 2;
-    int[] reverseIndex = new int[] { 1, 5, 6, 3, 8 };
-    columnarKeyStoreMetadata = new ColumnarKeyStoreMetadata(eachRowSize);
-    columnarKeyStoreMetadata.setColumnReverseIndex(reverseIndex);
-    columnarKeyStoreDataHolder =
-        new ColumnarKeyStoreDataHolder(keyBlockData, columnarKeyStoreMetadata);
-  }
-
-  @Test public void testGetSurrogateKeyWithNullINGetColumnReverseIndex() {
-    byte[] keyBlockData = new byte[] { 16, 8, 32, 40, 8, 8, 8 };
-    int eachRowSize = 1;
-    ColumnarKeyStoreMetadata columnarKeyStoreMetadata = new 
ColumnarKeyStoreMetadata(eachRowSize);
-    ColumnarKeyStoreDataHolder columnarKeyStoreDataHolderNew =
-        new ColumnarKeyStoreDataHolder(keyBlockData, columnarKeyStoreMetadata);
-    int columnIndex = 5;
-    int expected_result = 8;
-    int result = columnarKeyStoreDataHolderNew.getSurrogateKey(columnIndex);
-    assertEquals(expected_result, result);
-  }
-
-  @Test public void 
testGetSurrogateKeyWithNullINGetColumnReverseIndexAndRowSizeTwo() {
-    byte[] keyBlockData = new byte[] { 16, 8, 32, 40, 8, 8, 8 };
-    int eachRowSize = 2;
-    ColumnarKeyStoreMetadata columnarKeyStoreMetadata = new 
ColumnarKeyStoreMetadata(eachRowSize);
-    ColumnarKeyStoreDataHolder columnarKeyStoreDataHolderNew =
-        new ColumnarKeyStoreDataHolder(keyBlockData, columnarKeyStoreMetadata);
-    int columnIndex = 0;
-    int expected_result = 4104;
-    int result = columnarKeyStoreDataHolderNew.getSurrogateKey(columnIndex);
-    assertEquals(expected_result, result);
-  }
-
-  @Test public void testGetSurrogateKeyWithNotNullINGetColumnReverseIndex() {
-    int columnIndex = 0;
-    int expected_result = 8232;
-    int result = columnarKeyStoreDataHolder.getSurrogateKey(columnIndex);
-    assertEquals(expected_result, result);
-  }
-
-  @Test(expected = ArrayIndexOutOfBoundsException.class)
-  public void testExceptionInGetSurrogateKey() {
-    int columnIndex = 10;
-    int expected_result = 8232;
-    int result = columnarKeyStoreDataHolder.getSurrogateKey(columnIndex);
-    assertEquals(expected_result, result);
-  }
-
-  @Test public void testGetSurrogateKeyWithListOfByteWhileCreatingObject() {
-    byte[] keyBlockData = new byte[] { 32, 64, 32, 40, 64, 8, 8 };
-    List<byte[]> noDictionaryValBasedKeyBlockData = new 
java.util.ArrayList<>();
-    noDictionaryValBasedKeyBlockData.add(keyBlockData);
-    new ColumnarKeyStoreDataHolder(columnarKeyStoreMetadata);
-    int columnIndex = 0;
-    int expected_result = 8232;
-    int result = columnarKeyStoreDataHolder.getSurrogateKey(columnIndex);
-    assertEquals(expected_result, result);
-  }
-
-}

Reply via email to