http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index e066ee3..3f25d9b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -35,8 +35,8 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import 
org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -340,21 +340,21 @@ public class RowLevelFilterExecuterImpl implements 
FilterExecuter {
       }
 
       Object msrValue;
-      MeasureColumnDataChunk measureColumnDataChunk =
+      ColumnPage columnPage =
           blockChunkHolder.getMeasureRawDataChunk()[measureBlocksIndex[0]]
-              .convertToMeasureColDataChunk(pageIndex);
+              .convertToColumnPage(pageIndex);
       switch (msrType) {
         case SHORT:
-          msrValue = (short) 
measureColumnDataChunk.getColumnPage().getLong(index);
+          msrValue = (short) columnPage.getLong(index);
           break;
         case INT:
-          msrValue = (int) 
measureColumnDataChunk.getColumnPage().getLong(index);
+          msrValue = (int) columnPage.getLong(index);
           break;
         case LONG:
-          msrValue = measureColumnDataChunk.getColumnPage().getLong(index);
+          msrValue = columnPage.getLong(index);
           break;
         case DECIMAL:
-          BigDecimal bigDecimalValue = 
measureColumnDataChunk.getColumnPage().getDecimal(index);
+          BigDecimal bigDecimalValue = columnPage.getDecimal(index);
           if (null != bigDecimalValue &&
               
msrColumnEvalutorInfo.getCarbonColumn().getColumnSchema().getScale() >
                   bigDecimalValue.scale()) {
@@ -366,10 +366,10 @@ public class RowLevelFilterExecuterImpl implements 
FilterExecuter {
           msrValue = bigDecimalValue;
           break;
         default:
-          msrValue = measureColumnDataChunk.getColumnPage().getDouble(index);
+          msrValue = columnPage.getDouble(index);
       }
       record[msrColumnEvalutorInfo.getRowIndex()] =
-          
measureColumnDataChunk.getNullValueIndexHolder().getBitSet().get(index) ? null 
: msrValue;
+          columnPage.getNullBits().get(index) ? null : msrValue;
     }
     row.setValues(record);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index ca6df0c..d34ab2c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -23,9 +23,9 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -223,23 +223,21 @@ public class RowLevelRangeGrtThanFiterExecuterImpl 
extends RowLevelFilterExecute
             int compare = comparator.compare(msrFilterRangeValues[0], 
DataTypeUtil
                 .getMeasureObjectFromDataType(rawColumnChunk.getMinValues()[i],
                     msrColEvalutorInfoList.get(0).getType()));
-            MeasureColumnDataChunk measureColumnDataChunk =
-                rawColumnChunk.convertToMeasureColDataChunk(i);
-            if (compare < 0 &&
-                
measureColumnDataChunk.getNullValueIndexHolder().getBitSet().isEmpty()) {
+            ColumnPage columnPage =
+                rawColumnChunk.convertToColumnPage(i);
+            if (compare < 0 && columnPage.getNullBits().isEmpty()) {
               BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
               bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
               bitSetGroup.setBitSet(bitSet, i);
             } else {
               BitSet bitSet =
-                  getFilteredIndexesForMeasures(measureColumnDataChunk,
-                      rawColumnChunk.getRowCount()[i]);
+                  getFilteredIndexesForMeasures(columnPage, 
rawColumnChunk.getRowCount()[i]);
               bitSetGroup.setBitSet(bitSet, i);
             }
           }
         } else {
           BitSet bitSet =
-              
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+              
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                   rawColumnChunk.getRowCount()[i]);
           bitSetGroup.setBitSet(bitSet, i);
         }
@@ -249,13 +247,13 @@ public class RowLevelRangeGrtThanFiterExecuterImpl 
extends RowLevelFilterExecute
     return null;
   }
 
-  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk 
measureColumnDataChunk,
+  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
     SerializableComparator comparator = 
Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = 
measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+    BitSet nullBitSet = columnPage.getNullBits();
     for (int i = 0; i < filterValues.length; i++) {
       if (filterValues[i] == null) {
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = 
nullBitSet.nextSetBit(j + 1)) {
@@ -266,7 +264,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends 
RowLevelFilterExecute
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         if (!nullBitSet.get(startIndex)) {
           Object msrValue = DataTypeUtil
-              
.getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), 
startIndex,
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
                   msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
           if (comparator.compare(msrValue, filterValues[i]) > 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index e20f4a6..3f8eddd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -23,9 +23,9 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -222,23 +222,22 @@ public class 
RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
             int compare = comparator.compare(msrFilterRangeValues[0], 
DataTypeUtil
                 .getMeasureObjectFromDataType(rawColumnChunk.getMinValues()[i],
                     msrColEvalutorInfoList.get(0).getType()));
-            MeasureColumnDataChunk measureColumnDataChunk =
-                rawColumnChunk.convertToMeasureColDataChunk(i);
-            if (compare <= 0 &&
-                
measureColumnDataChunk.getNullValueIndexHolder().getBitSet().isEmpty()) {
+            ColumnPage columnPage =
+                rawColumnChunk.convertToColumnPage(i);
+            if (compare <= 0 && columnPage.getNullBits().isEmpty()) {
               BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
               bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
               bitSetGroup.setBitSet(bitSet, i);
             } else {
               BitSet bitSet =
-                  
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+                  
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                       rawColumnChunk.getRowCount()[i]);
               bitSetGroup.setBitSet(bitSet, i);
             }
           }
         } else {
           BitSet bitSet =
-              
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+              
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                   rawColumnChunk.getRowCount()[i]);
           bitSetGroup.setBitSet(bitSet, i);
         }
@@ -248,13 +247,13 @@ public class 
RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     return null;
   }
 
-  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk 
measureColumnDataChunk,
+  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
     SerializableComparator comparator = 
Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = 
measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+    BitSet nullBitSet = columnPage.getNullBits();
     for (int i = 0; i < filterValues.length; i++) {
       if (filterValues[i] == null) {
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = 
nullBitSet.nextSetBit(j + 1)) {
@@ -265,7 +264,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl 
extends RowLevelFilte
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         if (!nullBitSet.get(startIndex)) {
           Object msrValue = DataTypeUtil
-              
.getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), 
startIndex,
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
                   msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
           if (comparator.compare(msrValue, filterValues[i]) >= 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 7b9e7d2..63c9395 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -23,9 +23,9 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -212,13 +212,13 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
           if (isScanRequired(rawColumnChunk.getMinValues()[i], 
this.msrFilterRangeValues,
               msrColEvalutorInfoList.get(0).getType())) {
             BitSet bitSet =
-                
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+                
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                     rawColumnChunk.getRowCount()[i]);
             bitSetGroup.setBitSet(bitSet, i);
           }
         } else {
           BitSet bitSet =
-              
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+              
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                   rawColumnChunk.getRowCount()[i]);
           bitSetGroup.setBitSet(bitSet, i);
         }
@@ -228,13 +228,13 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
     return null;
   }
 
-  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk 
measureColumnDataChunk,
+  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
     SerializableComparator comparator = 
Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = 
measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+    BitSet nullBitSet = columnPage.getNullBits();
     for (int i = 0; i < filterValues.length; i++) {
       if (filterValues[i] == null) {
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = 
nullBitSet.nextSetBit(j + 1)) {
@@ -245,7 +245,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         if (!nullBitSet.get(startIndex)) {
           Object msrValue = DataTypeUtil
-              
.getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), 
startIndex,
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
                   msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
           if (comparator.compare(msrValue, filterValues[i]) <= 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 4681e4e..86ded59 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -23,9 +23,9 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -214,13 +214,13 @@ public class RowLevelRangeLessThanFiterExecuterImpl 
extends RowLevelFilterExecut
           if (isScanRequired(rawColumnChunk.getMinValues()[i], 
this.msrFilterRangeValues,
               msrColEvalutorInfoList.get(0).getType())) {
             BitSet bitSet =
-                
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+                
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                     rawColumnChunk.getRowCount()[i]);
             bitSetGroup.setBitSet(bitSet, i);
           }
         } else {
           BitSet bitSet =
-              
getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
+              
getFilteredIndexesForMeasures(rawColumnChunk.convertToColumnPage(i),
                   rawColumnChunk.getRowCount()[i]);
           bitSetGroup.setBitSet(bitSet, i);
         }
@@ -230,13 +230,13 @@ public class RowLevelRangeLessThanFiterExecuterImpl 
extends RowLevelFilterExecut
     return null;
   }
 
-  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk 
measureColumnDataChunk,
+  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
     SerializableComparator comparator = 
Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = 
measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+    BitSet nullBitSet = columnPage.getNullBits();
     for (int i = 0; i < filterValues.length; i++) {
       if (filterValues[i] == null) {
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = 
nullBitSet.nextSetBit(j + 1)) {
@@ -247,7 +247,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends 
RowLevelFilterExecut
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         if (!nullBitSet.get(startIndex)) {
           Object msrValue = DataTypeUtil
-              
.getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), 
startIndex,
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
                   msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
           if (comparator.compare(msrValue, filterValues[i]) < 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 21cf6c8..7cc02ad 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.scan.result;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Map;
@@ -28,8 +27,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.mutate.TupleIdEnum;
@@ -88,7 +87,7 @@ public abstract class AbstractScannedResult {
   /**
    * measure column data chunk
    */
-  protected MeasureColumnDataChunk[][] measureDataChunks;
+  protected ColumnPage[][] measureDataChunks;
   /**
    * dictionary column block index in file
    */
@@ -169,7 +168,7 @@ public abstract class AbstractScannedResult {
    *
    * @param measureDataChunks measure data chunks
    */
-  public void setMeasureChunks(MeasureColumnDataChunk[][] measureDataChunks) {
+  public void setMeasureChunks(ColumnPage[][] measureDataChunks) {
     this.measureDataChunks = measureDataChunks;
   }
 
@@ -183,7 +182,7 @@ public abstract class AbstractScannedResult {
    * @param ordinal measure ordinal
    * @return measure column chunk
    */
-  public MeasureColumnDataChunk getMeasureChunk(int ordinal) {
+  public ColumnPage getMeasureChunk(int ordinal) {
     return measureDataChunks[ordinal][pageCounter];
   }
 
@@ -563,55 +562,6 @@ public abstract class AbstractScannedResult {
     this.rowMapping = indexes;
   }
 
-  /**
-   * Below method will be used to check whether measure value is null or not
-   *
-   * @param ordinal  measure ordinal
-   * @param rowIndex row number to be checked
-   * @return whether it is null or not
-   */
-  protected boolean isNullMeasureValue(int ordinal, int rowIndex) {
-    return 
measureDataChunks[ordinal][pageCounter].getNullValueIndexHolder().getBitSet()
-        .get(rowIndex);
-  }
-
-  /**
-   * Below method will be used to get the measure value of
-   * long type
-   *
-   * @param ordinal  measure ordinal
-   * @param rowIndex row number of the measure value
-   * @return measure value of long type
-   */
-  protected long getLongMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal][pageCounter].getColumnPage()
-        .getLong(rowIndex);
-  }
-
-  /**
-   * Below method will be used to get the measure value of double type
-   *
-   * @param ordinal  measure ordinal
-   * @param rowIndex row number
-   * @return measure value of double type
-   */
-  protected double getDoubleMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal][pageCounter].getColumnPage()
-        .getDouble(rowIndex);
-  }
-
-  /**
-   * Below method will be used to get the measure type of big decimal data type
-   *
-   * @param ordinal  ordinal of the of the measure
-   * @param rowIndex row number
-   * @return measure of big decimal type
-   */
-  protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal][pageCounter].getColumnPage()
-        .getDecimal(rowIndex);
-  }
-
   public int getRowCounter() {
     return rowCounter;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index 1b0c6ca..c145055 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.scan.result.vector;
 import java.math.BigDecimal;
 import java.util.BitSet;
 
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 import org.apache.spark.sql.types.Decimal;
@@ -28,25 +28,24 @@ public class MeasureDataVectorProcessor {
 
   public interface MeasureVectorFiller {
 
-    void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo 
info);
+    void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info);
 
-    void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk 
dataChunk,
+    void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
         ColumnVectorInfo info);
   }
 
   public static class IntegralMeasureVectorFiller implements 
MeasureVectorFiller {
 
     @Override
-    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, 
ColumnVectorInfo info) {
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) 
{
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          vector.putInt(vectorOffset,
-              (int)dataChunk.getColumnPage().getLong(i));
+          vector.putInt(vectorOffset, (int)dataChunk.getLong(i));
           vectorOffset++;
         }
       } else {
@@ -54,8 +53,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(i)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putInt(vectorOffset,
-                (int)dataChunk.getColumnPage().getLong(i));
+            vector.putInt(vectorOffset, (int)dataChunk.getLong(i));
           }
           vectorOffset++;
         }
@@ -63,18 +61,17 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, 
MeasureColumnDataChunk dataChunk,
+    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage 
dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
           int currentRow = rowMapping[i];
-          vector.putInt(vectorOffset,
-              (int)dataChunk.getColumnPage().getLong(currentRow));
+          vector.putInt(vectorOffset, (int)dataChunk.getLong(currentRow));
           vectorOffset++;
         }
       } else {
@@ -83,8 +80,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putInt(vectorOffset,
-                (int)dataChunk.getColumnPage().getLong(currentRow));
+            vector.putInt(vectorOffset, (int)dataChunk.getLong(currentRow));
           }
           vectorOffset++;
         }
@@ -95,16 +91,15 @@ public class MeasureDataVectorProcessor {
   public static class ShortMeasureVectorFiller implements MeasureVectorFiller {
 
     @Override
-    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, 
ColumnVectorInfo info) {
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) 
{
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          vector.putShort(vectorOffset,
-              (short) dataChunk.getColumnPage().getLong(i));
+          vector.putShort(vectorOffset, (short) dataChunk.getLong(i));
           vectorOffset++;
         }
       } else {
@@ -112,8 +107,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(i)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putShort(vectorOffset,
-                (short) dataChunk.getColumnPage().getLong(i));
+            vector.putShort(vectorOffset, (short) dataChunk.getLong(i));
           }
           vectorOffset++;
         }
@@ -121,18 +115,17 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, 
MeasureColumnDataChunk dataChunk,
+    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage 
dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
           int currentRow = rowMapping[i];
-          vector.putShort(vectorOffset,
-              (short) dataChunk.getColumnPage().getLong(currentRow));
+          vector.putShort(vectorOffset, (short) dataChunk.getLong(currentRow));
           vectorOffset++;
         }
       } else {
@@ -141,8 +134,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putShort(vectorOffset,
-                (short) dataChunk.getColumnPage().getLong(currentRow));
+            vector.putShort(vectorOffset, (short) 
dataChunk.getLong(currentRow));
           }
           vectorOffset++;
         }
@@ -153,16 +145,15 @@ public class MeasureDataVectorProcessor {
   public static class LongMeasureVectorFiller implements MeasureVectorFiller {
 
     @Override
-    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, 
ColumnVectorInfo info) {
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) 
{
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          vector.putLong(vectorOffset,
-              dataChunk.getColumnPage().getLong(i));
+          vector.putLong(vectorOffset, dataChunk.getLong(i));
           vectorOffset++;
         }
       } else {
@@ -170,8 +161,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(i)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putLong(vectorOffset,
-                dataChunk.getColumnPage().getLong(i));
+            vector.putLong(vectorOffset, dataChunk.getLong(i));
           }
           vectorOffset++;
         }
@@ -179,18 +169,17 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, 
MeasureColumnDataChunk dataChunk,
+    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage 
dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
           int currentRow = rowMapping[i];
-          vector.putLong(vectorOffset,
-              dataChunk.getColumnPage().getLong(currentRow));
+          vector.putLong(vectorOffset, dataChunk.getLong(currentRow));
           vectorOffset++;
         }
       } else {
@@ -199,8 +188,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putLong(vectorOffset,
-                dataChunk.getColumnPage().getLong(currentRow));
+            vector.putLong(vectorOffset, dataChunk.getLong(currentRow));
           }
           vectorOffset++;
         }
@@ -211,20 +199,20 @@ public class MeasureDataVectorProcessor {
   public static class DecimalMeasureVectorFiller implements 
MeasureVectorFiller {
 
     @Override
-    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, 
ColumnVectorInfo info) {
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) 
{
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
       int newMeasureScale = info.measure.getMeasure().getScale();
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       for (int i = offset; i < len; i++) {
         if (nullBitSet.get(i)) {
           vector.putNull(vectorOffset);
         } else {
           BigDecimal decimal =
-              dataChunk.getColumnPage().getDecimal(i);
+              dataChunk.getDecimal(i);
           if (decimal.scale() < newMeasureScale) {
             decimal = decimal.setScale(newMeasureScale);
           }
@@ -236,21 +224,20 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, 
MeasureColumnDataChunk dataChunk,
+    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage 
dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       for (int i = offset; i < len; i++) {
         int currentRow = rowMapping[i];
         if (nullBitSet.get(currentRow)) {
           vector.putNull(vectorOffset);
         } else {
-          BigDecimal decimal =
-              dataChunk.getColumnPage().getDecimal(currentRow);
+          BigDecimal decimal = dataChunk.getDecimal(currentRow);
           if (info.measure.getMeasure().getScale() > decimal.scale()) {
             decimal = decimal.setScale(info.measure.getMeasure().getScale());
           }
@@ -265,16 +252,15 @@ public class MeasureDataVectorProcessor {
   public static class DefaultMeasureVectorFiller implements 
MeasureVectorFiller {
 
     @Override
-    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, 
ColumnVectorInfo info) {
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) 
{
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          vector.putDouble(vectorOffset,
-              dataChunk.getColumnPage().getDouble(i));
+          vector.putDouble(vectorOffset, dataChunk.getDouble(i));
           vectorOffset++;
         }
       } else {
@@ -282,8 +268,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(i)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putDouble(vectorOffset,
-                dataChunk.getColumnPage().getDouble(i));
+            vector.putDouble(vectorOffset, dataChunk.getDouble(i));
           }
           vectorOffset++;
         }
@@ -291,18 +276,17 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, 
MeasureColumnDataChunk dataChunk,
+    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage 
dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
-      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
           int currentRow = rowMapping[i];
-          vector.putDouble(vectorOffset,
-              dataChunk.getColumnPage().getDouble(currentRow));
+          vector.putDouble(vectorOffset, dataChunk.getDouble(currentRow));
           vectorOffset++;
         }
       } else {
@@ -311,8 +295,7 @@ public class MeasureDataVectorProcessor {
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
-            vector.putDouble(vectorOffset,
-                dataChunk.getColumnPage().getDouble(currentRow));
+            vector.putDouble(vectorOffset, dataChunk.getDouble(currentRow));
           }
           vectorOffset++;
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 467d3d6..0e1ede8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -93,14 +93,14 @@ public abstract class AbstractBlockletScanner implements 
BlockletScanner {
     }
     scannedResult.setDimensionChunks(dimensionColumnDataChunks);
     MeasureRawColumnChunk[] measureRawColumnChunks = 
blocksChunkHolder.getMeasureRawDataChunk();
-    MeasureColumnDataChunk[][] measureColumnDataChunks =
-        new MeasureColumnDataChunk[measureRawColumnChunks.length][];
+    ColumnPage[][] columnPages =
+        new ColumnPage[measureRawColumnChunks.length][];
     for (int i = 0; i < measureRawColumnChunks.length; i++) {
       if (measureRawColumnChunks[i] != null) {
-        measureColumnDataChunks[i] = 
measureRawColumnChunks[i].convertToMeasureColDataChunks();
+        columnPages[i] = measureRawColumnChunks[i].convertToColumnPage();
       }
     }
-    scannedResult.setMeasureChunks(measureColumnDataChunks);
+    scannedResult.setMeasureChunks(columnPages);
     int[] numberOfRows = null;
     if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
       for (int i = 0; i < dimensionRawColumnChunks.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index 55483d2..919b837 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -23,9 +23,9 @@ import java.util.BitSet;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
@@ -256,7 +256,7 @@ public class FilterScanner extends AbstractBlockletScanner {
     }
     dimensionReadTime1 = System.currentTimeMillis();
     /**
-     * in case projection if the projected measure are not loaded in the 
measureColumnDataChunk
+     * in case projection if the projected measure are not loaded in the 
ColumnPage
      * then loading them
      */
     int[] projectionListMeasureIndexes = 
blockExecutionInfo.getProjectionListMeasureIndexes();
@@ -270,8 +270,8 @@ public class FilterScanner extends AbstractBlockletScanner {
     dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1;
     DimensionColumnDataChunk[][] dimensionColumnDataChunks =
         new 
DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length];
-    MeasureColumnDataChunk[][] measureColumnDataChunks =
-        new 
MeasureColumnDataChunk[measureRawColumnChunks.length][indexesGroup.length];
+    ColumnPage[][] columnPages =
+        new ColumnPage[measureRawColumnChunks.length][indexesGroup.length];
     for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
       if (dimensionRawColumnChunks[i] != null) {
         for (int j = 0; j < indexesGroup.length; j++) {
@@ -282,13 +282,13 @@ public class FilterScanner extends 
AbstractBlockletScanner {
     for (int i = 0; i < measureRawColumnChunks.length; i++) {
       if (measureRawColumnChunks[i] != null) {
         for (int j = 0; j < indexesGroup.length; j++) {
-          measureColumnDataChunks[i][j] = 
measureRawColumnChunks[i].convertToMeasureColDataChunk(j);
+          columnPages[i][j] = measureRawColumnChunks[i].convertToColumnPage(j);
         }
       }
     }
     scannedResult.setDimensionChunks(dimensionColumnDataChunks);
     scannedResult.setIndexes(indexesGroup);
-    scannedResult.setMeasureChunks(measureColumnDataChunks);
+    scannedResult.setMeasureChunks(columnPages);
     scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
     scannedResult.setNumberOfRows(rowCount);
     // adding statistics for carbon scan time

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 34c7709..f4f2693 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -31,7 +31,6 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -54,12 +53,9 @@ public abstract class AbstractDataFileFooterConverter {
    * @param presentMetadataThrift
    * @return wrapper presence meta
    */
-  private static PresenceMeta getPresenceMeta(
+  private static BitSet getPresenceMeta(
       org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    PresenceMeta presenceMeta = new PresenceMeta();
-    
presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    
presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream()));
-    return presenceMeta;
+    return BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index db5010f..ff02bf7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -16,47 +16,34 @@
  */
 package org.apache.carbondata.core.util;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import 
org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletBTreeIndex;
 import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.BlockletInfo2;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ChunkCompressionMeta;
 import org.apache.carbondata.format.ColumnSchema;
 import org.apache.carbondata.format.CompressionCodec;
-import org.apache.carbondata.format.DataChunk;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
 import org.apache.carbondata.format.Encoding;
-import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.format.FileHeader;
 import org.apache.carbondata.format.IndexHeader;
-import org.apache.carbondata.format.PresenceMeta;
 import org.apache.carbondata.format.SegmentInfo;
-import org.apache.carbondata.format.SortState;
 
 /**
  * Util class to convert to thrift metdata classes
@@ -64,54 +51,6 @@ import org.apache.carbondata.format.SortState;
 public class CarbonMetadataUtil {
 
   /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonMetadataUtil.class.getName());
-
-  /**
-   * It converts list of BlockletInfoColumnar to FileFooter thrift objects
-   *
-   * @param infoList
-   * @param cardinalities
-   * @return FileFooter
-   */
-  public static FileFooter convertFileFooter(List<BlockletInfoColumnar> 
infoList,
-      int[] cardinalities, List<ColumnSchema> columnSchemaList, 
SegmentProperties segmentProperties)
-      throws IOException {
-    FileFooter footer = getFileFooter(infoList, cardinalities, 
columnSchemaList);
-    for (BlockletInfoColumnar info : infoList) {
-      footer.addToBlocklet_info_list(getBlockletInfo(info, columnSchemaList, 
segmentProperties));
-    }
-    return footer;
-  }
-
-  /**
-   * Below method will be used to get the file footer object
-   *
-   * @param infoList         blocklet info
-   * @param cardinalities    cardinlaity of dimension columns
-   * @param columnSchemaList column schema list
-   * @return file footer
-   */
-  private static FileFooter getFileFooter(List<BlockletInfoColumnar> infoList, 
int[] cardinalities,
-      List<ColumnSchema> columnSchemaList) {
-    SegmentInfo segmentInfo = new SegmentInfo();
-    segmentInfo.setNum_cols(columnSchemaList.size());
-    
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
-    ColumnarFormatVersion version = 
CarbonProperties.getInstance().getFormatVersion();
-    FileFooter footer = new FileFooter();
-    footer.setVersion(version.number());
-    footer.setNum_rows(getTotalNumberOfRows(infoList));
-    footer.setSegment_info(segmentInfo);
-    footer.setTable_columns(columnSchemaList);
-    for (BlockletInfoColumnar info : infoList) {
-      footer.addToBlocklet_index_list(getBlockletIndex(info));
-    }
-    return footer;
-  }
-
-  /**
    * Below method prepares the file footer object for carbon data file version 
3
    *
    * @param infoList
@@ -153,29 +92,6 @@ public class CarbonMetadataUtil {
     return footer;
   }
 
-  /**
-   * Below method will be used to get the file footer object for
-   *
-   * @param infoList         blocklet info
-   * @param cardinalities    cardinality of each column
-   * @param columnSchemaList column schema list
-   * @param dataChunksOffset data chunks offsets
-   * @param dataChunksLength data chunks length
-   * @return filefooter thrift object
-   */
-  public static FileFooter convertFilterFooter2(List<BlockletInfoColumnar> 
infoList,
-      int[] cardinalities, List<ColumnSchema> columnSchemaList, 
List<List<Long>> dataChunksOffset,
-      List<List<Short>> dataChunksLength) {
-    FileFooter footer = getFileFooter(infoList, cardinalities, 
columnSchemaList);
-    int index = 0;
-    for (BlockletInfoColumnar info : infoList) {
-      footer.addToBlocklet_info_list2(
-          getBlockletInfo2(info, dataChunksOffset.get(index), 
dataChunksLength.get(index)));
-      index++;
-    }
-    return footer;
-  }
-
   private static BlockletIndex getBlockletIndex(
       org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
@@ -199,20 +115,6 @@ public class CarbonMetadataUtil {
    * @param infoList
    * @return
    */
-  private static long getTotalNumberOfRows(List<BlockletInfoColumnar> 
infoList) {
-    long numberOfRows = 0;
-    for (BlockletInfoColumnar info : infoList) {
-      numberOfRows += info.getNumberOfKeys();
-    }
-    return numberOfRows;
-  }
-
-  /**
-   * Get total number of rows for the file.
-   *
-   * @param infoList
-   * @return
-   */
   private static long getNumberOfRowForFooter(List<BlockletInfo3> infoList) {
     long numberOfRows = 0;
     for (BlockletInfo3 info : infoList) {
@@ -221,25 +123,6 @@ public class CarbonMetadataUtil {
     return numberOfRows;
   }
 
-  private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) {
-
-    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
-    for (byte[] max : info.getColumnMaxData()) {
-      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
-    }
-    for (byte[] min : info.getColumnMinData()) {
-      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
-    }
-    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
-    blockletBTreeIndex.setStart_key(info.getStartKey());
-    blockletBTreeIndex.setEnd_key(info.getEndKey());
-
-    BlockletIndex blockletIndex = new BlockletIndex();
-    blockletIndex.setMin_max_index(blockletMinMaxIndex);
-    blockletIndex.setB_tree_index(blockletBTreeIndex);
-    return blockletIndex;
-  }
-
   public static BlockletIndex getBlockletIndex(List<EncodedTablePage> 
encodedTablePageList,
       List<CarbonMeasure> carbonMeasureList) {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
@@ -248,6 +131,7 @@ public class CarbonMetadataUtil {
         encodedTablePageList.get(0).getMeasures());
     byte[][] minCol = stats.getDimensionMinValue().clone();
     byte[][] maxCol = stats.getDimensionMaxValue().clone();
+
     for (EncodedTablePage encodedTablePage : encodedTablePageList) {
       stats = new TablePageStatistics(encodedTablePage.getDimensions(),
           encodedTablePage.getMeasures());
@@ -312,112 +196,6 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the blocklet info object for data version
-   * 2 file
-   *
-   * @param blockletInfoColumnar blocklet info
-   * @param dataChunkOffsets     data chunks offsets
-   * @param dataChunksLength     data chunks length
-   * @return blocklet info version 2
-   */
-  private static BlockletInfo2 getBlockletInfo2(BlockletInfoColumnar 
blockletInfoColumnar,
-      List<Long> dataChunkOffsets, List<Short> dataChunksLength) {
-    BlockletInfo2 blockletInfo = new BlockletInfo2();
-    blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys());
-    blockletInfo.setColumn_data_chunks_length(dataChunksLength);
-    blockletInfo.setColumn_data_chunks_offsets(dataChunkOffsets);
-    return blockletInfo;
-  }
-
-  private static BlockletInfo getBlockletInfo(BlockletInfoColumnar 
blockletInfoColumnar,
-      List<ColumnSchema> columnSchema, SegmentProperties segmentProperties) 
throws IOException {
-
-    BlockletInfo blockletInfo = new BlockletInfo();
-    blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys());
-
-    List<DataChunk> colDataChunks = new ArrayList<DataChunk>();
-    int j = 0;
-    int aggregateIndex = 0;
-    boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
-    boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock();
-    for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) {
-      DataChunk dataChunk = new DataChunk();
-      dataChunk.setChunk_meta(getSnappyChunkCompressionMeta());
-      List<Encoding> encodings = new ArrayList<Encoding>();
-      if (containsEncoding(i, Encoding.DICTIONARY, columnSchema, 
segmentProperties)) {
-        encodings.add(Encoding.DICTIONARY);
-      }
-      if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchema, 
segmentProperties)) {
-        encodings.add(Encoding.DIRECT_DICTIONARY);
-      }
-      // TODO : Once schema PR is merged and information needs to be passed
-      // here.
-      dataChunk.setColumn_ids(new ArrayList<Integer>());
-      dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
-      dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]);
-      if (aggKeyBlock[i]) {
-        
dataChunk.setRle_page_offset(blockletInfoColumnar.getDataIndexMapOffsets()[aggregateIndex]);
-        
dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
-        encodings.add(Encoding.RLE);
-        aggregateIndex++;
-      }
-      dataChunk
-          .setSort_state(isSortedKeyColumn[i] ? SortState.SORT_EXPLICIT : 
SortState.SORT_NATIVE);
-
-      if (!isSortedKeyColumn[i]) {
-        
dataChunk.setRowid_page_offset(blockletInfoColumnar.getKeyBlockIndexOffSets()[j]);
-        
dataChunk.setRowid_page_length(blockletInfoColumnar.getKeyBlockIndexLength()[j]);
-        if (!encodings.contains(Encoding.INVERTED_INDEX)) {
-          encodings.add(Encoding.INVERTED_INDEX);
-        }
-        j++;
-      }
-
-      // TODO : Right now the encodings are happening at runtime. change as per
-      // this encoders.
-      dataChunk.setEncoders(encodings);
-
-      colDataChunks.add(dataChunk);
-    }
-
-    for (int i = 0; i < blockletInfoColumnar.getMeasureLength().length; i++) {
-      DataChunk dataChunk = new DataChunk();
-      dataChunk.setChunk_meta(getSnappyChunkCompressionMeta());
-      dataChunk.setRowMajor(false);
-      // TODO : Once schema PR is merged and information needs to be passed
-      // here.
-      dataChunk.setColumn_ids(new ArrayList<Integer>());
-      
dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
-      
dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]);
-      // TODO : Right now the encodings are happening at runtime. change as per
-      // this encoders.
-      List<Encoding> encodings = new ArrayList<Encoding>();
-      encodings.add(Encoding.DELTA);
-      dataChunk.setEncoders(encodings);
-      // TODO writing dummy presence meta need to set actual presence
-      // meta
-      PresenceMeta presenceMeta = new PresenceMeta();
-      presenceMeta.setPresent_bit_streamIsSet(true);
-      presenceMeta
-          
.setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray());
-      dataChunk.setPresence(presenceMeta);
-      // TODO : PresenceMeta needs to be implemented and set here
-      // dataChunk.setPresence(new PresenceMeta());
-      // TODO : Need to write ValueCompression meta here.
-      List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
-      encoderMetaList.add(
-          ByteBuffer.wrap(
-              serializeEncoderMeta(
-                      
blockletInfoColumnar.getEncodedTablePage().getMeasure(i).getMetaData())));
-      dataChunk.setEncoder_meta(encoderMetaList);
-      colDataChunks.add(dataChunk);
-    }
-    blockletInfo.setColumn_data_chunks(colDataChunks);
-
-    return blockletInfo;
-  }
-
-  /**
    * @param blockIndex
    * @param encoding
    * @param columnSchemas
@@ -439,15 +217,6 @@ public class CarbonMetadataUtil {
     return false;
   }
 
-  private static byte[] serializeEncoderMeta(ValueEncoderMeta encoderMeta) 
throws IOException {
-    // TODO : should remove the unnecessary fields.
-    ByteArrayOutputStream aos = new ByteArrayOutputStream();
-    ObjectOutputStream objStream = new ObjectOutputStream(aos);
-    objStream.writeObject(encoderMeta);
-    objStream.close();
-    return aos.toByteArray();
-  }
-
   /**
    * Right now it is set to default values. We may use this in future
    */
@@ -459,18 +228,6 @@ public class CarbonMetadataUtil {
     return chunkCompressionMeta;
   }
 
-
-  /**
-   * Right now it is set to default values. We may use this in future
-   */
-  private static ChunkCompressionMeta getChunkCompressionMeta() {
-    ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();
-    chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY);
-    chunkCompressionMeta.setTotal_compressed_size(0);
-    chunkCompressionMeta.setTotal_uncompressed_size(0);
-    return chunkCompressionMeta;
-  }
-
   /**
    * Below method will be used to get the index header
    *
@@ -522,89 +279,6 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the data chunk object for all the columns
-   *
-   * @param blockletInfoColumnar blocklet info
-   * @param columnSchema        list of columns
-   * @param segmentProperties    segment properties
-   * @return list of data chunks
-   * @throws IOException
-   */
-  public static List<DataChunk2> getDatachunk2(BlockletInfoColumnar 
blockletInfoColumnar,
-      List<ColumnSchema> columnSchema, SegmentProperties segmentProperties) 
throws IOException {
-    List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
-    int rowIdIndex = 0;
-    int aggregateIndex = 0;
-    boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
-    boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock();
-    for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) {
-      DataChunk2 dataChunk = new DataChunk2();
-      dataChunk.setChunk_meta(getChunkCompressionMeta());
-      List<Encoding> encodings = new ArrayList<Encoding>();
-      if (containsEncoding(i, Encoding.DICTIONARY, columnSchema, 
segmentProperties)) {
-        encodings.add(Encoding.DICTIONARY);
-      }
-      if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchema, 
segmentProperties)) {
-        encodings.add(Encoding.DIRECT_DICTIONARY);
-      }
-      // TODO : Once schema PR is merged and information needs to be passed
-      // here.
-      dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
-      if (aggKeyBlock[i]) {
-        
dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
-        encodings.add(Encoding.RLE);
-        aggregateIndex++;
-      }
-      dataChunk
-          .setSort_state(isSortedKeyColumn[i] ? SortState.SORT_EXPLICIT : 
SortState.SORT_NATIVE);
-
-      if (!isSortedKeyColumn[i]) {
-        
dataChunk.setRowid_page_length(blockletInfoColumnar.getKeyBlockIndexLength()[rowIdIndex]);
-        encodings.add(Encoding.INVERTED_INDEX);
-        rowIdIndex++;
-      }
-
-      // TODO : Right now the encodings are happening at runtime. change as per
-      // this encoders.
-      dataChunk.setEncoders(encodings);
-
-      colDataChunks.add(dataChunk);
-    }
-
-    for (int i = 0; i < blockletInfoColumnar.getMeasureLength().length; i++) {
-      DataChunk2 dataChunk = new DataChunk2();
-      dataChunk.setChunk_meta(getChunkCompressionMeta());
-      dataChunk.setRowMajor(false);
-      // TODO : Once schema PR is merged and information needs to be passed
-      // here.
-      
dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
-      // TODO : Right now the encodings are happening at runtime. change as per
-      // this encoders.
-      List<Encoding> encodings = new ArrayList<Encoding>();
-      encodings.add(Encoding.DELTA);
-      dataChunk.setEncoders(encodings);
-      // TODO writing dummy presence meta need to set actual presence
-      // meta
-      PresenceMeta presenceMeta = new PresenceMeta();
-      presenceMeta.setPresent_bit_streamIsSet(true);
-      
presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
-          
.compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
-      dataChunk.setPresence(presenceMeta);
-      // TODO : PresenceMeta needs to be implemented and set here
-      // dataChunk.setPresence(new PresenceMeta());
-      // TODO : Need to write ValueCompression meta here.
-      List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
-      encoderMetaList.add(
-          ByteBuffer.wrap(
-              serializeEncoderMeta(
-                      
blockletInfoColumnar.getEncodedTablePage().getMeasure(i).getMetaData())));
-      dataChunk.setEncoder_meta(encoderMetaList);
-      colDataChunks.add(dataChunk);
-    }
-    return colDataChunks;
-  }
-
-  /**
    * return DataChunk3 that contains the input DataChunk2 list
    */
   public static DataChunk3 getDataChunk3(List<DataChunk2> dataChunksList) {
@@ -634,7 +308,7 @@ public class CarbonMetadataUtil {
       int columnIndex) throws IOException {
     List<DataChunk2> dataChunksList = new 
ArrayList<>(encodedTablePageList.size());
     for (EncodedTablePage encodedTablePage : encodedTablePageList) {
-      
dataChunksList.add(encodedTablePage.getDimension(columnIndex).getDataChunk2());
+      
dataChunksList.add(encodedTablePage.getDimension(columnIndex).getPageMetadata());
     }
     return CarbonMetadataUtil.getDataChunk3(dataChunksList);
   }
@@ -647,7 +321,7 @@ public class CarbonMetadataUtil {
       int columnIndex) throws IOException {
     List<DataChunk2> dataChunksList = new 
ArrayList<>(encodedTablePageList.size());
     for (EncodedTablePage encodedTablePage : encodedTablePageList) {
-      
dataChunksList.add(encodedTablePage.getMeasure(columnIndex).getDataChunk2());
+      
dataChunksList.add(encodedTablePage.getMeasure(columnIndex).getPageMetadata());
     }
     return CarbonMetadataUtil.getDataChunk3(dataChunksList);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 1db1cca..5a1e40f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
@@ -1335,6 +1336,7 @@ public final class CarbonUtil {
       stream.flush();
       thriftByteArray = stream.toByteArray();
     } catch (TException | IOException e) {
+      LOGGER.error("Error while converting to byte array from thrift object: " 
+ e.getMessage());
       closeStreams(stream);
     } finally {
       closeStreams(stream);
@@ -1957,10 +1959,51 @@ public final class CarbonUtil {
     }
   }
 
-  public static void requireNotNull(Object obj) {
-    if (obj == null) {
-      throw new IllegalArgumentException("parameter not be null");
+
+  /**
+   * convert value to byte array
+   */
+  public static byte[] getValueAsBytes(DataType dataType, Object value) {
+    ByteBuffer b;
+    switch (dataType) {
+      case BYTE:
+        b = ByteBuffer.allocate(8);
+        b.putLong((byte) value);
+        b.flip();
+        return b.array();
+      case SHORT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((short) value);
+        b.flip();
+        return b.array();
+      case INT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((int) value);
+        b.flip();
+        return b.array();
+      case LONG:
+        b = ByteBuffer.allocate(8);
+        b.putLong((long) value);
+        b.flip();
+        return b.array();
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((double) value);
+        b.flip();
+        return b.array();
+      case DECIMAL:
+        return DataTypeUtil.bigDecimalToByte((BigDecimal)value);
+      case BYTE_ARRAY:
+        return (byte[]) value;
+      case STRING:
+      case TIMESTAMP:
+      case DATE:
+        return (byte[]) value;
+      default:
+        throw new IllegalArgumentException("Invalid data type: " + dataType);
     }
   }
+
+
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java 
b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index 4afa9b6..2adfadc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -433,7 +433,7 @@ public class NodeHolder {
     int numDimensions = encodedTablePage.getNumDimensions();
     byte[][] keyArray = new byte[numDimensions][];
     for (int i = 0; i < numDimensions; i++) {
-      keyArray[i] = encodedTablePage.getDimension(i).getEncodedData();
+      keyArray[i] = encodedTablePage.getDimension(i).getEncodedData().array();
     }
     return keyArray;
   }
@@ -442,7 +442,7 @@ public class NodeHolder {
     int numMeasures = encodedTablePage.getNumMeasures();
     byte[][] dataArray = new byte[numMeasures][];
     for (int i = 0; i < numMeasures; i++) {
-      dataArray[i] = encodedTablePage.getMeasure(i).getEncodedData();
+      dataArray[i] = encodedTablePage.getMeasure(i).getEncodedData().array();
     }
     return dataArray;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
deleted file mode 100644
index b560352..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.writer;
-
-import java.io.IOException;
-
-import org.apache.carbondata.format.FileFooter;
-
-/**
- * Writes metadata block to the fact table file in thrift
- * format org.apache.carbondata.format.FileFooter
- */
-public class CarbonFooterWriter {
-
-  // Fact file path
-  private String filePath;
-
-  public CarbonFooterWriter(String filePath) {
-    this.filePath = filePath;
-  }
-
-  /**
-   * It writes FileFooter thrift format object to file.
-   *
-   * @param footer
-   * @param currentPosition At where this metadata is going to be written.
-   * @throws IOException
-   */
-  public void writeFooter(FileFooter footer, long currentPosition) throws 
IOException {
-
-    ThriftWriter thriftWriter = openThriftWriter(filePath);
-    try {
-      thriftWriter.write(footer);
-      thriftWriter.writeOffset(currentPosition);
-    } finally {
-      thriftWriter.close();
-    }
-  }
-
-  /**
-   * open thrift writer for writing dictionary chunk/meta object
-   */
-  private ThriftWriter openThriftWriter(String filePath) throws IOException {
-    // create thrift writer instance
-    ThriftWriter thriftWriter = new ThriftWriter(filePath, true);
-    // open the file stream
-    thriftWriter.open();
-    return thriftWriter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
 
b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
index 251376e..9e17717 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
@@ -21,6 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
 import 
org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -41,7 +43,7 @@ public class RLECodecSuite {
     TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws 
IOException, MemoryException {
       this.inputByteData = inputByteData;
       inputBytePage = ColumnPage.newPage(DataType.BYTE, inputByteData.length);
-      
inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataType.BYTE,
 inputByteData.length, 0, 0));
+      
inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataType.BYTE,
 0, 0));
       for (int i = 0; i < inputByteData.length; i++) {
         inputBytePage.putData(i, inputByteData[i]);
       }
@@ -111,9 +113,10 @@ public class RLECodecSuite {
 
   private void testBytePageEncode(ColumnPage inputPage, byte[] 
expectedEncodedBytes)
       throws IOException, MemoryException {
-    RLECodec codec = new RLECodec(DataType.BYTE, inputPage.getPageSize());
-    EncodedColumnPage out = codec.encode(inputPage);
-    byte[] encoded = out.getEncodedData();
+    RLECodec codec = new RLECodec();
+    ColumnPageEncoder encoder = codec.createEncoder(null);
+    EncodedColumnPage result = encoder.encode(inputPage);
+    byte[] encoded = result.getEncodedData().array();
     assertEquals(expectedEncodedBytes.length, encoded.length);
     for (int i = 0; i < encoded.length; i++) {
       assertEquals(expectedEncodedBytes[i], encoded[i]);
@@ -121,8 +124,10 @@ public class RLECodecSuite {
   }
 
   private void testBytePageDecode(byte[] inputBytes, byte[] 
expectedDecodedBytes) throws IOException, MemoryException {
-    RLECodec codec = new RLECodec(DataType.BYTE, expectedDecodedBytes.length);
-    ColumnPage page = codec.decode(inputBytes, 0, inputBytes.length);
+    RLECodec codec = new RLECodec();
+    RLEEncoderMeta meta = new RLEEncoderMeta(DataType.BYTE, 
expectedDecodedBytes.length, null);
+    ColumnPageDecoder decoder = codec.createDecoder(meta);
+    ColumnPage page = decoder.decode(inputBytes, 0, inputBytes.length);
     byte[] decoded = page.getBytePage();
     assertEquals(expectedDecodedBytes.length, decoded.length);
     for (int i = 0; i < decoded.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/test/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollectorTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollectorTest.java
 
b/core/src/test/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollectorTest.java
index 9a9a773..55de2e2 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollectorTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollectorTest.java
@@ -99,17 +99,17 @@ public class RawBasedResultCollectorTest {
 //        return new byte[][] { { 1, 2 }, { 1, 2 } };
 //      }
 //
-//      @SuppressWarnings("unused") @Mock public MeasureColumnDataChunk 
getMeasureChunk(int ordinal) {
-//        MeasureColumnDataChunk measureColumnDataChunk = new 
MeasureColumnDataChunk();
+//      @SuppressWarnings("unused") @Mock public ColumnPage 
getMeasureChunk(int ordinal) {
+//        ColumnPage ColumnPage = new ColumnPage();
 //        PresenceMeta presenceMeta = new PresenceMeta();
 //        BitSet bitSet = new BitSet();
 //        bitSet.set(1);
 //        presenceMeta.setBitSet(bitSet);
-//        measureColumnDataChunk.setNullValueIndexHolder(presenceMeta);
+//        ColumnPage.setNullValueIndexHolder(presenceMeta);
 //        CarbonReadDataHolder carbonReadDataHolder = new 
CarbonReadDataHolder();
 //        carbonReadDataHolder.setReadableLongValues(new long[] { 1 });
-//        measureColumnDataChunk.setColumnPage(carbonReadDataHolder);
-//        return measureColumnDataChunk;
+//        ColumnPage.setColumnPage(carbonReadDataHolder);
+//        return ColumnPage;
 //      }
 //    };
 //

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index b953d45..3e1b63b 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -20,26 +20,27 @@ package org.apache.carbondata.core.util;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.metadata.CodecMetaFactory;
-import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import 
org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveEncoderMeta;
+import org.apache.carbondata.core.datastore.page.key.TablePageKey;
+import 
org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ColumnSchema;
 import org.apache.carbondata.format.DataChunk;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataType;
 import org.apache.carbondata.format.Encoding;
-import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.format.SegmentInfo;
 
@@ -49,8 +50,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static junit.framework.TestCase.assertEquals;
-import static 
org.apache.carbondata.core.util.CarbonMetadataUtil.convertFileFooter;
+import static 
org.apache.carbondata.core.util.CarbonMetadataUtil.convertFileFooterVersion3;
 import static 
org.apache.carbondata.core.util.CarbonMetadataUtil.getBlockIndexInfo;
+import static 
org.apache.carbondata.core.util.CarbonMetadataUtil.getBlockletIndex;
 import static 
org.apache.carbondata.core.util.CarbonMetadataUtil.getIndexHeader;
 
 public class CarbonMetadataUtilTest {
@@ -118,11 +120,11 @@ public class CarbonMetadataUtilTest {
     blockletInfoList.add(blockletInfo);
     blockletInfoList.add(blockletInfo);
 
-    ValueEncoderMeta meta = CodecMetaFactory.createMeta();
+    ValueEncoderMeta meta = CarbonTestUtil.createValueEncoderMeta();
     meta.setDecimal(5);
     meta.setMinValue(objMinArr);
     meta.setMaxValue(objMaxArr);
-    meta.setType(ColumnPageCodecMeta.DOUBLE_MEASURE);
+    meta.setType(AdaptiveEncoderMeta.DOUBLE_MEASURE);
 
     List<Encoding> encoders = new ArrayList<>();
     encoders.add(Encoding.INVERTED_INDEX);
@@ -173,17 +175,7 @@ public class CarbonMetadataUtilTest {
   }
 
   @Test public void testConvertFileFooter() throws Exception {
-    int[] intArr = { 1, 2, 3, 4, 5 };
-    boolean[] boolArr = { true, true, true, true, true };
-    long[] longArr = { 1, 2, 3, 4, 5 };
-    byte[][] maxByteArr = { { 1, 2 }, { 3, 4 }, { 5, 6 }, { 2, 4 }, { 1, 2 } };
     int[] cardinality = { 1, 2, 3, 4, 5 };
-    org.apache.carbondata.core.metadata.datatype.DataType[] dataType = {
-        org.apache.carbondata.core.metadata.datatype.DataType.INT,
-        org.apache.carbondata.core.metadata.datatype.DataType.INT,
-        org.apache.carbondata.core.metadata.datatype.DataType.INT,
-        org.apache.carbondata.core.metadata.datatype.DataType.INT,
-        org.apache.carbondata.core.metadata.datatype.DataType.INT };
 
     org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema 
colSchema =
         new 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema();
@@ -196,132 +188,54 @@ public class CarbonMetadataUtilTest {
 
     SegmentProperties segmentProperties = new 
SegmentProperties(columnSchemaList, cardinality);
 
-    final List<Integer> integerList = new ArrayList<>();
-    integerList.add(new Integer("1"));
-    integerList.add(new Integer("2"));
-
-    ValueEncoderMeta[] metas = new ValueEncoderMeta[6];
-    for (int i = 0; i < metas.length; i++) {
-      metas[i] = CodecMetaFactory.createMeta();
-      metas[i].setMinValue(objMinArr[i]);
-      metas[i].setMaxValue(objMaxArr[i]);
-      metas[i].setDecimal(objDecimal[i]);
-      metas[i].setType(ColumnPageCodecMeta.BIG_INT_MEASURE);
-    }
-
-    BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();
-
-    final ValueEncoderMeta meta = CodecMetaFactory.createMeta();
-
-    new MockUp<ColumnPageCodecMeta>() {
-      @SuppressWarnings("unused") @Mock
-      public byte[] serialize() {
-        return new byte[]{1,2};
-      }
-      @SuppressWarnings("unused") @Mock
-      public byte[] getMaxAsBytes() {
-        return new byte[]{1,2};
-      }
-      @SuppressWarnings("unused") @Mock
-      public byte[] getMinAsBytes() {
-        return new byte[]{1,2};
-      }
+    final EncodedColumnPage measure = new EncodedColumnPage(new DataChunk2(), 
new byte[]{0,1},
+        PrimitivePageStatsCollector.newInstance(
+        org.apache.carbondata.core.metadata.datatype.DataType.BYTE, 0, 0));
+    new MockUp<EncodedTablePage>() {
       @SuppressWarnings("unused") @Mock
-      public org.apache.carbondata.core.metadata.datatype.DataType 
getSrcDataType() {
-        return org.apache.carbondata.core.metadata.datatype.DataType.DOUBLE;
+      public EncodedColumnPage getMeasure(int measureIndex) {
+        return measure;
       }
     };
 
-    new MockUp<EncodedMeasurePage>() {
+    new MockUp<TablePageKey>() {
       @SuppressWarnings("unused") @Mock
-      public ValueEncoderMeta getMetaData() {
-        return meta;
+      public byte[] serializeStartKey() {
+        return new byte[]{1, 2};
       }
-    };
 
-    final EncodedMeasurePage measure = new EncodedMeasurePage(6, new 
byte[]{0,1}, meta,
-        new BitSet());
-    new MockUp<EncodedTablePage>() {
       @SuppressWarnings("unused") @Mock
-      public EncodedMeasurePage getMeasure(int measureIndex) {
-        return measure;
+      public byte[] serializeEndKey() {
+        return new byte[]{1, 2};
       }
     };
 
-    BitSet[] bitSetArr = new BitSet[6];
-    bitSetArr[0] = new BitSet();
-    bitSetArr[1] = new BitSet();
-    bitSetArr[2] = new BitSet();
-    bitSetArr[3] = new BitSet();
-    bitSetArr[4] = new BitSet();
-    bitSetArr[5] = new BitSet();
-    blockletInfoColumnar.setColumnMaxData(maxByteArr);
-    blockletInfoColumnar.setColumnMinData(maxByteArr);
-    blockletInfoColumnar.setKeyLengths(intArr);
-    blockletInfoColumnar.setKeyOffSets(longArr);
-    blockletInfoColumnar.setDataIndexMapOffsets(longArr);
-    blockletInfoColumnar.setAggKeyBlock(boolArr);
-    blockletInfoColumnar.setDataIndexMapLength(intArr);
-    blockletInfoColumnar.setIsSortedKeyColumn(boolArr);
-    blockletInfoColumnar.setKeyOffSets(longArr);
-    blockletInfoColumnar.setMeasureLength(intArr);
-    blockletInfoColumnar.setMeasureOffset(longArr);
-    blockletInfoColumnar.setMeasureNullValueIndex(bitSetArr);
-    EncodedTablePage encodedTablePage = EncodedTablePage.newEmptyInstance();
-    blockletInfoColumnar.setEncodedTablePage(encodedTablePage);
-
-    BlockletInfoColumnar blockletInfoColumnar1 = new BlockletInfoColumnar();
-    blockletInfoColumnar1.setColumnMaxData(maxByteArr);
-    blockletInfoColumnar1.setColumnMinData(maxByteArr);
-    blockletInfoColumnar1.setKeyLengths(intArr);
-    blockletInfoColumnar1.setKeyOffSets(longArr);
-    blockletInfoColumnar1.setDataIndexMapOffsets(longArr);
-    blockletInfoColumnar1.setAggKeyBlock(boolArr);
-    blockletInfoColumnar1.setDataIndexMapLength(intArr);
-    blockletInfoColumnar1.setIsSortedKeyColumn(boolArr);
-    blockletInfoColumnar1.setKeyOffSets(longArr);
-    blockletInfoColumnar1.setMeasureLength(intArr);
-    blockletInfoColumnar1.setMeasureOffset(longArr);
-    blockletInfoColumnar1.setMeasureNullValueIndex(bitSetArr);
-    blockletInfoColumnar1.setEncodedTablePage(encodedTablePage);
-
-    List<BlockletInfoColumnar> blockletInfoColumnarList = new ArrayList<>();
-    blockletInfoColumnarList.add(blockletInfoColumnar);
-    blockletInfoColumnarList.add(blockletInfoColumnar1);
-
-    new MockUp<CarbonUtil>() {
-      @SuppressWarnings("unused") @Mock public List<Integer> 
convertToIntegerList(int[] array) {
-        return integerList;
-      }
-    };
+    TablePageKey key = new TablePageKey(3, null, segmentProperties, false);
+    EncodedTablePage encodedTablePage = EncodedTablePage.newInstance(3, new 
EncodedColumnPage[0], new EncodedColumnPage[0],
+        key);
 
-    final Set<Integer> integerSet = new HashSet<>();
-    integerSet.add(new Integer("1"));
-    integerSet.add(new Integer("2"));
-    new MockUp<SegmentProperties>() {
-      @SuppressWarnings("unused") @Mock
-      public Set<Integer> getDimensionOrdinalForBlock(int blockIndex) {
-        return integerSet;
-      }
-    };
+    List<EncodedTablePage> encodedTablePageList = new ArrayList<>();
+    encodedTablePageList.add(encodedTablePage);
 
-    SegmentInfo segmentInfo = new SegmentInfo();
-    segmentInfo.setNum_cols(4);
-    segmentInfo.setColumn_cardinalities(integerList);
+    BlockletInfo3 blockletInfoColumnar1 = new BlockletInfo3();
 
-    FileFooter fileFooter = new FileFooter();
-    fileFooter.setNum_rows(4);
-    fileFooter.setSegment_info(segmentInfo);
+    List<BlockletInfo3> blockletInfoColumnarList = new ArrayList<>();
+    blockletInfoColumnarList.add(blockletInfoColumnar1);
 
     byte[] byteMaxArr = "1".getBytes();
     byte[] byteMinArr = "2".getBytes();
 
+    BlockletIndex index = getBlockletIndex(encodedTablePageList, 
segmentProperties.getMeasures());
+    List<BlockletIndex> indexList = new ArrayList<>();
+    indexList.add(index);
+
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
     blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(byteMaxArr));
     blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(byteMinArr));
-    FileFooter result = convertFileFooter(blockletInfoColumnarList, 
cardinality, columnSchemas,
-        segmentProperties);
-    assertEquals(result.getTable_columns(), columnSchemas);
+    FileFooter3 footer = convertFileFooterVersion3(blockletInfoColumnarList,
+        indexList,
+        cardinality, 2);
+    assertEquals(footer.getBlocklet_index_list(), indexList);
 
   }
 

Reply via email to