Optimizing decimal datatype

Optimized big decimal to use less space

Fixed comments


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/12911629
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/12911629
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/12911629

Branch: refs/heads/metadata
Commit: 12911629c84aabf2400fddb74c97a4ec54a533e8
Parents: 403c3d9
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Wed Jun 28 16:03:03 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Tue Jul 11 15:50:01 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/datastore/TableSpec.java    |  23 +-
 .../AbstractMeasureChunkReaderV2V3Format.java   |  15 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |   2 +-
 .../core/datastore/page/ColumnPage.java         | 107 ++++++---
 .../core/datastore/page/LazyColumnPage.java     |  13 +-
 .../datastore/page/SafeFixLengthColumnPage.java |  14 +-
 .../datastore/page/SafeVarLengthColumnPage.java |  11 +-
 .../page/UnsafeFixLengthColumnPage.java         |  13 +-
 .../page/UnsafeVarLengthColumnPage.java         |  19 +-
 .../datastore/page/VarLengthColumnPageBase.java |  76 +++++-
 .../page/encoding/AdaptiveIntegerCodec.java     |  11 +-
 .../page/encoding/DefaultEncodingStrategy.java  |   8 +-
 .../page/encoding/DeltaIntegerCodec.java        |  11 +-
 .../page/encoding/DirectCompressCodec.java      |  16 +-
 .../page/encoding/EncodingStrategy.java         |   4 +-
 .../page/statistics/ColumnPageStatsVO.java      |  18 +-
 .../datatype/DecimalConverterFactory.java       | 237 +++++++++++++++++++
 .../core/util/CarbonMetadataUtil.java           |  28 +++
 .../newflow/sort/SortStepRowUtil.java           |  17 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        |   2 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   3 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  14 +-
 .../sortdata/SortTempFileChunkHolder.java       |   3 +-
 .../carbondata/processing/store/TablePage.java  |  14 +-
 24 files changed, 557 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 365f1ca..650c2a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -188,18 +188,27 @@ public class TableSpec {
     // data type of each measure, in schema order
     private DataType[] types;
 
+    private int[] scale;
+
+    private int[] precision;
+
     MeasureSpec(List<CarbonMeasure> measures) {
       fieldName = new String[measures.size()];
       types = new DataType[measures.size()];
+      scale = new int[measures.size()];
+      precision = new int[measures.size()];
       int i = 0;
       for (CarbonMeasure measure: measures) {
-        add(i++, measure.getColName(), measure.getDataType());
+        add(i++, measure.getColName(), measure.getDataType(), measure.getScale(),
+            measure.getPrecision());
       }
     }
 
-    private void add(int index, String name, DataType type) {
+    private void add(int index, String name, DataType type, int scale, int precision) {
       fieldName[index] = name;
       types[index] = type;
+      this.scale[index] = scale;
+      this.precision[index] = precision;
     }
 
     /**
@@ -210,6 +219,16 @@ public class TableSpec {
       return types[index];
     }
 
+    public int getScale(int index) {
+      assert (index >= 0 && index < precision.length);
+      return scale[index];
+    }
+
+    public int getPrecision(int index) {
+      assert (index >= 0 && index < precision.length);
+      return precision[index];
+    }
+
     /**
      * return number of measures
      */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
index 2f5af87..c35cefb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.List;
 
@@ -132,10 +133,18 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu
   protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
       DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException {
     // for measure, it should have only one ValueEncoderMeta
-    assert (measureColumnChunk.getEncoder_meta().size() == 1);
-    byte[] encodedMeta = measureColumnChunk.getEncoder_meta().get(0).array();
+    List<ByteBuffer> encoder_meta = measureColumnChunk.getEncoder_meta();
+    assert (encoder_meta.size() > 0);
+    byte[] encodedMeta = encoder_meta.get(0).array();
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV3(encodedMeta);
-    ColumnPageCodec codec = strategy.createCodec(meta);
+    int scale = -1;
+    int precision = -1;
+    if (encoder_meta.size() > 1) {
+      ByteBuffer decimalInfo = encoder_meta.get(1);
+      scale = decimalInfo.getInt();
+      precision = decimalInfo.getInt();
+    }
+    ColumnPageCodec codec = strategy.createCodec(meta, scale, precision);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 6e59b9f..8f69a7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -98,7 +98,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
 
-    ColumnPageCodec codec = strategy.createCodec(meta);
+    ColumnPageCodec codec = strategy.createCodec(meta, -1, -1);
     ColumnPage page = codec.decode(measureRawColumnChunk.getRawData().array(),
         measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 730243c..2c43165 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -26,8 +26,8 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
 import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
 import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE_ARRAY;
@@ -43,6 +43,8 @@ public abstract class ColumnPage {
 
   protected final int pageSize;
   protected final DataType dataType;
+  protected int scale;
+  protected int precision;
 
   // statistics of this column page
   private ColumnPageStatsVO stats;
@@ -50,15 +52,22 @@ public abstract class ColumnPage {
   // The index of the rowId whose value is null, will be set to 1
   private BitSet nullBitSet;
 
+  protected DecimalConverterFactory.DecimalConverter decimalConverter;
+
   protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
           CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT));
 
-  protected ColumnPage(DataType dataType, int pageSize) {
+  protected ColumnPage(DataType dataType, int pageSize, int scale, int precision) {
     this.dataType = dataType;
     this.pageSize = pageSize;
+    this.scale = scale;
+    this.precision = precision;
     this.stats = new ColumnPageStatsVO(dataType);
     this.nullBitSet = new BitSet(pageSize);
+    if (dataType == DECIMAL) {
+      decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
+    }
   }
 
   public DataType getDataType() {
@@ -73,54 +82,62 @@ public abstract class ColumnPage {
     return pageSize;
   }
 
-  private static ColumnPage createVarLengthPage(DataType dataType, int pageSize) {
+  private static ColumnPage createVarLengthPage(DataType dataType, int pageSize, int scale,
+      int precision) {
     if (unsafe) {
       try {
-        return new UnsafeVarLengthColumnPage(dataType, pageSize);
+        return new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeVarLengthColumnPage(dataType, pageSize);
+      return new SafeVarLengthColumnPage(dataType, pageSize, scale, precision);
     }
   }
 
-  private static ColumnPage createFixLengthPage(DataType dataType, int pageSize) {
+  private static ColumnPage createFixLengthPage(DataType dataType, int pageSize, int scale,
+      int precision) {
     if (unsafe) {
       try {
-        return new UnsafeFixLengthColumnPage(dataType, pageSize);
+        return new UnsafeFixLengthColumnPage(dataType, pageSize, scale, precision);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeFixLengthColumnPage(dataType, pageSize);
+      return new SafeFixLengthColumnPage(dataType, pageSize, scale, pageSize);
     }
   }
 
-  private static ColumnPage createPage(DataType dataType, int pageSize) {
+  private static ColumnPage createPage(DataType dataType, int pageSize, int scale, int precision) {
     if (dataType.equals(BYTE_ARRAY) | dataType.equals(DECIMAL)) {
-      return createVarLengthPage(dataType, pageSize);
+      return createVarLengthPage(dataType, pageSize, scale, precision);
     } else {
-      return createFixLengthPage(dataType, pageSize);
+      return createFixLengthPage(dataType, pageSize, scale, precision);
     }
   }
 
-  public static ColumnPage newVarLengthPath(DataType dataType, int pageSize) {
+  public static ColumnPage newVarLengthPage(DataType dataType, int pageSize) {
+    return newVarLengthPage(dataType, pageSize, -1, -1);
+  }
+
+  private static ColumnPage newVarLengthPage(DataType dataType, int pageSize, int scale,
+      int precision) {
     if (unsafe) {
       try {
-        return new UnsafeVarLengthColumnPage(dataType, pageSize);
+        return new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeVarLengthColumnPage(dataType, pageSize);
+      return new SafeVarLengthColumnPage(dataType, pageSize, scale, precision);
     }
   }
 
   /**
    * Create a new page of dataType and number of row = pageSize
    */
-  public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException {
+  public static ColumnPage newPage(DataType dataType, int pageSize, int scale, int precision)
+      throws MemoryException {
     ColumnPage instance;
     if (unsafe) {
       switch (dataType) {
@@ -131,11 +148,11 @@ public abstract class ColumnPage {
         case LONG:
         case FLOAT:
         case DOUBLE:
-          instance = new UnsafeFixLengthColumnPage(dataType, pageSize);
+          instance = new UnsafeFixLengthColumnPage(dataType, pageSize, scale, precision);
           break;
         case DECIMAL:
         case BYTE_ARRAY:
-          instance = new UnsafeVarLengthColumnPage(dataType, pageSize);
+          instance = new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
           break;
         default:
           throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -164,7 +181,7 @@ public abstract class ColumnPage {
           instance = newDoublePage(new double[pageSize]);
           break;
         case DECIMAL:
-          instance = newDecimalPage(new byte[pageSize][]);
+          instance = newDecimalPage(new byte[pageSize][], scale, precision);
           break;
         default:
           throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -174,55 +191,61 @@ public abstract class ColumnPage {
   }
 
   private static ColumnPage newBytePage(byte[] byteData) {
-    ColumnPage columnPage = createPage(BYTE, byteData.length);
+    ColumnPage columnPage = createPage(BYTE, byteData.length,  -1, -1);
     columnPage.setBytePage(byteData);
     return columnPage;
   }
 
   private static ColumnPage newShortPage(short[] shortData) {
-    ColumnPage columnPage = createPage(SHORT, shortData.length);
+    ColumnPage columnPage = createPage(SHORT, shortData.length,  -1, -1);
     columnPage.setShortPage(shortData);
     return columnPage;
   }
 
   private static ColumnPage newShortIntPage(byte[] shortIntData) {
-    ColumnPage columnPage = createPage(SHORT_INT, shortIntData.length / 3);
+    ColumnPage columnPage = createPage(SHORT_INT, shortIntData.length / 3,  -1, -1);
     columnPage.setShortIntPage(shortIntData);
     return columnPage;
   }
 
   private static ColumnPage newIntPage(int[] intData) {
-    ColumnPage columnPage = createPage(INT, intData.length);
+    ColumnPage columnPage = createPage(INT, intData.length,  -1, -1);
     columnPage.setIntPage(intData);
     return columnPage;
   }
 
   private static ColumnPage newLongPage(long[] longData) {
-    ColumnPage columnPage = createPage(LONG, longData.length);
+    ColumnPage columnPage = createPage(LONG, longData.length,  -1, -1);
     columnPage.setLongPage(longData);
     return columnPage;
   }
 
   private static ColumnPage newFloatPage(float[] floatData) {
-    ColumnPage columnPage = createPage(FLOAT, floatData.length);
+    ColumnPage columnPage = createPage(FLOAT, floatData.length,  -1, -1);
     columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
   private static ColumnPage newDoublePage(double[] doubleData) {
-    ColumnPage columnPage = createPage(DOUBLE, doubleData.length);
+    ColumnPage columnPage = createPage(DOUBLE, doubleData.length, -1, -1);
     columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(byte[][] byteArray) {
-    ColumnPage columnPage = createPage(DECIMAL, byteArray.length);
+  private static ColumnPage newDecimalPage(byte[][] byteArray, int scale, int precision) {
+    ColumnPage columnPage = createPage(DECIMAL, byteArray.length, scale, precision);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray) throws MemoryException {
-    return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray);
+  private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray, int scale, int precision)
+      throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray, scale, precision);
+  }
+
+  private static ColumnPage newVarLengthPage(byte[] lvEncodedByteArray, int scale, int precision)
+      throws MemoryException {
+    return VarLengthColumnPageBase.newVarLengthColumnPage(lvEncodedByteArray, scale, precision);
   }
 
   /**
@@ -297,6 +320,8 @@ public abstract class ColumnPage {
         putDouble(rowId, (double) value);
         break;
       case DECIMAL:
+        putDecimal(rowId, (BigDecimal) value);
+        break;
       case BYTE_ARRAY:
         putBytes(rowId, (byte[]) value);
         break;
@@ -337,6 +362,11 @@ public abstract class ColumnPage {
   public abstract void putBytes(int rowId, byte[] bytes);
 
   /**
+   * Set byte array value at rowId
+   */
+  public abstract void putDecimal(int rowId, BigDecimal decimal);
+
+  /**
    * Type cast int value to 3 bytes value and set at rowId
    */
   public abstract void putShortInt(int rowId, int value);
@@ -346,7 +376,6 @@ public abstract class ColumnPage {
    */
   public abstract void putBytes(int rowId, byte[] bytes, int offset, int length);
 
-  private static final byte[] ZERO = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
 
   /**
    * Set null at rowId
@@ -370,7 +399,7 @@ public abstract class ColumnPage {
         putDouble(rowId, 0.0);
         break;
       case DECIMAL:
-        putBytes(rowId, ZERO);
+        putDecimal(rowId, BigDecimal.ZERO);
         break;
     }
   }
@@ -468,6 +497,11 @@ public abstract class ColumnPage {
   public abstract byte[] getFlattenedBytePage();
 
   /**
+   * For decimals
+   */
+  public abstract byte[] getDecimalPage();
+
+  /**
    * Encode the page data by codec (Visitor)
    */
   public abstract void encode(PrimitiveCodec codec);
@@ -492,7 +526,7 @@ public abstract class ColumnPage {
       case DOUBLE:
         return compressor.compressDouble(getDoublePage());
       case DECIMAL:
-        return compressor.compressByte(getFlattenedBytePage());
+        return compressor.compressByte(getDecimalPage());
       case BYTE_ARRAY:
         return compressor.compressByte(getFlattenedBytePage());
       default:
@@ -504,7 +538,8 @@ public abstract class ColumnPage {
    * Decompress data and create a column page using the decompressed data
    */
   public static ColumnPage decompress(Compressor compressor, DataType dataType,
-      byte[] compressedData, int offset, int length) throws MemoryException {
+      byte[] compressedData, int offset, int length, int scale, int precision)
+      throws MemoryException {
     switch (dataType) {
       case BYTE:
         byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -528,9 +563,11 @@ public abstract class ColumnPage {
         double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
         return newDoublePage(doubleData);
       case DECIMAL:
-      case BYTE_ARRAY:
         byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-        return newDecimalPage(lvEncodedBytes);
+        return newDecimalPage(lvEncodedBytes, scale, precision);
+      case BYTE_ARRAY:
+        byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+        return newVarLengthPage(lvVarBytes, scale, precision);
       default:
         throw new UnsupportedOperationException("unsupport uncompress column page: " + dataType);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 6ec2e07..b0978d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -32,7 +32,8 @@ public class LazyColumnPage extends ColumnPage {
   private PrimitiveCodec codec;
 
   private LazyColumnPage(ColumnPage columnPage, PrimitiveCodec codec) {
-    super(columnPage.getDataType(), columnPage.getPageSize());
+    super(columnPage.getDataType(), columnPage.getPageSize(), columnPage.scale,
+        columnPage.precision);
     this.columnPage = columnPage;
     this.codec = codec;
   }
@@ -137,6 +138,16 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public byte[] getDecimalPage() {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
   public byte[] getFlattenedBytePage() {
     throw new UnsupportedOperationException("internal error");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index cfb1798..9bd85e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -36,8 +36,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   private double[] doubleData;
   private byte[] shortIntData;
 
-  SafeFixLengthColumnPage(DataType dataType, int pageSize) {
-    super(dataType, pageSize);
+  SafeFixLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) {
+    super(dataType, pageSize, scale, precision);
   }
 
   /**
@@ -99,6 +99,16 @@ public class SafeFixLengthColumnPage extends ColumnPage {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getDecimalPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
   /**
    * Get byte value at rowId
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 3a76f55..63291f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -20,15 +20,14 @@ package org.apache.carbondata.core.datastore.page;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   // for string and decimal data
   private byte[][] byteArrayData;
 
-  SafeVarLengthColumnPage(DataType dataType, int pageSize) {
-    super(dataType, pageSize);
+  SafeVarLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) {
+    super(dataType, pageSize, scale, precision);
     byteArrayData = new byte[pageSize][];
   }
 
@@ -47,10 +46,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
     System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length);
   }
 
+  @Override public void putDecimal(int rowId, BigDecimal decimal) {
+    putBytes(rowId, decimalConverter.convert(decimal));
+  }
+
   @Override
   public BigDecimal getDecimal(int rowId) {
     byte[] bytes = byteArrayData[rowId];
-    return DataTypeUtil.byteToBigDecimal(bytes);
+    return decimalConverter.getDecimal(bytes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 9f71768..2382599 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -49,8 +49,9 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   private static final int floatBits = DataType.FLOAT.getSizeBits();
   private static final int doubleBits = DataType.DOUBLE.getSizeBits();
 
-  UnsafeFixLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
-    super(dataType, pageSize);
+  UnsafeFixLengthColumnPage(DataType dataType, int pageSize, int scale, int precision)
+      throws MemoryException {
+    super(dataType, pageSize, scale, precision);
     switch (dataType) {
       case BYTE:
       case SHORT:
@@ -124,6 +125,10 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override public void putDecimal(int rowId, BigDecimal decimal) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
   @Override
   public byte getByte(int rowId) {
     long offset = rowId << byteBits;
@@ -175,6 +180,10 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override public byte[] getDecimalPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
   @Override
   public byte[] getBytePage() {
     byte[] data = new byte[getPageSize()];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index dd6abc5..16cf94c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
 // This extension uses unsafe memory to store page data, for variable length data type (string,
 // decimal)
@@ -52,10 +51,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
    * @param dataType data type
    * @param pageSize number of row
    */
-  UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
-    super(dataType, pageSize);
+  UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int scale, int precision)
+      throws MemoryException {
+    super(dataType, pageSize, scale, precision);
     capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
-    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long) (capacity));
     baseAddress = memoryBlock.getBaseObject();
     baseOffset = memoryBlock.getBaseOffset();
   }
@@ -66,8 +66,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
    * @param pageSize number of row
    * @param capacity initial capacity of the page, in bytes
    */
-  UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException {
-    super(dataType, pageSize);
+  UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity,
+      int scale, int precision) throws MemoryException {
+    super(dataType, pageSize, scale, precision);
     this.capacity = capacity;
     memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
     baseAddress = memoryBlock.getBaseObject();
@@ -117,6 +118,10 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
         baseAddress, baseOffset + rowOffset[rowId], length);
   }
 
+  @Override public void putDecimal(int rowId, BigDecimal decimal) {
+    putBytes(rowId, decimalConverter.convert(decimal));
+  }
+
   @Override
   public BigDecimal getDecimal(int rowId) {
     int length = rowOffset[rowId + 1] - rowOffset[rowId];
@@ -124,7 +129,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
     CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
         bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
 
-    return DataTypeUtil.byteToBigDecimal(bytes);
+    return decimalConverter.getDecimal(bytes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 801cfb3..46d6787 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.ByteUtil;
 
 import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
@@ -34,8 +35,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   // the length of bytes added in the page
   int totalLength;
 
-  VarLengthColumnPageBase(DataType dataType, int pageSize) {
-    super(dataType, pageSize);
+  VarLengthColumnPageBase(DataType dataType, int pageSize, int scale, int precision) {
+    super(dataType, pageSize, scale, precision);
     rowOffset = new int[pageSize + 1];
     totalLength = 0;
   }
@@ -83,7 +84,59 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
   /**
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
-  static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes) throws 
MemoryException {
+  static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes, int scale, int precision)
+      throws MemoryException {
+    // The converter chosen for this precision reports how wide one stored value is.
+    int fixedSize =
+        DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale).getSize();
+    if (fixedSize >= 0) {
+      // Every value has the same width, so rows are sliced purely by position.
+      return getDecimalColumnPage(lvEncodedBytes, scale, precision, fixedSize);
+    }
+    // A negative size means no fixed width: parse the input as LV (Length Value) encoded bytes.
+    return getLVBytesColumnPage(lvEncodedBytes, scale, precision, DataType.DECIMAL);
+  }
+
+  /**
+   * Create a new column page from LV (Length Value) encoded bytes, requesting
+   * {@code DataType.BYTE_ARRAY} as the page data type. All work is delegated to
+   * {@code getLVBytesColumnPage}.
+   */
+  static ColumnPage newVarLengthColumnPage(byte[] lvEncodedBytes, int scale, int precision)
+      throws MemoryException {
+    final DataType requestedType = DataType.BYTE_ARRAY;
+    return getLVBytesColumnPage(lvEncodedBytes, scale, precision, requestedType);
+  }
+
+  /**
+   * Create a decimal column page from a buffer of back-to-back fixed-width values.
+   *
+   * @param lvEncodedBytes concatenated values, each occupying {@code size} bytes
+   * @param scale scale handed to the page constructor
+   * @param precision precision handed to the page constructor
+   * @param size width in bytes of a single value; expected to be positive
+   * @return a page with one row per {@code size}-byte slice of the input
+   */
+  private static ColumnPage getDecimalColumnPage(byte[] lvEncodedBytes, int scale, int precision,
+      int size) throws MemoryException {
+    // Rows start at multiples of size, so count and offsets follow from arithmetic and no
+    // boxed offset list is needed. A trailing partial slice still counts as a row.
+    int numRows = lvEncodedBytes.length / size + (lvEncodedBytes.length % size == 0 ? 0 : 1);
+    int[] offsets = new int[numRows + 1];
+    for (int i = 1; i <= numRows; i++) {
+      offsets[i] = offsets[i - 1] + size;
+    }
+
+    VarLengthColumnPageBase page;
+    if (unsafe) {
+      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, scale, precision);
+    } else {
+      page = new SafeVarLengthColumnPage(DECIMAL, numRows, scale, precision);
+    }
+
+    // set total length and rowOffset in page
+    page.totalLength = offsets[numRows];
+    page.rowOffset = offsets;
+    for (int i = 0; i < numRows; i++) {
+      page.putBytes(i, lvEncodedBytes, offsets[i], size);
+    }
+    return page;
+  }
+
+  private static ColumnPage getLVBytesColumnPage(byte[] lvEncodedBytes, int 
scale,
+      int precision, DataType dataType) throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory 
correspondingly
     int rowId = 0;
     List<Integer> rowOffset = new ArrayList<>();
@@ -107,9 +160,9 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength);
+      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength, 
scale, precision);
     } else {
-      page = new SafeVarLengthColumnPage(DECIMAL, numRows);
+      page = new SafeVarLengthColumnPage(dataType, numRows, scale, precision);
     }
 
     // set total length and rowOffset in page
@@ -242,6 +295,19 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override
+  public byte[] getDecimalPage() {
+    // Flatten the page: the bytes of each row are appended one after another. Each row
+    // contributes exactly the length derived from rowOffset; no length field is written here.
+    byte[] flattened = new byte[totalLength];
+    int writePos = 0;
+    int row = 0;
+    while (row < pageSize) {
+      int rowLength = rowOffset[row + 1] - rowOffset[row];
+      copyBytes(row, flattened, writePos, rowLength);
+      writePos += rowLength;
+      row++;
+    }
+    return flattened;
+  }
+
   /**
    * Copy `length` bytes from data at rowId to dest start from destOffset
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
index 3d56f0c..fe15ba7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -52,7 +52,8 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
 
   @Override
   public byte[] encode(ColumnPage input) throws MemoryException, IOException {
-    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    encodedPage = ColumnPage
+        .newPage(targetDataType, input.getPageSize(), stats.getScale(), 
stats.getPrecision());
     input.encode(codec);
     byte[] result = encodedPage.compress(compressor);
     encodedPage.freeMemory();
@@ -62,9 +63,13 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
   @Override
   public ColumnPage decode(byte[] input, int offset, int length) throws 
MemoryException {
     if (srcDataType.equals(targetDataType)) {
-      return ColumnPage.decompress(compressor, targetDataType, input, offset, 
length);
+      return ColumnPage
+          .decompress(compressor, targetDataType, input, offset, length, 
stats.getScale(),
+              stats.getPrecision());
     } else {
-      ColumnPage page = ColumnPage.decompress(compressor, targetDataType, 
input, offset, length);
+      ColumnPage page = ColumnPage
+          .decompress(compressor, targetDataType, input, offset, length, 
stats.getScale(),
+              stats.getPrecision());
       return LazyColumnPage.newPage(page, codec);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
index 3818263..659dc2a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -79,7 +79,7 @@ public class DefaultEncodingStrategy extends EncodingStrategy 
{
     if (Math.min(adaptiveDataType.getSizeInBytes(), 
deltaDataType.getSizeInBytes()) ==
         srcDataType.getSizeInBytes()) {
       // no effect to use adaptive or delta, use compression only
-      return DirectCompressCodec.newInstance(srcDataType, compressor);
+      return DirectCompressCodec.newInstance(stats, compressor);
     }
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
@@ -93,17 +93,17 @@ public class DefaultEncodingStrategy extends 
EncodingStrategy {
 
   @Override
   ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) {
-    return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
+    return DirectCompressCodec.newInstance(stats, compressor);
   }
 
   // for decimal, currently it is a very basic implementation
   @Override
   ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) {
-    return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
+    return DirectCompressCodec.newInstance(stats, compressor);
   }
 
   @Override
   ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) {
-    return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
+    return DirectCompressCodec.newInstance(stats, compressor);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
index 6cf59a6..a45552a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -67,7 +67,8 @@ public class DeltaIntegerCodec extends 
AdaptiveCompressionCodec {
 
   @Override
   public byte[] encode(ColumnPage input) throws MemoryException, IOException {
-    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    encodedPage = ColumnPage
+        .newPage(targetDataType, input.getPageSize(), stats.getScale(), 
stats.getPrecision());
     input.encode(codec);
     byte[] result = encodedPage.compress(compressor);
     encodedPage.freeMemory();
@@ -77,9 +78,13 @@ public class DeltaIntegerCodec extends 
AdaptiveCompressionCodec {
   @Override
   public ColumnPage decode(byte[] input, int offset, int length) throws 
MemoryException {
     if (srcDataType.equals(targetDataType)) {
-      return ColumnPage.decompress(compressor, targetDataType, input, offset, 
length);
+      return ColumnPage
+          .decompress(compressor, targetDataType, input, offset, length, 
stats.getScale(),
+              stats.getPrecision());
     } else {
-      ColumnPage page = ColumnPage.decompress(compressor, targetDataType, 
input, offset, length);
+      ColumnPage page = ColumnPage
+          .decompress(compressor, targetDataType, input, offset, length, 
stats.getScale(),
+              stats.getPrecision());
       return LazyColumnPage.newPage(page, codec);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
index dcb9b7c..d608fea 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * This codec directly apply compression on the input data
@@ -30,15 +30,15 @@ import 
org.apache.carbondata.core.metadata.datatype.DataType;
 public class DirectCompressCodec implements ColumnPageCodec {
 
   private Compressor compressor;
-  private DataType dataType;
+  private ColumnPageStatsVO stats;
 
-  private DirectCompressCodec(DataType dataType, Compressor compressor) {
+  private DirectCompressCodec(ColumnPageStatsVO stats, Compressor compressor) {
     this.compressor = compressor;
-    this.dataType = dataType;
+    this.stats = stats;
   }
 
-  public static DirectCompressCodec newInstance(DataType dataType, Compressor 
compressor) {
-    return new DirectCompressCodec(dataType, compressor);
+  public static DirectCompressCodec newInstance(ColumnPageStatsVO stats, 
Compressor compressor) {
+    return new DirectCompressCodec(stats, compressor);
   }
 
   @Override
@@ -53,6 +53,8 @@ public class DirectCompressCodec implements ColumnPageCodec {
 
   @Override
   public ColumnPage decode(byte[] input, int offset, int length) throws 
MemoryException {
-    return ColumnPage.decompress(compressor, dataType, input, offset, length);
+    return ColumnPage
+        .decompress(compressor, stats.getDataType(), input, offset, length, 
stats.getScale(),
+            stats.getPrecision());
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
index 49fb625..77d3b74 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -51,8 +51,8 @@ public abstract class EncodingStrategy {
   /**
    * create codec based on the page data type and statistics contained by 
ValueEncoderMeta
    */
-  public ColumnPageCodec createCodec(ValueEncoderMeta meta) {
-    ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta);
+  public ColumnPageCodec createCodec(ValueEncoderMeta meta, int scale, int 
precision) {
+    ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta, scale, 
precision);
     return createCodec(stats);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
index 058699a..3629101 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
@@ -40,6 +40,10 @@ public class ColumnPageStatsVO {
   /** decimal count of the measures */
   private int decimal;
 
+  private int scale;
+
+  private int precision;
+
   public ColumnPageStatsVO(DataType dataType) {
     this.dataType = dataType;
     switch (dataType) {
@@ -64,12 +68,14 @@ public class ColumnPageStatsVO {
     decimal = 0;
   }
 
-  public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta) {
+  public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta, int scale, 
int precision) {
     ColumnPageStatsVO instance = new ColumnPageStatsVO(meta.getType());
     instance.min = meta.getMinValue();
     instance.max = meta.getMaxValue();
     instance.decimal = meta.getDecimal();
     instance.nonExistValue = meta.getUniqueValue();
+    instance.scale = scale;
+    instance.precision = precision;
     return instance;
   }
 
@@ -101,7 +107,7 @@ public class ColumnPageStatsVO {
         nonExistValue = (double) min - 1;
         break;
       case DECIMAL:
-        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) 
value);
+        BigDecimal decimalValue = (BigDecimal) value;
         decimal = decimalValue.scale();
         BigDecimal val = (BigDecimal) min;
         nonExistValue = (val.subtract(new BigDecimal(1.0)));
@@ -215,6 +221,14 @@ public class ColumnPageStatsVO {
     return dataType;
   }
 
+  /** Returns the scale held by this statistics object. */
+  public int getScale() {
+    return this.scale;
+  }
+
+  /** Returns the precision held by this statistics object. */
+  public int getPrecision() {
+    return this.precision;
+  }
+
   @Override
   public String toString() {
     return String.format("min: %s, max: %s, decimal: %s ", min, max, decimal);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
new file mode 100644
index 0000000..555df1c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Decimal converter to keep the data compact.
+ *
+ * <p>The converter is chosen from the declared precision (see
+ * {@link #getDecimalConverter(int, int)}): a 4-byte int up to precision 9, an 8-byte long up to
+ * precision 18, a fixed-width two's-complement byte array above that, and the
+ * {@link DataTypeUtil} based representation when the precision is negative.
+ */
+public final class DecimalConverterFactory {
+
+  public static final DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
+
+  /** Entry {@code p} is the number of bytes needed for an unscaled value of precision p. */
+  private final int[] minBytesForPrecision = minBytesForPrecision();
+
+  private DecimalConverterFactory() {
+
+  }
+
+  /**
+   * Returns the smallest byte count {@code n} for which 2^(8 * n - 1) is not below
+   * 10^precision.
+   */
+  private int computeMinBytesForPrecision(int precision) {
+    int numBytes = 1;
+    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+      numBytes += 1;
+    }
+    return numBytes;
+  }
+
+  /** Precomputes the byte widths for precisions 0 to 38 inclusive. */
+  private int[] minBytesForPrecision() {
+    int[] data = new int[39];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = computeMinBytesForPrecision(i);
+    }
+    return data;
+  }
+
+  /**
+   * Converts between {@link BigDecimal} values and their stored byte form.
+   */
+  public interface DecimalConverter {
+
+    /** Encodes the decimal into its stored byte form. */
+    byte[] convert(BigDecimal decimal);
+
+    /** Decodes a stored byte form back into a decimal. */
+    BigDecimal getDecimal(byte[] bytes);
+
+    /** Writes the stored byte form of one value into {@code vector} at {@code rowId}. */
+    void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId);
+
+    /** Returns the fixed width in bytes of a stored value, or a negative number if not fixed. */
+    int getSize();
+
+  }
+
+  /**
+   * Stores the unscaled value as a 4-byte big-endian int.
+   */
+  public class DecimalIntConverter implements DecimalConverter {
+
+    private final int precision;
+    private final int scale;
+
+    public DecimalIntConverter(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override public byte[] convert(BigDecimal decimal) {
+      long longValue = decimal.unscaledValue().longValue();
+      // A fresh buffer per call keeps the converter free of shared mutable state.
+      return ByteBuffer.allocate(4).putInt((int) longValue).array();
+    }
+
+    @Override public BigDecimal getDecimal(byte[] bytes) {
+      long unscaled = getUnscaledLong(bytes);
+      return BigDecimal.valueOf(unscaled, scale);
+    }
+
+    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
+      long unscaled = getUnscaledLong(bytes);
+      vector.putInt(rowId, (int) unscaled);
+    }
+
+    @Override public int getSize() {
+      return 4;
+    }
+  }
+
+  /**
+   * Reads {@code bytes} as a big-endian two's-complement number and sign-extends it to a long.
+   */
+  private long getUnscaledLong(byte[] bytes) {
+    long unscaled = 0L;
+    for (byte b : bytes) {
+      unscaled = (unscaled << 8) | (b & 0xff);
+    }
+
+    // Shift the sign bit of the input up to bit 63 and back down to sign-extend.
+    int bits = 8 * bytes.length;
+    return (unscaled << (64 - bits)) >> (64 - bits);
+  }
+
+  /**
+   * Stores the unscaled value as an 8-byte big-endian long.
+   */
+  public class DecimalLongConverter implements DecimalConverter {
+
+    private final int precision;
+    private final int scale;
+
+    public DecimalLongConverter(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override public byte[] convert(BigDecimal decimal) {
+      long longValue = decimal.unscaledValue().longValue();
+      // A fresh buffer per call keeps the converter free of shared mutable state.
+      return ByteBuffer.allocate(8).putLong(longValue).array();
+    }
+
+    @Override public BigDecimal getDecimal(byte[] bytes) {
+      long unscaled = getUnscaledLong(bytes);
+      return BigDecimal.valueOf(unscaled, scale);
+    }
+
+    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
+      long unscaled = getUnscaledLong(bytes);
+      vector.putLong(rowId, unscaled);
+    }
+
+    @Override public int getSize() {
+      return 8;
+    }
+  }
+
+  /**
+   * Stores the unscaled value as a sign-extended two's-complement byte array whose width is
+   * looked up from the precision. The lookup table covers precisions 0 to 38, so a larger
+   * precision fails in the constructor with an {@link ArrayIndexOutOfBoundsException}.
+   */
+  public class DecimalUnscaledConverter implements DecimalConverter {
+
+    private final int precision;
+
+    private final int scale;
+
+    private final int numBytes;
+
+    public DecimalUnscaledConverter(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+      this.numBytes = minBytesForPrecision[precision];
+    }
+
+    @Override public byte[] convert(BigDecimal decimal) {
+      byte[] bytes = decimal.unscaledValue().toByteArray();
+      if (bytes.length == numBytes) {
+        // toByteArray() hands back a newly created array, so it can be returned as is.
+        return bytes;
+      }
+      if (bytes.length > numBytes) {
+        throw new IllegalArgumentException("Unscaled value needs " + bytes.length
+            + " bytes but precision " + precision + " allows only " + numBytes);
+      }
+      // Shorter than numBytes: left-pad with sign bytes. A new array is already zero filled,
+      // so only a negative value needs explicit padding.
+      byte[] value = new byte[numBytes];
+      int padding = numBytes - bytes.length;
+      if (bytes[0] < 0) {
+        Arrays.fill(value, 0, padding, (byte) -1);
+      }
+      System.arraycopy(bytes, 0, value, padding, bytes.length);
+      return value;
+    }
+
+    @Override public BigDecimal getDecimal(byte[] bytes) {
+      BigInteger bigInteger = new BigInteger(bytes);
+      return new BigDecimal(bigInteger, scale);
+    }
+
+    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
+      vector.putBytes(rowId, bytes);
+    }
+
+    @Override public int getSize() {
+      return numBytes;
+    }
+  }
+
+  /**
+   * Delegates to the {@link DataTypeUtil} conversions; reports a size of -1 and does not
+   * support writing to a column vector.
+   */
+  public static class LVBytesDecimalConverter implements DecimalConverter {
+
+    public static final LVBytesDecimalConverter INSTANCE = new LVBytesDecimalConverter();
+
+    @Override public byte[] convert(BigDecimal decimal) {
+      return DataTypeUtil.bigDecimalToByte(decimal);
+    }
+
+    @Override public BigDecimal getDecimal(byte[] bytes) {
+      return DataTypeUtil.byteToBigDecimal(bytes);
+    }
+
+    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
+      throw new UnsupportedOperationException("Unsupported in vector reading for legacy format");
+    }
+
+    @Override public int getSize() {
+      return -1;
+    }
+  }
+
+  /**
+   * Selects the converter for a decimal column.
+   *
+   * @param precision declared precision; a negative value selects the
+   *                  {@link LVBytesDecimalConverter}
+   * @param scale declared scale, applied when decoding stored values
+   * @return the int converter for precision up to 9, the long converter up to 18, otherwise
+   *         the fixed-width unscaled converter
+   */
+  public DecimalConverter getDecimalConverter(int precision, int scale) {
+    if (precision < 0) {
+      // The converter holds no state, so the shared instance is sufficient.
+      return LVBytesDecimalConverter.INSTANCE;
+    } else if (precision <= 9) {
+      return new DecimalIntConverter(precision, scale);
+    } else if (precision <= 18) {
+      return new DecimalLongConverter(precision, scale);
+    } else {
+      return new DecimalUnscaledConverter(precision, scale);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index e89ce12..0a27a74 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -431,6 +432,29 @@ public class CarbonMetadataUtil {
     return false;
   }
 
+  /**
+   * Builds the extra encoder-meta entry carrying scale and precision of a decimal measure.
+   *
+   * @param blockIndex block index looked up in the measure ordinal to block mapping
+   * @param segmentProperties source of the measures and of their ordinal to block mapping
+   * @return an 8-byte buffer holding scale then precision, flipped for reading, or
+   *         {@code null} when no measure maps to {@code blockIndex} or that measure is not
+   *         of type DECIMAL
+   */
+  private static ByteBuffer writeInfoIfDecimal(int blockIndex,
+      SegmentProperties segmentProperties) {
+    Map<Integer, Integer> blockMapping = segmentProperties.getMeasuresOrdinalToBlockMapping();
+    for (CarbonMeasure measure : segmentProperties.getMeasures()) {
+      Integer blockId = blockMapping.get(measure.getOrdinal());
+      // Only a measure that really maps to the requested block may supply scale and
+      // precision; unmapped measures and measures of other blocks are skipped.
+      if (blockId == null || blockId != blockIndex) {
+        continue;
+      }
+      if (measure.getDataType() != DataType.DECIMAL) {
+        return null;
+      }
+      ByteBuffer buffer = ByteBuffer.allocate(8);
+      buffer.putInt(measure.getScale());
+      buffer.putInt(measure.getPrecision());
+      buffer.flip();
+      return buffer;
+    }
+    return null;
+  }
+
   private static byte[] serializeEncoderMeta(ValueEncoderMeta encoderMeta) 
throws IOException {
     // TODO : should remove the unnecessary fields.
     ByteArrayOutputStream aos = new ByteArrayOutputStream();
@@ -788,6 +812,10 @@ public class CarbonMetadataUtil {
         List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
         encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
             createValueEncoderMeta(nodeHolder.getStats(), index))));
+        ByteBuffer decimalMeta = writeInfoIfDecimal(index, segmentProperties);
+        if (decimalMeta != null) {
+          encoderMetaList.add(decimalMeta);
+        }
         dataChunk.setEncoder_meta(encoderMetaList);
         dataChunk.min_max
             
.addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
index 5b0685b..50fb4c5 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -17,10 +17,6 @@
 
 package org.apache.carbondata.processing.newflow.sort;
 
-import java.math.BigDecimal;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
@@ -60,21 +56,10 @@ public class SortStepRowUtil {
 
       index = 0;
 
-      DataType[] measureDataType = parameters.getMeasureDataType();
       // read measure values
       for (int i = 0; i < measureCount; i++) {
         if (needConvertDecimalToByte) {
-          Object value = data[allCount];
-          if (null != value) {
-            if (measureDataType[i] == DataType.DECIMAL) {
-              BigDecimal decimal = (BigDecimal) value;
-              measures[index++] = DataTypeUtil.bigDecimalToByte(decimal);
-            } else {
-              measures[index++] = value;
-            }
-          } else {
-            measures[index++] = null;
-          }
+          measures[index++] = data[allCount];
         } else {
           measures[index++] = data[allCount];
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 2ac138b..55a8693 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -230,7 +230,7 @@ public class UnsafeCarbonRowPage {
             CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, 
bigDecimalInBytes,
                 CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
             size += bigDecimalInBytes.length;
-            rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+            rowToFill[dimensionSize + mesCount] = 
DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
             break;
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index d69d137..7fb9b6e 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -35,6 +35,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import 
org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import 
org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import 
org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
@@ -341,7 +342,7 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
               short aShort = stream.readShort();
               byte[] bigDecimalInBytes = new byte[aShort];
               stream.readFully(bigDecimalInBytes);
-              row[dimensionCount + mesCount] = bigDecimalInBytes;
+              row[dimensionCount + mesCount] = 
DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
               break;
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
index a83e09e..71e5727 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.steps;
 
 import java.io.File;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -33,7 +32,6 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -257,17 +255,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
     Object[] measures = new Object[outputLength];
     for (int i = 0; i < this.measureCount; i++) {
-      Object value = row.getObject(i + this.dimensionWithComplexCount);
-      if (null != value) {
-        if (measureDataType[i] == DataType.DECIMAL) {
-          BigDecimal val = (BigDecimal) value;
-          measures[i] = DataTypeUtil.bigDecimalToByte(val);
-        } else {
-          measures[i] = value;
-        }
-      } else {
-        measures[i] = null;
-      }
+      measures[i] = row.getObject(i + this.dimensionWithComplexCount);
     }
 
     return WriteStepRowUtil.fromColumnCategory(dim, nonDicArray, measures);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 0f7ae1a..ce7b321 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
@@ -356,7 +357,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
               int len = stream.readInt();
               byte[] buff = new byte[len];
               stream.readFully(buff);
-              measures[index++] = buff;
+              measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
               break;
           }
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index f068400..7930763 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -20,12 +20,12 @@ package org.apache.carbondata.processing.store;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
 import org.apache.spark.sql.types.Decimal;
 
@@ -69,11 +68,11 @@ public class TablePage {
     int numDictDimension = model.getMDKeyGenerator().getDimCount();
     dictDimensionPage = new ColumnPage[numDictDimension];
     for (int i = 0; i < dictDimensionPage.length; i++) {
-      dictDimensionPage[i] = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize);
+      dictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
     }
     noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPage.length; i++) {
-      noDictDimensionPage[i] = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize);
+      noDictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
     }
     complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
     for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -83,8 +82,10 @@ public class TablePage {
     }
     measurePage = new ColumnPage[model.getMeasureCount()];
     DataType[] dataTypes = model.getMeasureDataType();
+    TableSpec.MeasureSpec measureSpec = model.getTableSpec().getMeasureSpec();
     for (int i = 0; i < measurePage.length; i++) {
-      measurePage[i] = ColumnPage.newPage(dataTypes[i], pageSize);
+      measurePage[i] = ColumnPage
+          .newPage(dataTypes[i], pageSize, measureSpec.getScale(i), measureSpec.getPrecision(i));
     }
   }
 
@@ -132,8 +133,7 @@ public class TablePage {
       if (measurePage[i].getDataType() == DataType.DECIMAL &&
           model.isCompactionFlow() &&
           value != null) {
-        BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal();
-        value = DataTypeUtil.bigDecimalToByte(bigDecimal);
+        value = ((Decimal) value).toJavaBigDecimal();
       }
       measurePage[i].putData(rowId, value);
     }

Reply via email to