Repository: carbondata
Updated Branches:
  refs/heads/master 302ef2f56 -> 6f204376f


[CARBONDATA-1429] Add a value based compression for decimal data type when 
decimal is stored as Int or Long

Added value-based compression for the decimal data type when a decimal is 
stored as Int or Long.

When decimal precision is <= 9, decimal values are stored in 4 bytes, but 
unlike other primitive data types they were not compressed further based on 
their min and max values. Now, based on the min and max values, decimal data 
falling in the Integer range is further compressed as byte or short.
When decimal precision is <= 18, decimal values are stored in 8 bytes, but 
unlike other primitive data types they were not compressed further based on 
their min and max values. Now, based on the min and max values, decimal data 
falling in the Long range is further compressed as byte, short or int.
Advantage: This reduces the storage space, thereby decreasing the IO time 
when the data is read and decompressed.

This closes #1297


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f204376
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f204376
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f204376

Branch: refs/heads/master
Commit: 6f204376f880231c8f537052fe1b29008178aad8
Parents: 302ef2f
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Thu Aug 24 12:43:58 2017 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Sep 15 17:22:58 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/page/ColumnPage.java         |  64 +++-
 .../core/datastore/page/DecimalColumnPage.java  | 109 +++++++
 .../core/datastore/page/LazyColumnPage.java     |  20 +-
 .../datastore/page/SafeDecimalColumnPage.java   | 227 ++++++++++++++
 .../datastore/page/SafeFixLengthColumnPage.java |   3 +-
 .../datastore/page/SafeVarLengthColumnPage.java |   5 +-
 .../datastore/page/UnsafeDecimalColumnPage.java | 296 +++++++++++++++++++
 .../page/UnsafeVarLengthColumnPage.java         |  52 +---
 .../datastore/page/VarLengthColumnPageBase.java |  52 +++-
 .../page/encoding/ColumnPageEncoderMeta.java    |  16 +-
 .../page/encoding/DefaultEncodingFactory.java   |  93 +++++-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |  21 +-
 .../adaptive/AdaptiveIntegralCodec.java         |  12 +-
 .../datatype/DecimalConverterFactory.java       |  74 +++--
 14 files changed, 931 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 0be409e..6c534d6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -22,7 +22,6 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
@@ -31,7 +30,6 @@ import 
org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsColle
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
@@ -61,8 +59,6 @@ public abstract class ColumnPage {
   // statistics collector for this column page
   private ColumnPageStatsCollector statsCollector;
 
-  DecimalConverterFactory.DecimalConverter decimalConverter;
-
   protected static final boolean unsafe = 
Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
           CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT));
@@ -75,12 +71,6 @@ public abstract class ColumnPage {
     this.dataType = dataType;
     this.pageSize = pageSize;
     this.nullBitSet = new BitSet(pageSize);
-    if (dataType == DECIMAL) {
-      assert (columnSpec.getColumnType() == ColumnType.MEASURE);
-      int precision = columnSpec.getPrecision();
-      int scale = columnSpec.getScale();
-      decimalConverter = 
DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
-    }
   }
 
   public DataType getDataType() {
@@ -130,6 +120,19 @@ public abstract class ColumnPage {
     this.statsCollector = statsCollector;
   }
 
+  private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, 
DataType dataType,
+      int pageSize) {
+    if (unsafe) {
+      try {
+        return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return new SafeDecimalColumnPage(columnSpec, dataType, pageSize);
+    }
+  }
+
   private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec 
columnSpec, DataType dataType,
       int pageSize) {
     if (unsafe) {
@@ -158,7 +161,9 @@ public abstract class ColumnPage {
 
   private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, 
DataType dataType,
       int pageSize) {
-    if (dataType.equals(BYTE_ARRAY) || dataType.equals(DECIMAL)) {
+    if (dataType.equals(DECIMAL)) {
+      return createDecimalPage(columnSpec, dataType, pageSize);
+    } else if (dataType.equals(BYTE_ARRAY)) {
       return createVarLengthPage(columnSpec, dataType, pageSize);
     } else {
       return createFixLengthPage(columnSpec, dataType, pageSize);
@@ -189,6 +194,8 @@ public abstract class ColumnPage {
           instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, 
pageSize);
           break;
         case DECIMAL:
+          instance = new UnsafeDecimalColumnPage(columnSpec, dataType, 
pageSize);
+          break;
         case STRING:
         case BYTE_ARRAY:
           instance =
@@ -631,14 +638,43 @@ public abstract class ColumnPage {
   }
 
   /**
-   * Decompress decimal data and create a column page
+   * Decompress data and create a decimal column page using the decompressed 
data
    */
   public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, 
byte[] compressedData,
       int offset, int length) throws MemoryException {
     Compressor compressor = 
CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
-    byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, 
length);
-    return newDecimalPage(columnSpec, lvEncodedBytes);
+    ColumnPage decimalPage = null;
+    switch (meta.getStoreDataType()) {
+      case BYTE:
+        byte[] byteData = compressor.unCompressByte(compressedData, offset, 
length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), 
byteData.length);
+        decimalPage.setBytePage(byteData);
+        return decimalPage;
+      case SHORT:
+        short[] shortData = compressor.unCompressShort(compressedData, offset, 
length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), 
shortData.length);
+        decimalPage.setShortPage(shortData);
+        return decimalPage;
+      case SHORT_INT:
+        byte[] shortIntData = compressor.unCompressByte(compressedData, 
offset, length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), 
shortIntData.length);
+        decimalPage.setShortIntPage(shortIntData);
+        return decimalPage;
+      case INT:
+        int[] intData = compressor.unCompressInt(compressedData, offset, 
length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), 
intData.length);
+        decimalPage.setIntPage(intData);
+        return decimalPage;
+      case LONG:
+        long[] longData = compressor.unCompressLong(compressedData, offset, 
length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), 
longData.length);
+        decimalPage.setLongPage(longData);
+        return decimalPage;
+      default:
+        byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, 
offset, length);
+        return newDecimalPage(columnSpec, lvEncodedBytes);
+    }
   }
 
   public BitSet getNullBits() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
new file mode 100644
index 0000000..2624223
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+
+/**
+ * Represent a columnar data in one page for one column of decimal data type
+ */
+public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
+
+  /**
+   * decimal converter instance
+   */
+  DecimalConverterFactory.DecimalConverter decimalConverter;
+
+  DecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int 
pageSize) {
+    super(columnSpec, dataType, pageSize);
+    decimalConverter = DecimalConverterFactory.INSTANCE
+        .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+  }
+
+  public DecimalConverterFactory.DecimalConverter getDecimalConverter() {
+    return decimalConverter;
+  }
+
+  @Override
+  public byte[] getBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public short[] getShortPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getShortIntPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public int[] getIntPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public long[] getLongPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float[] getFloatPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double[] getDoublePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 1e90387..4bdb252 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+
 /**
  * This is a decorator of column page, it performs decoding lazily (when 
caller calls getXXX
  * method to get the value from the page)
@@ -93,7 +95,23 @@ public class LazyColumnPage extends ColumnPage {
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    return columnPage.getDecimal(rowId);
+    DecimalConverterFactory.DecimalConverter decimalConverter =
+        ((DecimalColumnPage) columnPage).getDecimalConverter();
+    switch (columnPage.getDataType()) {
+      case BYTE:
+        return 
decimalConverter.getDecimal(converter.decodeLong(columnPage.getByte(rowId)));
+      case SHORT:
+        return 
decimalConverter.getDecimal(converter.decodeLong(columnPage.getShort(rowId)));
+      case SHORT_INT:
+        return 
decimalConverter.getDecimal(converter.decodeLong(columnPage.getShortInt(rowId)));
+      case INT:
+        return 
decimalConverter.getDecimal(converter.decodeLong(columnPage.getInt(rowId)));
+      case LONG:
+      case DECIMAL:
+        return columnPage.getDecimal(rowId);
+      default:
+        throw new RuntimeException("internal error: " + this.toString());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
new file mode 100644
index 0000000..01d3d87
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Represent a columnar data in one page for one column of decimal data type
+ */
+public class SafeDecimalColumnPage extends DecimalColumnPage {
+
+  // Only one of following fields will be used
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private byte[] shortIntData;
+  private byte[][] byteArrayData;
+
+  SafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, 
int pageSize) {
+    super(columnSpec, dataType, pageSize);
+    byteArrayData = new byte[pageSize][];
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    this.byteData = byteData;
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    this.shortData = shortData;
+  }
+
+  @Override
+  public void setShortIntPage(byte[] shortIntData) {
+    this.shortIntData = shortIntData;
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    this.intData = intData;
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    this.longData = longData;
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    byteArrayData = byteArray;
+  }
+
+  /**
+   * Set byte value at rowId
+   */
+  @Override
+  public void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  /**
+   * Set short value at rowId
+   */
+  @Override
+  public void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  /**
+   * Set integer value at rowId
+   */
+  @Override
+  public void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  /**
+   * Set long value at rowId
+   */
+  @Override
+  public void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  @Override
+  void putBytesAtRow(int rowId, byte[] bytes) {
+    byteArrayData[rowId] = bytes;
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        if (null == intData) {
+          intData = new int[pageSize];
+        }
+        putInt(rowId, (int) decimalConverter.convert(decimal));
+        break;
+      case DECIMAL_LONG:
+        if (null == longData) {
+          longData = new long[pageSize];
+        }
+        putLong(rowId, (long) decimalConverter.convert(decimal));
+        break;
+      default:
+        putBytes(rowId, (byte[]) decimalConverter.convert(decimal));
+    }
+  }
+
+  @Override
+  public void putShortInt(int rowId, int value) {
+    byte[] converted = ByteUtil.to3Bytes(value);
+    System.arraycopy(converted, 0, shortIntData, rowId * 3, 3);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    byteArrayData[rowId] = new byte[length];
+    System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return byteData[rowId];
+  }
+
+  @Override
+  public byte[] getBytes(int rowId) {
+    return byteArrayData[rowId];
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return shortData[rowId];
+  }
+
+  @Override
+  public int getShortInt(int rowId) {
+    return ByteUtil.valueOf3Bytes(shortIntData, rowId * 3);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return intData[rowId];
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return longData[rowId];
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    long value;
+    switch (dataType) {
+      case BYTE:
+        value = getByte(rowId);
+        break;
+      case SHORT:
+        value = getShort(rowId);
+        break;
+      case SHORT_INT:
+        value = getShortInt(rowId);
+        break;
+      case INT:
+        value = getInt(rowId);
+        break;
+      case LONG:
+        value = getLong(rowId);
+        break;
+      default:
+        byte[] bytes = byteArrayData[rowId];
+        return decimalConverter.getDecimal(bytes);
+    }
+    return decimalConverter.getDecimal(value);
+  }
+
+  @Override
+  public void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    System.arraycopy(byteArrayData[rowId], 0, dest, destOffset, length);
+  }
+
+  @Override
+  public void convertValue(ColumnPageValueConverter codec) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, intData[i]);
+        }
+        break;
+      case DECIMAL_LONG:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, longData[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "not support value conversion on " + dataType + " page");
+    }
+  }
+
+  @Override
+  public void freeMemory() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 5e0e822..33d306d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -166,8 +166,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
     return doubleData[rowId];
   }
 
-  @Override
-  public BigDecimal getDecimal(int rowId) {
+  @Override public BigDecimal getDecimal(int rowId) {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index dde6132..b5daddb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -51,13 +51,12 @@ public class SafeVarLengthColumnPage extends 
VarLengthColumnPageBase {
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    putBytes(rowId, decimalConverter.convert(decimal));
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    byte[] bytes = byteArrayData[rowId];
-    return decimalConverter.getDecimal(bytes);
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
new file mode 100644
index 0000000..45fa7d8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Represents a columnar data for decimal data type column for one page
+ */
+public class UnsafeDecimalColumnPage extends DecimalColumnPage {
+
+  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, 
int pageSize)
+      throws MemoryException {
+    super(columnSpec, dataType, pageSize);
+    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
+    initMemory();
+  }
+
+  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, 
int pageSize,
+      int capacity) throws MemoryException {
+    super(columnSpec, dataType, pageSize);
+    this.capacity = capacity;
+    initMemory();
+  }
+
+  private void initMemory() throws MemoryException {
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        int size = pageSize << dataType.getSizeBits();
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, 
size);
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      case SHORT_INT:
+        size = pageSize * 3;
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, 
size);
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      case DECIMAL:
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, 
(long) (capacity));
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      default:
+        throw new UnsupportedOperationException("invalid data type: " + 
dataType);
+    }
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(byteData, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseAddress, 
baseOffset,
+            byteData.length << byteBits);
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(shortData, CarbonUnsafe.SHORT_ARRAY_OFFSET, baseAddress, 
baseOffset,
+            shortData.length << shortBits);
+  }
+
+  @Override
+  public void setShortIntPage(byte[] shortIntData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(shortIntData, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseAddress, 
baseOffset,
+            shortIntData.length);
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(intData, CarbonUnsafe.INT_ARRAY_OFFSET, baseAddress, 
baseOffset,
+            intData.length << intBits);
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(longData, CarbonUnsafe.LONG_ARRAY_OFFSET, baseAddress, 
baseOffset,
+            longData.length << longBits);
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    if (totalLength != 0) {
+      throw new IllegalStateException("page is not empty");
+    }
+    for (int i = 0; i < byteArray.length; i++) {
+      putBytes(i, byteArray[i]);
+    }
+  }
+
+  @Override
+  public void freeMemory() {
+    if (memoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      memoryBlock = null;
+      baseAddress = null;
+      baseOffset = 0;
+    }
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    long offset = rowId << byteBits;
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    long offset = rowId << shortBits;
+    CarbonUnsafe.getUnsafe().putShort(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putShortInt(int rowId, int value) {
+    byte[] data = ByteUtil.to3Bytes(value);
+    long offset = rowId * 3L;
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, 
data[0]);
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 1, 
data[1]);
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 2, 
data[2]);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    long offset = rowId << intBits;
+    CarbonUnsafe.getUnsafe().putInt(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    long offset = rowId << longBits;
+    CarbonUnsafe.getUnsafe().putLong(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putBytesAtRow(int rowId, byte[] bytes) {
+    putBytes(rowId, bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    try {
+      ensureMemory(length);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
+    CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET 
+ offset, baseAddress,
+        baseOffset + rowOffset[rowId], length);
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        putInt(rowId, (int) decimalConverter.convert(decimal));
+        break;
+      case DECIMAL_LONG:
+        putLong(rowId, (long) decimalConverter.convert(decimal));
+        break;
+      default:
+        putBytes(rowId, (byte[]) decimalConverter.convert(decimal));
+    }
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    long offset = rowId << byteBits;
+    return CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public byte[] getBytes(int rowId) {
+    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] bytes = new byte[length];
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + 
rowOffset[rowId],
+        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+    return bytes;
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    long offset = rowId << shortBits;
+    return CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public int getShortInt(int rowId) {
+    long offset = rowId * 3L;
+    byte[] data = new byte[3];
+    data[0] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + 
offset);
+    data[1] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + 
offset + 1);
+    data[2] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + 
offset + 2);
+    return ByteUtil.valueOf3Bytes(data, 0);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    long offset = rowId << intBits;
+    return CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    long offset = rowId << longBits;
+    return CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    long value;
+    switch (dataType) {
+      case BYTE:
+        value = getByte(rowId);
+        break;
+      case SHORT:
+        value = getShort(rowId);
+        break;
+      case SHORT_INT:
+        value = getShortInt(rowId);
+        break;
+      case INT:
+        value = getInt(rowId);
+        break;
+      case LONG:
+        value = getLong(rowId);
+        break;
+      default:
+        int length = rowOffset[rowId + 1] - rowOffset[rowId];
+        byte[] bytes = new byte[length];
+        CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + 
rowOffset[rowId], bytes,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+        return decimalConverter.getDecimal(bytes);
+    }
+    return decimalConverter.getDecimal(value);
+  }
+
+  @Override
+  void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + 
rowOffset[rowId], dest,
+        CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
+  }
+
+  @Override
+  public void convertValue(ColumnPageValueConverter codec) {
+    convertValueForDecimalType(codec);
+  }
+
+  private void convertValueForDecimalType(ColumnPageValueConverter codec) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << intBits;
+          codec.encode(i, CarbonUnsafe.getUnsafe().getInt(baseAddress, 
baseOffset + offset));
+        }
+        break;
+      case DECIMAL_LONG:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << longBits;
+          codec.encode(i, CarbonUnsafe.getUnsafe().getLong(baseAddress, 
baseOffset + offset));
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "not support value conversion on " + dataType + " page");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 85b9b9f..c9737a4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -21,35 +21,15 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
-// This extension uses unsafe memory to store page data, for variable length 
data type (string,
-// decimal)
+/**
+ * This extension uses unsafe memory to store page data, for variable length 
data type (string)
+ */
 public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
-  // memory allocated by Unsafe
-  private MemoryBlock memoryBlock;
-
-  // base address of memoryBlock
-  private Object baseAddress;
-
-  // base offset of memoryBlock
-  private long baseOffset;
-
-  // size of the allocated memory, in bytes
-  private int capacity;
-
-  // default size for each row, grows as needed
-  private static final int DEFAULT_ROW_SIZE = 8;
-
-  private static final double FACTOR = 1.25;
-
-  private final long taskId = 
ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
   /**
    * create a page
    */
@@ -84,23 +64,6 @@ public class UnsafeVarLengthColumnPage extends 
VarLengthColumnPageBase {
     }
   }
 
-  /**
-   * reallocate memory if capacity length than current size + request size
-   */
-  private void ensureMemory(int requestSize) throws MemoryException {
-    if (totalLength + requestSize > capacity) {
-      int newSize = 2 * capacity;
-      MemoryBlock newBlock = 
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
-          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
-      memoryBlock = newBlock;
-      baseAddress = newBlock.getBaseObject();
-      baseOffset = newBlock.getBaseOffset();
-      capacity = newSize;
-    }
-  }
-
   @Override
   public void putBytesAtRow(int rowId, byte[] bytes) {
     putBytes(rowId, bytes, 0, bytes.length);
@@ -128,17 +91,12 @@ public class UnsafeVarLengthColumnPage extends 
VarLengthColumnPageBase {
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    putBytes(rowId, decimalConverter.convert(decimal));
+    // this page no longer stores decimals: fail loudly, exactly like
+    // getDecimal, instead of silently dropping the value
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    int length = rowOffset[rowId + 1] - rowOffset[rowId];
-    byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + 
rowOffset[rowId],
-        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-
-    return decimalConverter.getDecimal(bytes);
+    // reading a decimal from this page is rejected unconditionally
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 9338bbc..83e6ef0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -22,21 +22,48 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
+import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
 import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
 
 public abstract class VarLengthColumnPageBase extends ColumnPage {
 
+  static final int byteBits = BYTE.getSizeBits();
+  static final int shortBits = DataType.SHORT.getSizeBits();
+  static final int intBits = DataType.INT.getSizeBits();
+  static final int longBits = DataType.LONG.getSizeBits();
+  // default size for each row, grows as needed
+  static final int DEFAULT_ROW_SIZE = 8;
+
+  static final double FACTOR = 1.25;
+
+  final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  // memory allocated by Unsafe
+  MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  Object baseAddress;
+
   // the offset of row in the unsafe memory, its size is pageSize + 1
   int[] rowOffset;
 
   // the length of bytes added in the page
   int totalLength;
 
+  // base offset of memoryBlock
+  long baseOffset;
+
+  // size of the allocated memory, in bytes
+  int capacity;
   VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, 
int pageSize) {
     super(columnSpec, dataType, pageSize);
     rowOffset = new int[pageSize + 1];
@@ -116,9 +143,9 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
 
     VarLengthColumnPageBase page;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(columnSpec, DECIMAL, rowId);
+      page = new UnsafeDecimalColumnPage(columnSpec, DECIMAL, rowId);
     } else {
-      page = new SafeVarLengthColumnPage(columnSpec, DECIMAL, rowId);
+      page = new SafeDecimalColumnPage(columnSpec, DECIMAL, rowId);
     }
 
     // set total length and rowOffset in page
@@ -159,9 +186,9 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(columnSpec, DECIMAL, numRows, 
inputDataLength);
+      page = new UnsafeDecimalColumnPage(columnSpec, DECIMAL, numRows, 
inputDataLength);
     } else {
-      page = new SafeVarLengthColumnPage(columnSpec, dataType, numRows);
+      page = new SafeDecimalColumnPage(columnSpec, dataType, numRows);
     }
 
     // set total length and rowOffset in page
@@ -330,4 +357,21 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
   public void convertValue(ColumnPageValueConverter codec) {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
+
+  /**
+   * Grow the unsafe memory block if it cannot hold {@code requestSize} more
+   * bytes on top of the {@code totalLength} bytes already written. The first
+   * {@code capacity} bytes of the old block are copied into the new block and
+   * the old block is freed.
+   *
+   * @param requestSize number of additional bytes about to be written
+   * @throws MemoryException propagated from the memory allocator
+   */
+  protected void ensureMemory(int requestSize) throws MemoryException {
+    if (totalLength + requestSize > capacity) {
+      // doubling is not enough when one request exceeds the current capacity,
+      // so grow to whichever is larger: twice the capacity or the needed size
+      int newSize = Math.max(2 * capacity, totalLength + requestSize);
+      MemoryBlock newBlock =
+          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
+          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      memoryBlock = newBlock;
+      baseAddress = newBlock.getBaseObject();
+      baseOffset = newBlock.getBaseOffset();
+      capacity = newSize;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 87eb77a..422ce67 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -161,8 +161,8 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta 
implements Writable
         out.writeDouble(0d); // unique value is obsoleted, maintain for 
compatibility
         break;
       case DECIMAL:
-        byte[] maxAsBytes = getMaxAsBytes();
-        byte[] minAsBytes = getMinAsBytes();
+        byte[] maxAsBytes = getMaxAsBytes(columnSpec.getSchemaDataType());
+        byte[] minAsBytes = getMinAsBytes(columnSpec.getSchemaDataType());
         byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
         out.writeShort((short) maxAsBytes.length);
         out.write(maxAsBytes);
@@ -232,20 +232,20 @@ public class ColumnPageEncoderMeta extends 
ValueEncoderMeta implements Writable
     }
   }
 
-  public byte[] getMaxAsBytes() {
-    return getValueAsBytes(getMaxValue());
+  public byte[] getMaxAsBytes(DataType dataType) {
+    // serialize the max value; dataType selects the branch in getValueAsBytes
+    return getValueAsBytes(getMaxValue(), dataType);
   }
 
-  public byte[] getMinAsBytes() {
-    return getValueAsBytes(getMinValue());
+  public byte[] getMinAsBytes(DataType dataType) {
+    // serialize the min value; dataType selects the branch in getValueAsBytes
+    return getValueAsBytes(getMinValue(), dataType);
   }
 
   /**
    * convert value to byte array
    */
-  private byte[] getValueAsBytes(Object value) {
+  private byte[] getValueAsBytes(Object value, DataType dataType) {
     ByteBuffer b;
-    switch (storeDataType) {
+    switch (dataType) {
       case BYTE:
         b = ByteBuffer.allocate(8);
         b.putLong((byte) value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index f08444b..ce16ad5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -17,10 +17,13 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.math.BigDecimal;
+
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import 
org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
 import 
org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
 import 
org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
@@ -31,6 +34,7 @@ import 
org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.Direc
 import 
org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 
 /**
  * Default factory will select encoding base on column page data type and 
statistics
@@ -113,10 +117,11 @@ public class DefaultEncodingFactory extends 
EncodingFactory {
       case INT:
       case LONG:
         return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
+      case DECIMAL:
+        return createEncoderForDecimalDataTypeMeasure(columnPage);
       case FLOAT:
       case DOUBLE:
         return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
-      case DECIMAL:
       case BYTE_ARRAY:
         return new 
DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
       default:
@@ -124,6 +129,19 @@ public class DefaultEncodingFactory extends 
EncodingFactory {
     }
   }
 
+  /**
+   * Pick the encoder for a decimal measure page. Pages whose decimal converter
+   * type is DECIMAL_INT or DECIMAL_LONG go through the statistics-driven codec
+   * selection; every other converter type uses direct compression.
+   *
+   * @param columnPage the page to encode; it is cast to DecimalColumnPage
+   */
+  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(
+      ColumnPage columnPage) {
+    DecimalColumnPage decimalPage = (DecimalColumnPage) columnPage;
+    DecimalConverterFactory.DecimalConverterType converterType =
+        decimalPage.getDecimalConverter().getDecimalConverterType();
+    ColumnPageCodec codec;
+    switch (converterType) {
+      case DECIMAL_INT:
+      case DECIMAL_LONG:
+        codec = selectCodecByAlgorithmForDecimal(
+            columnPage.getStatistics(), converterType);
+        break;
+      default:
+        codec = new DirectCompressCodec(columnPage.getDataType());
+        break;
+    }
+    return codec.createEncoder(null);
+  }
+
   private static DataType fitLongMinMax(long max, long min) {
     if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
       return DataType.BYTE;
@@ -155,6 +173,35 @@ public class DefaultEncodingFactory extends 
EncodingFactory {
     }
   }
 
+  /**
+   * Map the min/max statistics of a decimal page to an integral data type by
+   * passing the unscaled values to fitLongMinMax. For DECIMAL_INT the unscaled
+   * values are narrowed to int first.
+   *
+   * @param dataType only used in the error message
+   * @param max page maximum, cast to BigDecimal
+   * @param min page minimum, cast to BigDecimal
+   * @param decimalConverterType how the page stores its decimals
+   * @throws RuntimeException if the converter type is neither DECIMAL_INT
+   *         nor DECIMAL_LONG
+   */
+  private static DataType fitMinMaxForDecimalType(
+      DataType dataType, Object max, Object min,
+      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+    long unscaledMax = ((BigDecimal) max).unscaledValue().longValue();
+    long unscaledMin = ((BigDecimal) min).unscaledValue().longValue();
+    switch (decimalConverterType) {
+      case DECIMAL_LONG:
+        return fitLongMinMax(unscaledMax, unscaledMin);
+      case DECIMAL_INT:
+        int narrowedMax = (int) unscaledMax;
+        int narrowedMin = (int) unscaledMin;
+        return fitLongMinMax(narrowedMax, narrowedMin);
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+  }
+
+  /**
+   * Choose the data type used to store deltas of a decimal page. For
+   * DECIMAL_INT the type is derived from (max - min) of the unscaled values;
+   * for DECIMAL_LONG it is always LONG.
+   *
+   * @param dataType only used in the error message
+   * @param max page maximum, cast to BigDecimal
+   * @param min page minimum, cast to BigDecimal
+   * @param decimalConverterType how the page stores its decimals
+   * @throws RuntimeException if the converter type is neither DECIMAL_INT
+   *         nor DECIMAL_LONG
+   */
+  private static DataType fitDeltaForDecimalType(
+      DataType dataType, Object max, Object min,
+      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+    long unscaledMax = ((BigDecimal) max).unscaledValue().longValue();
+    long unscaledMin = ((BigDecimal) min).unscaledValue().longValue();
+    switch (decimalConverterType) {
+      case DECIMAL_LONG:
+        return DataType.LONG;
+      case DECIMAL_INT:
+        return compareMinMaxAndSelectDataType(unscaledMax - unscaledMin);
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+  }
+
   // fit the long input value into minimum data type
   private static DataType fitDelta(DataType dataType, Object max, Object min) {
     // use long data type to calculate delta to avoid overflow
@@ -177,6 +224,10 @@ public class DefaultEncodingFactory extends 
EncodingFactory {
       default:
         throw new RuntimeException("internal error: " + dataType);
     }
+    return compareMinMaxAndSelectDataType(value);
+  }
+
+  private static DataType compareMinMaxAndSelectDataType(long value) {
     if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
       return DataType.BYTE;
     } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
@@ -204,8 +255,10 @@ public class DefaultEncodingFactory extends 
EncodingFactory {
     } else {
       deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), 
stats.getMin());
     }
-    if (Math.min(adaptiveDataType.getSizeInBytes(), 
deltaDataType.getSizeInBytes()) ==
-        srcDataType.getSizeInBytes()) {
+    // in case of decimal data type check if the decimal converter type is Int 
or Long and based on
+    // that get size in bytes
+    if (Math.min(adaptiveDataType.getSizeInBytes(), 
deltaDataType.getSizeInBytes()) == srcDataType
+        .getSizeInBytes()) {
       // no effect to use adaptive or delta, use compression only
       return new DirectCompressCodec(stats.getDataType());
     }
@@ -247,4 +300,38 @@ public class DefaultEncodingFactory extends 
EncodingFactory {
     }
   }
 
+  /**
+   * Select the codec for a decimal page. The adaptive target type is fitted
+   * from the min/max statistics and the delta target type from their
+   * difference; the codec whose target type is smaller wins, with adaptive
+   * encoding preferred on a tie. If the smaller of the two sizes equals the
+   * size of the source data type, only direct compression is used.
+   *
+   * NOTE(review): the comparison uses the size of stats.getDataType(), not
+   * decimalConverterType.getSizeInBytes() - confirm this is intended.
+   */
+  static ColumnPageCodec selectCodecByAlgorithmForDecimal(
+      SimpleStatsResult stats,
+      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+    DataType srcDataType = stats.getDataType();
+    DataType adaptiveDataType = fitMinMaxForDecimalType(
+        stats.getDataType(), stats.getMax(), stats.getMin(),
+        decimalConverterType);
+    // when the values need a long, the delta type is taken as long as well
+    // without computing it
+    DataType deltaDataType = (adaptiveDataType == DataType.LONG)
+        ? DataType.LONG
+        : fitDeltaForDecimalType(
+            stats.getDataType(), stats.getMax(), stats.getMin(),
+            decimalConverterType);
+
+    int adaptiveSize = adaptiveDataType.getSizeInBytes();
+    int deltaSize = deltaDataType.getSizeInBytes();
+    if (Math.min(adaptiveSize, deltaSize) == srcDataType.getSizeInBytes()) {
+      // neither encoding shrinks the data, use compression only
+      return new DirectCompressCodec(stats.getDataType());
+    }
+    if (adaptiveSize > deltaSize) {
+      // delta adaptive encoding is strictly smaller
+      return new AdaptiveDeltaIntegralCodec(
+          stats.getDataType(), deltaDataType, stats);
+    }
+    // adaptive encoding is smaller or equal
+    return new AdaptiveIntegralCodec(
+        stats.getDataType(), adaptiveDataType, stats);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index ad327f7..383670a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,8 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.format.Encoding;
 
+import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
+
 /**
  * Codec for integer (byte, short, int, long) data type and floating data type 
(in case of
  * scale is 0).
@@ -65,6 +68,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec 
{
       case DOUBLE:
         this.max = (long) (double) stats.getMax();
         break;
+      case DECIMAL:
+        this.max = ((BigDecimal) stats.getMax()).unscaledValue().longValue();
+        break;
       default:
         // this codec is for integer type only
         throw new UnsupportedOperationException(
@@ -111,13 +117,18 @@ public class AdaptiveDeltaIntegralCodec extends 
AdaptiveCodec {
     };
   }
 
-  @Override
-  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+  @Override public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta 
meta) {
     return new ColumnPageDecoder() {
-      @Override
-      public ColumnPage decode(byte[] input, int offset, int length)
+      @Override public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
+        ColumnPage page = null;
+        switch (meta.getSchemaDataType()) {
+          case DECIMAL:
+            page = ColumnPage.decompressDecimalPage(meta, input, offset, 
length);
+            break;
+          default:
+            page = ColumnPage.decompress(meta, input, offset, length);
+        }
         return LazyColumnPage.newPage(page, converter);
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 6df2e64..c7c10a5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -92,7 +92,14 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
+        ColumnPage page = null;
+        switch (meta.getSchemaDataType()) {
+          case DECIMAL:
+            page = ColumnPage.decompressDecimalPage(meta, input, offset, 
length);
+            break;
+          default:
+            page = ColumnPage.decompress(meta, input, offset, length);
+        }
         return LazyColumnPage.newPage(page, converter);
       }
     };
@@ -151,6 +158,9 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         case INT:
           encodedPage.putInt(rowId, (int) value);
           break;
+        case LONG:
+          encodedPage.putLong(rowId, (long) value);
+          break;
         default:
           throw new RuntimeException("internal error: " + debugInfo());
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 0343e38..9dbc9b4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.metadata.datatype;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -53,37 +52,50 @@ public final class DecimalConverterFactory {
     return data;
   }
 
+  /**
+   * Identifies the kind of decimal converter in use. Each constant carries a
+   * size in bytes: 4 for DECIMAL_INT, 8 for DECIMAL_LONG and -1 for
+   * DECIMAL_LV and DECIMAL_UNSCALED (presumably meaning "no fixed primitive
+   * width" - confirm against the converters).
+   */
+  public enum DecimalConverterType {
+    DECIMAL_LV(-1),
+    DECIMAL_INT(4),
+    DECIMAL_LONG(8),
+    DECIMAL_UNSCALED(-1);
+
+    // enum constants are shared singletons, so their state must be immutable
+    private final int sizeInBytes;
+
+    DecimalConverterType(int sizeInBytes) {
+      this.sizeInBytes = sizeInBytes;
+    }
+
+    /**
+     * @return the size in bytes this constant was declared with
+     */
+    public int getSizeInBytes() {
+      return sizeInBytes;
+    }
+
+  }
+
   public interface DecimalConverter {
 
-    byte[] convert(BigDecimal decimal);
+    // the concrete type of the returned object is chosen by the implementation
+    Object convert(BigDecimal decimal);
 
-    BigDecimal getDecimal(byte[] bytes);
+    // rebuilds a BigDecimal from a stored value; implementations cast the
+    // argument to the type they expect
+    BigDecimal getDecimal(Object valueToBeConverted);
 
     void writeToColumnVector(byte[] bytes, CarbonColumnVector 
vector, int rowId);
 
     int getSize();
 
+    // reports which kind of converter this is
+    DecimalConverterType getDecimalConverterType();
+
   }
 
   public class DecimalIntConverter implements DecimalConverter {
 
-    private ByteBuffer buffer = ByteBuffer.allocate(4);
-
     private int scale;
 
     public DecimalIntConverter(int precision, int scale) {
       this.scale = scale;
     }
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       long longValue = decimal.unscaledValue().longValue();
-      buffer.putInt(0, (int) longValue);
-      return buffer.array().clone();
+      return (int) longValue;
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      long unscaled = getUnscaledLong(bytes);
-      return BigDecimal.valueOf(unscaled, scale);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      // accept any boxed integral value: convert() of this class returns an
+      // Integer, which a hard (Long) cast would reject with ClassCastException
+      long unscaled = ((Number) valueToBeConverted).longValue();
+      return BigDecimal.valueOf(unscaled, scale);
     }
 
     @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector 
vector, int rowId) {
@@ -94,6 +106,10 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return 4;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_INT;
+    }
   }
 
   private long getUnscaledLong(byte[] bytes) {
@@ -112,23 +128,19 @@ public final class DecimalConverterFactory {
 
   public class DecimalLongConverter implements DecimalConverter {
 
-    private ByteBuffer buffer = ByteBuffer.allocate(8);
-
     private int scale;
 
     public DecimalLongConverter(int precision, int scale) {
       this.scale = scale;
     }
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       long longValue = decimal.unscaledValue().longValue();
-      buffer.putLong(0, longValue);
-      return buffer.array().clone();
+      return longValue;
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      long unscaled = getUnscaledLong(bytes);
-      return BigDecimal.valueOf(unscaled, scale);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      return BigDecimal.valueOf((Long) valueToBeConverted, scale);
     }
 
     @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector 
vector, int rowId) {
@@ -139,6 +151,10 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return 8;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_LONG;
+    }
   }
 
   public class DecimalUnscaledConverter implements DecimalConverter {
@@ -155,7 +171,7 @@ public final class DecimalConverterFactory {
       this.numBytes = minBytesForPrecision[precision];
     }
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       byte[] bytes = decimal.unscaledValue().toByteArray();
       byte[] fixedLengthBytes = null;
       if (bytes.length == numBytes) {
@@ -181,8 +197,8 @@ public final class DecimalConverterFactory {
       return value;
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      BigInteger bigInteger = new BigInteger(bytes);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      BigInteger bigInteger = new BigInteger((byte[]) valueToBeConverted);
       return new BigDecimal(bigInteger, scale);
     }
 
@@ -193,18 +209,22 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return numBytes;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_UNSCALED;
+    }
   }
 
   public static class LVBytesDecimalConverter implements DecimalConverter {
 
     public static LVBytesDecimalConverter INSTANCE = new 
LVBytesDecimalConverter();
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       return DataTypeUtil.bigDecimalToByte(decimal);
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      return DataTypeUtil.byteToBigDecimal(bytes);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
     }
 
     @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector 
vector, int rowId) {
@@ -214,6 +234,10 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return -1;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_LV;
+    }
   }
 
   public DecimalConverter getDecimalConverter(int precision, int scale) {

Reply via email to