[CARBONDATA-3112] Optimise decompressing while filling the vector during 
conversion of primitive types

The following optimizations are made in this PR.

1. Optimise decompressing while filling the vector during conversion of 
primitive types. This avoids creating an intermediate buffer during 
decompression (a rough sketch follows after this list).
2. Refactor the global dictionary decoder codegen to minimise the amount of 
generated code and thereby reduce the generation time.
3. Disable lazy load for full scan queries, as it is unnecessary there.
4. Refactor the compressor interface and create an abstract class. All 
primitive datatype conversions now happen in little-endian order, as Snappy 
already performs that conversion while compressing. This might break 
compatibility with ZSTD data written by the previous version.
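
As a rough illustration of points 1 and 4 (this is not CarbonData code; the 
class and method names below are made up for the sketch), the example 
serializes ints in little-endian order before compression and then fills a 
vector straight from the decompressed bytes, without materializing an 
intermediate int[]:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndianFillSketch {

  // Encode an int[] as little-endian bytes, i.e. the layout a byte-level
  // compressor such as Snappy would see before compression.
  static byte[] toLittleEndianBytes(int[] values) {
    ByteBuffer buffer = ByteBuffer.allocate(values.length * Integer.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN);
    buffer.asIntBuffer().put(values);
    return buffer.array();
  }

  // Fill a "vector" (a plain long[] standing in for a column vector) directly
  // from the decompressed page bytes, without building an intermediate int[].
  static void fillVector(byte[] pageData, int pageSize, long[] vector, long max) {
    int rowId = 0;
    for (int i = 0; i < pageSize * Integer.BYTES; i += Integer.BYTES) {
      int value = (pageData[i] & 0xFF)
          | ((pageData[i + 1] & 0xFF) << 8)
          | ((pageData[i + 2] & 0xFF) << 16)
          | ((pageData[i + 3] & 0xFF) << 24);
      // adaptive-delta style reconstruction: the stored value is (max - original)
      vector[rowId++] = max - value;
    }
  }

  public static void main(String[] args) {
    int[] deltas = {0, 3, 7, 2};
    byte[] page = toLittleEndianBytes(deltas); // compress/decompress would happen in between
    long[] vector = new long[deltas.length];
    fillVector(page, deltas.length, vector, 100L);
    System.out.println(java.util.Arrays.toString(vector)); // prints [100, 97, 93, 98]
  }
}

The real implementations are in AbstractCompressor and the adaptive codecs 
changed below; the sketch only shows why the intermediate primitive array can 
be skipped once the page bytes are known to be little-endian.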

This closes #2863


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bed51ba7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bed51ba7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bed51ba7

Branch: refs/heads/master
Commit: bed51ba772cf0e8c5c648f620b62d2c9ba4ef9e8
Parents: 51b10ba
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Fri Oct 26 20:50:53 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Nov 21 12:23:57 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 ...mpressedDimensionChunkFileBasedReaderV3.java |   2 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |   2 +-
 .../safe/AbstractNonDictionaryVectorFiller.java |  47 +++--
 ...feVariableLengthDimensionDataChunkStore.java |   2 +-
 .../compression/AbstractCompressor.java         | 123 ++++++++++++
 .../datastore/compression/SnappyCompressor.java |   4 +-
 .../datastore/compression/ZstdCompressor.java   |  95 +--------
 .../page/ColumnPageValueConverter.java          |   6 +-
 .../datastore/page/VarLengthColumnPageBase.java |   2 +-
 .../page/encoding/ColumnPageDecoder.java        |   2 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |  74 ++++---
 .../adaptive/AdaptiveDeltaIntegralCodec.java    | 164 ++++++++-------
 .../adaptive/AdaptiveFloatingCodec.java         |  73 +++----
 .../adaptive/AdaptiveIntegralCodec.java         | 137 +++++++------
 .../encoding/compress/DirectCompressCodec.java  | 146 ++++++++------
 .../datastore/page/encoding/rle/RLECodec.java   |   2 +-
 .../statistics/PrimitivePageStatsCollector.java |   7 +
 .../page/statistics/StatisticsCollector.java    |  66 ------
 .../datatype/DecimalConverterFactory.java       |  53 +++--
 .../scan/result/vector/CarbonColumnVector.java  |   4 +
 .../scan/result/vector/CarbonDictionary.java    |   2 +
 .../vector/impl/CarbonColumnVectorImpl.java     |  35 +++-
 .../vector/impl/CarbonDictionaryImpl.java       |  37 ++++
 .../AbstractCarbonColumnarVector.java           |  10 +
 ...umnarVectorWrapperDirectWithDeleteDelta.java |  10 +-
 ...erDirectWithDeleteDeltaAndInvertedIndex.java |  34 +++-
 ...narVectorWrapperDirectWithInvertedIndex.java |   9 +-
 .../apache/carbondata/core/util/ByteUtil.java   |  28 ++-
 .../presto/CarbonColumnVectorWrapper.java       |   9 +
 .../src/test/resources/IUD/negativevalue.csv    |   7 +
 .../iud/UpdateCarbonTableTestCase.scala         |  17 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  10 +
 .../ColumnarVectorWrapperDirect.java            |   8 +
 .../VectorizedCarbonRecordReader.java           |  31 ++-
 .../datasources/SparkCarbonFileFormat.scala     |  10 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 156 ++++++++++-----
 .../org/apache/spark/sql/CarbonVectorProxy.java | 200 ++++++++++++++-----
 .../stream/CarbonStreamRecordReader.java        |   5 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 195 ++++++++++++------
 .../strategy/CarbonLateDecodeStrategy.scala     |  26 ++-
 41 files changed, 1193 insertions(+), 663 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b75648e..094e552 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1949,6 +1949,12 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_WRITTEN_BY_APPNAME = 
"carbon.writtenby.app.name";
 
+  /**
+   * When there are many global dictionary columns, generating codegen for them becomes an issue
+   * and slows down the query. So we limit it to 100 for now.
+   */
+  public static final int CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT = 100;
+
   
//////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   
//////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 9df5bc1..c85c9ee 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -252,7 +252,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 
extends AbstractChunkRead
     if (vectorInfo != null) {
       decoder
           .decodeAndFillVector(pageData.array(), offset, 
pageMetadata.data_page_length, vectorInfo,
-              nullBitSet, isLocalDictEncodedPage);
+              nullBitSet, isLocalDictEncodedPage, 
pageMetadata.numberOfRowsInpage);
       return null;
     } else {
       return decoder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index a754cf2..2d3979a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -242,7 +242,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 
extends AbstractMeasureChun
     if (vectorInfo != null) {
       codec
           .decodeAndFillVector(pageData.array(), offset, 
pageMetadata.data_page_length, vectorInfo,
-              nullBitSet, false);
+              nullBitSet, false, pageMetadata.numberOfRowsInpage);
       return null;
     } else {
       return codec.decode(pageData.array(), offset, 
pageMetadata.data_page_length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
index f2e91be..5e0dfdf 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -23,6 +23,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import 
org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectWithInvertedIndex;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -88,10 +89,11 @@ class StringVectorFiller extends 
AbstractNonDictionaryVectorFiller {
           CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, 
localOffset, length)) {
         vector.putNull(i);
       } else {
-        vector.putByteArray(i, localOffset, length, data);
+        vector.putArray(i, localOffset, length);
       }
       localOffset += length;
     }
+    vector.putAllByteArray(data, 0, data.length);
   }
 }
 
@@ -100,23 +102,40 @@ class LongStringVectorFiller extends 
AbstractNonDictionaryVectorFiller {
     super(numberOfRows);
   }
 
-  @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector) {
+  @Override public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
+    boolean invertedIndex = vector instanceof 
ColumnarVectorWrapperDirectWithInvertedIndex;
     int localOffset = 0;
     ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
-    for (int i = 0; i < numberOfRows; i++) {
-      int length =
-          (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 
0xFF) << 16) | (
-              (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] & 
0xFF));
-      localOffset += 4;
-      if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, 
localOffset, length)) {
-        vector.putNull(i);
-      } else {
-        vector.putByteArray(i, localOffset, length, data);
+    if (invertedIndex) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int length =
+            (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 
0xFF) << 16) | (
+                (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] 
& 0xFF));
+        localOffset += 4;
+        if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 
0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, 
localOffset, length)) {
+          vector.putNull(i);
+        } else {
+          vector.putByteArray(i, localOffset, length, data);
+        }
+        localOffset += length;
       }
-      localOffset += length;
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        int length =
+            (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 
0xFF) << 16) | (
+                (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] 
& 0xFF));
+        localOffset += 4;
+        if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 
0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, 
localOffset, length)) {
+          vector.putNull(i);
+        } else {
+          vector.putArray(i, localOffset, length);
+        }
+        localOffset += length;
+      }
+      vector.putAllByteArray(data, 0, data.length);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 01db383..3c00fd8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -51,7 +51,6 @@ public abstract class 
SafeVariableLengthDimensionDataChunkStore
   public SafeVariableLengthDimensionDataChunkStore(boolean isInvertedIndex, 
int numberOfRows) {
     super(isInvertedIndex);
     this.numberOfRows = numberOfRows;
-    this.dataOffsets = new int[numberOfRows];
   }
 
   /**
@@ -66,6 +65,7 @@ public abstract class 
SafeVariableLengthDimensionDataChunkStore
       byte[] data) {
     // first put the data, inverted index and reverse inverted index to memory
     super.putArray(invertedIndex, invertedIndexReverse, data);
+    this.dataOffsets = new int[numberOfRows];
     // As data is of variable length and data format is
     // <length in short><data><length in short><data>
     // we need to store offset of each data so data can be accessed directly

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
new file mode 100644
index 0000000..5123cc6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.ShortBuffer;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+public abstract class AbstractCompressor implements Compressor {
+
+  @Override
+  public byte[] compressShort(short[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_SHORT);
+    
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public short[] unCompressShort(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    ShortBuffer unCompBuffer =
+        
ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer();
+    short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT];
+    unCompBuffer.get(shorts);
+    return shorts;
+  }
+
+  @Override
+  public byte[] compressInt(int[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_INT);
+    unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public int[] unCompressInt(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    IntBuffer unCompBuffer =
+        
ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+    int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT];
+    unCompBuffer.get(ints);
+    return ints;
+  }
+
+  @Override
+  public byte[] compressLong(long[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_LONG);
+    
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public long[] unCompressLong(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    LongBuffer unCompBuffer =
+        
ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer();
+    long[] longs = new long[unCompArray.length / ByteUtil.SIZEOF_LONG];
+    unCompBuffer.get(longs);
+    return longs;
+  }
+
+  @Override
+  public byte[] compressFloat(float[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_FLOAT);
+    
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public float[] unCompressFloat(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    FloatBuffer unCompBuffer =
+        
ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer();
+    float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT];
+    unCompBuffer.get(floats);
+    return floats;
+  }
+
+  @Override
+  public byte[] compressDouble(double[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_DOUBLE);
+    
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public double[] unCompressDouble(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    DoubleBuffer unCompBuffer =
+        
ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer();
+    double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE];
+    unCompBuffer.get(doubles);
+    return doubles;
+  }
+
+  @Override
+  public long rawCompress(long inputAddress, int inputSize, long 
outputAddress) throws IOException {
+    throw new RuntimeException("Not implemented rawCompress for " + 
this.getName());
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index c86011c..6f4a9c6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -26,7 +26,7 @@ import org.apache.log4j.Logger;
 import org.xerial.snappy.Snappy;
 import org.xerial.snappy.SnappyNative;
 
-public class SnappyCompressor implements Compressor {
+public class SnappyCompressor extends AbstractCompressor {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(SnappyCompressor.class.getName());
@@ -90,7 +90,7 @@ public class SnappyCompressor implements Compressor {
     try {
       uncompressedLength = Snappy.uncompressedLength(compInput, offset, 
length);
       data = new byte[uncompressedLength];
-      Snappy.uncompress(compInput, offset, length, data, 0);
+      snappyNative.rawUncompress(compInput, offset, length, data, 0);
     } catch (IOException e) {
       LOGGER.error(e.getMessage(), e);
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
index 914c3e7..3e6a11b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
@@ -18,18 +18,10 @@
 package org.apache.carbondata.core.datastore.compression;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.DoubleBuffer;
-import java.nio.FloatBuffer;
-import java.nio.IntBuffer;
-import java.nio.LongBuffer;
-import java.nio.ShortBuffer;
-
-import org.apache.carbondata.core.util.ByteUtil;
 
 import com.github.luben.zstd.Zstd;
 
-public class ZstdCompressor implements Compressor {
+public class ZstdCompressor extends AbstractCompressor {
   private static final int COMPRESS_LEVEL = 3;
 
   public ZstdCompressor() {
@@ -65,91 +57,6 @@ public class ZstdCompressor implements Compressor {
   }
 
   @Override
-  public byte[] compressShort(short[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_SHORT);
-    unCompBuffer.asShortBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public short[] unCompressShort(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    ShortBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asShortBuffer();
-    short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT];
-    unCompBuffer.get(shorts);
-    return shorts;
-  }
-
-  @Override
-  public byte[] compressInt(int[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_INT);
-    unCompBuffer.asIntBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public int[] unCompressInt(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    IntBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asIntBuffer();
-    int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT];
-    unCompBuffer.get(ints);
-    return ints;
-  }
-
-  @Override
-  public byte[] compressLong(long[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_LONG);
-    unCompBuffer.asLongBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public long[] unCompressLong(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    LongBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asLongBuffer();
-    long[] longs = new long[unCompArray.length / ByteUtil.SIZEOF_LONG];
-    unCompBuffer.get(longs);
-    return longs;
-  }
-
-  @Override
-  public byte[] compressFloat(float[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_FLOAT);
-    unCompBuffer.asFloatBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public float[] unCompressFloat(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    FloatBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asFloatBuffer();
-    float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT];
-    unCompBuffer.get(floats);
-    return floats;
-  }
-
-  @Override
-  public byte[] compressDouble(double[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_DOUBLE);
-    unCompBuffer.asDoubleBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public double[] unCompressDouble(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    DoubleBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asDoubleBuffer();
-    double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE];
-    unCompBuffer.get(doubles);
-    return doubles;
-  }
-
-  @Override
-  public long rawCompress(long inputAddress, int inputSize, long 
outputAddress) throws IOException {
-    throw new RuntimeException("Not implemented rawCompress for zstd yet");
-  }
-
-  @Override
   public long rawUncompress(byte[] input, byte[] output) throws IOException {
     return Zstd.decompress(output, input);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
index 82ccd22..5bc46e9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -17,6 +17,9 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 // Transformation type that can be applied to ColumnPage
@@ -37,5 +40,6 @@ public interface ColumnPageValueConverter {
   double decodeDouble(long value);
   double decodeDouble(float value);
   double decodeDouble(double value);
-  void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo);
+  void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, 
BitSet nullBits,
+      DataType pageDataType, int pageSize);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index a760b64..81bb1b5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -124,7 +124,7 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
+  public static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
       byte[] lvEncodedBytes) throws MemoryException {
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     DecimalConverterFactory.DecimalConverter decimalConverter =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
index d82a873..6f36c08 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
@@ -35,7 +35,7 @@ public interface ColumnPageDecoder {
    *  Apply decoding algorithm on input byte array and fill the vector here.
    */
   void decodeAndFillVector(byte[] input, int offset, int length, 
ColumnVectorInfo vectorInfo,
-      BitSet nullBits, boolean isLVEncoded) throws MemoryException, 
IOException;
+      BitSet nullBits, boolean isLVEncoded, int pageSize) throws 
MemoryException, IOException;
 
   ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
       throws MemoryException, IOException;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index f91ede5..735847e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -27,7 +27,6 @@ import 
org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -133,14 +132,13 @@ public class AdaptiveDeltaFloatingCodec extends 
AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, 
int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, 
isLVEncoded);
-        page.setNullBits(nullBits);
-        if (page instanceof DecimalColumnPage) {
-          vectorInfo.decimalConverter = ((DecimalColumnPage) 
page).getDecimalConverter();
-        }
-        converter.decodeAndFillVector(page, vectorInfo);
+        Compressor compressor =
+            
CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, 
length);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, 
meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override public ColumnPage decode(byte[] input, int offset, int length, 
boolean isLVEncoded)
@@ -244,69 +242,67 @@ public class AdaptiveDeltaFloatingCodec extends 
AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo 
vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo 
vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, 
true, false);
+      int rowId = 0;
       if (vectorDataType == DataTypes.FLOAT) {
         float floatFactor = factor.floatValue();
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == 
DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (max - byteData[i]) / floatFactor);
+            vector.putFloat(i, (max - pageData[i]) / floatFactor);
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (max - shortData[i]) / floatFactor);
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector
+                .putFloat(rowId++, (max - 
ByteUtil.toShortLittleEndian(pageData, i)) / floatFactor);
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putFloat(i, (max - shortInt) / floatFactor);
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putFloat(rowId++, (max - shortInt) / floatFactor);
           }
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (max - intData[i]) / floatFactor);
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (max - 
ByteUtil.toIntLittleEndian(pageData, i)) / floatFactor);
           }
         } else {
           throw new RuntimeException("internal error: " + this.toString());
         }
       } else {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == 
DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - byteData[i]) / factor);
+            vector.putDouble(rowId++, (max - pageData[i]) / factor);
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - shortData[i]) / factor);
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - 
ByteUtil.toShortLittleEndian(pageData, i)) / factor);
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putDouble(i, (max - shortInt) / factor);
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putDouble(rowId++, (max - shortInt) / factor);
           }
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - intData[i]) / factor);
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - 
ByteUtil.toIntLittleEndian(pageData, i)) / factor);
           }
         } else if (pageDataType == DataTypes.LONG) {
-          long[] longData = columnPage.getLongPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - longData[i]) / factor);
+          int size = pageSize * DataTypes.LONG.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - 
ByteUtil.toLongLittleEndian(pageData, i)) / factor);
           }
         } else {
           throw new RuntimeException("Unsupported datatype : " + pageDataType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 12d108b..578945b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -24,11 +24,11 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -144,17 +144,20 @@ public class AdaptiveDeltaIntegralCodec extends 
AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, 
int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = null;
+        Compressor compressor =
+            
CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, 
length);
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
-          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-          vectorInfo.decimalConverter = ((DecimalColumnPage) 
page).getDecimalConverter();
-        } else {
-          page = ColumnPage.decompress(meta, input, offset, length, 
isLVEncoded);
+          TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+          DecimalConverterFactory.DecimalConverter decimalConverter =
+              DecimalConverterFactory.INSTANCE
+                  .getDecimalConverter(columnSpec.getPrecision(), 
columnSpec.getScale());
+          vectorInfo.decimalConverter = decimalConverter;
         }
-        page.setNullBits(nullBits);
-        converter.decodeAndFillVector(page, vectorInfo);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, 
meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override
@@ -300,17 +303,15 @@ public class AdaptiveDeltaIntegralCodec extends 
AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo 
vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo 
vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
       DataType vectorDataType = vector.getType();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, 
nullBits, deletedRows,
               true, false);
-      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, 
vectorInfo);
+      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, 
vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i 
+ 1)) {
           vector.putNull(i);
@@ -321,165 +322,180 @@ public class AdaptiveDeltaIntegralCodec extends 
AdaptiveCodec {
       }
     }
 
-    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+    private void fillVector(byte[] pageData, CarbonColumnVector vector,
         DataType vectorDataType, DataType pageDataType, int pageSize, 
ColumnVectorInfo vectorInfo) {
       int newScale = 0;
       if (vectorInfo.measure != null) {
         newScale = vectorInfo.measure.getMeasure().getScale();
       }
+      int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) 
{
-        byte[] byteData = columnPage.getBytePage();
         if (vectorDataType == DataTypes.SHORT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) (max - byteData[i]));
+            vector.putShort(i, (short) (max - pageData[i]));
           }
         } else if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) (max - byteData[i]));
+            vector.putInt(i, (int) (max - pageData[i]));
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - byteData[i]));
+            vector.putLong(i, (max - pageData[i]));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - (long) byteData[i]) * 1000);
+            vector.putLong(i, (max - (long) pageData[i]) * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putByte(i, (byte) (max - byteData[i]));
+            vector.putByte(i, (byte) (max - pageData[i]));
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = 
vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - 
byteData[i]);
+            BigDecimal decimal = decimalConverter.getDecimal(max - 
pageData[i]);
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
             vector.putDecimal(i, decimal, precision);
           }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (int) (max - pageData[i]));
+          }
         } else {
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - byteData[i]));
+            vector.putDouble(i, (max - pageData[i]));
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        short[] shortData = columnPage.getShortPage();
+        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
         if (vectorDataType == DataTypes.SHORT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putShort(rowId++, (short) (max - 
ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putInt(rowId++, (int) (max - 
ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - 
ByteUtil.toShortLittleEndian(pageData, i)));
           }
-        }  else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - (long) shortData[i]) * 1000);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector
+                .putLong(rowId++, (max - (long) 
ByteUtil.toShortLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = 
vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - 
shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            BigDecimal decimal =
+                decimalConverter.getDecimal(max - 
ByteUtil.toShortLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
-            vector.putDecimal(i, decimal, precision);
+            vector.putDecimal(rowId++, decimal, precision);
+          }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (int) (max - 
ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - 
ByteUtil.toShortLittleEndian(pageData, i)));
           }
         }
-
       } else if (pageDataType == DataTypes.SHORT_INT) {
-        byte[] shortIntPage = columnPage.getShortIntPage();
+        int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putInt(i, (int) (max - shortInt));
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putInt(rowId++, (int) (max - shortInt));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, (max - shortInt));
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putLong(rowId++, (max - shortInt));
           }
-        }  else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, (max - (long) shortInt) * 1000);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            vector.putLong(rowId++, (max - (long) 
ByteUtil.valueOf3Bytes(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = 
vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
             vector.putDecimal(i, decimal, precision);
           }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putFloat(rowId++, (int) (max - shortInt));
+          }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putDouble(i, (max - shortInt));
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) 
{
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putDouble(rowId++, (max - shortInt));
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int[] intData = columnPage.getIntPage();
+        int size = pageSize * DataTypes.INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) (max - intData[i]));
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putInt(rowId++, (int) (max - 
ByteUtil.toIntLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - intData[i]));
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - 
ByteUtil.toIntLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - (long) intData[i]) * 1000);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - (long) 
ByteUtil.toIntLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = 
vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            BigDecimal decimal =
+                decimalConverter.getDecimal(max - 
ByteUtil.toIntLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
-            vector.putDecimal(i, decimal, precision);
+            vector.putDecimal(rowId++, decimal, precision);
           }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - intData[i]));
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - 
ByteUtil.toIntLittleEndian(pageData, i)));
           }
         }
       } else if (pageDataType == DataTypes.LONG) {
-        long[] longData = columnPage.getLongPage();
+        int size = pageSize * DataTypes.LONG.getSizeInBytes();
         if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - longData[i]));
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - 
ByteUtil.toLongLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - longData[i]) * 1000);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - 
ByteUtil.toLongLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = 
vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - 
longData[i]);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            BigDecimal decimal =
+                decimalConverter.getDecimal(max - 
ByteUtil.toLongLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
-            vector.putDecimal(i, decimal, precision);
+            vector.putDecimal(rowId++, decimal, precision);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index b04c9df..c66c065 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -27,7 +27,6 @@ import 
org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -121,14 +120,13 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, 
int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, 
isLVEncoded);
-        page.setNullBits(nullBits);
-        if (page instanceof DecimalColumnPage) {
-          vectorInfo.decimalConverter = ((DecimalColumnPage) 
page).getDecimalConverter();
-        }
-        converter.decodeAndFillVector(page, vectorInfo);
+        Compressor compressor =
+            
CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, 
length);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, 
meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override
@@ -235,68 +233,63 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
+      int rowId = 0;
       if (vectorDataType == DataTypes.FLOAT) {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (byteData[i] / floatFactor));
+            vector.putFloat(i, (pageData[i] / floatFactor));
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (shortData[i] / floatFactor));
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.toShortLittleEndian(pageData, i) / floatFactor));
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putFloat(i, (shortInt / floatFactor));
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.valueOf3Bytes(pageData, i) / floatFactor));
           }
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (intData[i] / floatFactor));
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.toIntLittleEndian(pageData, i) / floatFactor));
           }
         } else {
           throw new RuntimeException("internal error: " + this.toString());
         }
       } else {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (byteData[i] / factor));
+            vector.putDouble(i, (pageData[i] / factor));
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (shortData[i] / factor));
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.toShortLittleEndian(pageData, i) / factor));
           }
-
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putDouble(i, (shortInt / factor));
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.valueOf3Bytes(pageData, i) / factor));
           }
+
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (intData[i] / factor));
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.toIntLittleEndian(pageData, i) / factor));
           }
         } else if (pageDataType == DataTypes.LONG) {
-          long[] longData = columnPage.getLongPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (longData[i] / factor));
+          int size = pageSize * DataTypes.LONG.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.toLongLittleEndian(pageData, i) / factor));
           }
         } else {
           throw new RuntimeException("Unsupported datatype : " + pageDataType);

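AdaptiveFloatingCodec stores floating-point values as scaled integers, and the converter above recovers them by dividing by the codec's factor. A rough round-trip sketch of that idea, assuming the factor is a power of ten derived from the column statistics; the encode side shown here is only an illustration, not the committed code:

  public final class FloatingScaleSketch {
    public static void main(String[] args) {
      double factor = 100d;                           // assumed: two decimal places of precision
      double original = 12.34d;
      long stored = Math.round(original * factor);    // assumed encode side: scale value to an integer
      double decoded = stored / factor;               // decode mirrors vector.putDouble(..., value / factor)
      System.out.println(stored + " -> " + decoded);  // 1234 -> 12.34
    }
  }
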
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index d77a949..d9db437 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -23,11 +23,11 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -121,17 +121,20 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = null;
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, length);
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
-          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
-        } else {
-          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+          TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+          DecimalConverterFactory.DecimalConverter decimalConverter =
+              DecimalConverterFactory.INSTANCE
+                  .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+          vectorInfo.decimalConverter = decimalConverter;
         }
-        page.setNullBits(nullBits);
-        converter.decodeAndFillVector(page, vectorInfo);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
@@ -273,17 +276,15 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
       DataType vectorDataType = vector.getType();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
-      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
@@ -295,123 +296,143 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
     }
 
-    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
-        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType vectorDataType,
+        DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+      int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-        byte[] byteData = columnPage.getBytePage();
         if (vectorDataType == DataTypes.SHORT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) byteData[i]);
+            vector.putShort(i, (short) pageData[i]);
           }
         } else if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) byteData[i]);
+            vector.putInt(i, (int) pageData[i]);
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, byteData[i]);
+            vector.putLong(i, pageData[i]);
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) byteData[i] * 1000);
+            vector.putLong(i, (long) pageData[i] * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN) {
-          vector.putBytes(0, pageSize, byteData, 0);
+          vector.putBytes(0, pageSize, pageData, 0);
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, pageData[i]);
+          }
         } else {
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, byteData[i]);
+            vector.putDouble(i, pageData[i]);
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        short[] shortData = columnPage.getShortPage();
+        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
         if (vectorDataType == DataTypes.SHORT) {
-          vector.putShorts(0, pageSize, shortData, 0);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putShort(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
+          }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putInt(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) shortData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, ((long) ByteUtil.toShortLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
+          }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         }
 
       } else if (pageDataType == DataTypes.SHORT_INT) {
-        byte[] shortIntPage = columnPage.getShortIntPage();
         if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putInt(i, shortInt);
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putLong(i, shortInt);
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putLong(i, (long) shortInt * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          int[] shortIntData = ByteUtil.toIntArrayFrom3Bytes(shortIntPage, pageSize);
-          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter
+              .fillVector(pageData, pageSize, vectorInfo, nullBits, DataTypes.SHORT_INT);
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
+            vector.putFloat(i, shortInt);
+          }
         } else {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putDouble(i, shortInt);
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int[] intData = columnPage.getIntPage();
+        int size = pageSize * DataTypes.INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          vector.putInts(0, pageSize, intData, 0);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
+          }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) intData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         }
       } else if (pageDataType == DataTypes.LONG) {
-        long[] longData = columnPage.getLongPage();
+        int size = pageSize * DataTypes.LONG.getSizeInBytes();
         if (vectorDataType == DataTypes.LONG) {
-          vector.putLongs(0, pageSize, longData, 0);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
+          }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, longData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         }
       } else {
-        double[] doubleData = columnPage.getDoublePage();
-        vector.putDoubles(0, pageSize, doubleData, 0);
+        int size = pageSize * DataTypes.DOUBLE.getSizeInBytes();
+        for (int i = 0; i < size; i += DataTypes.DOUBLE.getSizeInBytes()) {
+          vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
+        }
       }
     }
   };

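Throughout fillVector the loops now step through the uncompressed page by byte offset (DataTypes.X.getSizeInBytes() per value) while a separate rowId indexes the target vector. A minimal sketch of that pattern, assuming little-endian storage, with java.nio.ByteBuffer standing in for the ByteUtil helpers and an int[] standing in for the CarbonColumnVector:

  import java.nio.ByteBuffer;
  import java.nio.ByteOrder;

  public final class LittleEndianFillSketch {
    // Walks a page of little-endian shorts by byte offset and fills one int per row,
    // mirroring the "i += getSizeInBytes(), rowId++" pattern used in fillVector.
    static int[] fillIntsFromShortPage(byte[] pageData, int pageSize) {
      ByteBuffer buffer = ByteBuffer.wrap(pageData).order(ByteOrder.LITTLE_ENDIAN);
      int[] vector = new int[pageSize];
      int size = pageSize * Short.BYTES;
      int rowId = 0;
      for (int offset = 0; offset < size; offset += Short.BYTES) {
        vector[rowId++] = buffer.getShort(offset);   // widen short -> int, as vector.putInt does
      }
      return vector;
    }
  }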