http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 984efdb..c5b4d83 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import 
org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import 
org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -266,8 +266,8 @@ public class CarbonUtilTest {
   @Test public void testToGetNextLesserValue() {
     byte[] dataChunks = { 5, 6, 7, 8, 9 };
     byte[] compareValues = { 7 };
-    FixedLengthDimensionDataChunk fixedLengthDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+    FixedLengthDimensionColumnPage fixedLengthDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
     int result = CarbonUtil.nextLesserValueToTarget(2, fixedLengthDataChunk, 
compareValues);
     assertEquals(result, 1);
   }
@@ -275,8 +275,8 @@ public class CarbonUtilTest {
   @Test public void testToGetNextLesserValueToTarget() {
     byte[] dataChunks = { 7, 7, 7, 8, 9 };
     byte[] compareValues = { 7 };
-    FixedLengthDimensionDataChunk fixedLengthDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+    FixedLengthDimensionColumnPage fixedLengthDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
     int result = CarbonUtil.nextLesserValueToTarget(2, fixedLengthDataChunk, 
compareValues);
     assertEquals(result, -1);
   }
@@ -284,8 +284,8 @@ public class CarbonUtilTest {
   @Test public void testToGetnextGreaterValue() {
     byte[] dataChunks = { 5, 6, 7, 8, 9 };
     byte[] compareValues = { 7 };
-    FixedLengthDimensionDataChunk fixedLengthDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+    FixedLengthDimensionColumnPage fixedLengthDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
     int result = CarbonUtil.nextGreaterValueToTarget(2, fixedLengthDataChunk, 
compareValues, 5);
     assertEquals(result, 3);
   }
@@ -301,8 +301,8 @@ public class CarbonUtilTest {
   @Test public void testToGetnextGreaterValueToTarget() {
     byte[] dataChunks = { 5, 6, 7, 7, 7 };
     byte[] compareValues = { 7 };
-    FixedLengthDimensionDataChunk fixedLengthDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+    FixedLengthDimensionColumnPage fixedLengthDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
     int result = CarbonUtil.nextGreaterValueToTarget(2, fixedLengthDataChunk, 
compareValues, 5);
     assertEquals(result, 5);
   }
@@ -524,23 +524,23 @@ public class CarbonUtilTest {
   }
 
   @Test public void testToGetDictionaryEncodingArray() {
-    QueryDimension column1 = new QueryDimension("Column1");
-    QueryDimension column2 = new QueryDimension("Column2");
     ColumnSchema column1Schema = new ColumnSchema();
     ColumnSchema column2Schema = new ColumnSchema();
     column1Schema.setColumnName("Column1");
     List<Encoding> encoding = new ArrayList<>();
     encoding.add(Encoding.DICTIONARY);
     column1Schema.setEncodingList(encoding);
-    column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
+    ProjectionDimension
+        column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 
1, 1, 1, 1));
 
     column2Schema.setColumnName("Column2");
     List<Encoding> encoding2 = new ArrayList<>();
     encoding2.add(Encoding.DELTA);
     column2Schema.setEncodingList(encoding2);
-    column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
+    ProjectionDimension
+        column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 
1, 1, 1, 1));
 
-    QueryDimension[] queryDimensions = { column1, column2 };
+    ProjectionDimension[] queryDimensions = { column1, column2 };
 
     boolean[] dictionaryEncoding = 
CarbonUtil.getDictionaryEncodingArray(queryDimensions);
     boolean[] expectedDictionaryEncoding = { true, false };
@@ -550,23 +550,23 @@ public class CarbonUtilTest {
   }
 
   @Test public void testToGetDirectDictionaryEncodingArray() {
-    QueryDimension column1 = new QueryDimension("Column1");
-    QueryDimension column2 = new QueryDimension("Column2");
     ColumnSchema column1Schema = new ColumnSchema();
     ColumnSchema column2Schema = new ColumnSchema();
     column1Schema.setColumnName("Column1");
     List<Encoding> encoding = new ArrayList<>();
     encoding.add(Encoding.DIRECT_DICTIONARY);
     column1Schema.setEncodingList(encoding);
-    column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
+    ProjectionDimension
+        column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 
1, 1, 1, 1));
 
     column2Schema.setColumnName("Column2");
     List<Encoding> encoding2 = new ArrayList<>();
     encoding2.add(Encoding.DELTA);
     column2Schema.setEncodingList(encoding2);
-    column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
+    ProjectionDimension
+        column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 
1, 1, 1, 1));
 
-    QueryDimension[] queryDimensions = { column1, column2 };
+    ProjectionDimension[] queryDimensions = { column1, column2 };
 
     boolean[] dictionaryEncoding = 
CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
     boolean[] expectedDictionaryEncoding = { true, false };
@@ -576,19 +576,19 @@ public class CarbonUtilTest {
   }
 
   @Test public void testToGetComplexDataTypeArray() {
-    QueryDimension column1 = new QueryDimension("Column1");
-    QueryDimension column2 = new QueryDimension("Column2");
     ColumnSchema column1Schema = new ColumnSchema();
     ColumnSchema column2Schema = new ColumnSchema();
     column1Schema.setColumnName("Column1");
     column1Schema.setDataType(DataTypes.DATE);
-    column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
+    ProjectionDimension
+        column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 
1, 1, 1, 1));
 
     column2Schema.setColumnName("Column2");
     column2Schema.setDataType(DataTypes.createDefaultArrayType());
-    column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
+    ProjectionDimension
+        column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 
1, 1, 1, 1));
 
-    QueryDimension[] queryDimensions = { column1, column2 };
+    ProjectionDimension[] queryDimensions = { column1, column2 };
 
     boolean[] dictionaryEncoding = 
CarbonUtil.getComplexDataTypeArray(queryDimensions);
     boolean[] expectedDictionaryEncoding = { false, true };
@@ -805,8 +805,8 @@ public class CarbonUtilTest {
   @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo1() {
     byte[] dataChunks = { 10, 20, 30, 40, 50, 60 };
     byte[] compareValue = { 5 };
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
     int result = CarbonUtil
         .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, 
compareValue, false);
     assertEquals(-2, result);
@@ -815,8 +815,8 @@ public class CarbonUtilTest {
   @Test public void 
testToGetFirstIndexUsingBinarySearchWithCompareToLessThan0() {
     byte[] dataChunks = { 10, 20, 30, 40, 50, 60 };
     byte[] compareValue = { 30 };
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
     int result = CarbonUtil
         .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, 
compareValue, false);
     assertEquals(2, result);
@@ -825,8 +825,8 @@ public class CarbonUtilTest {
   @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo0() {
     byte[] dataChunks = { 10, 10, 10, 40, 50, 60 };
     byte[] compareValue = { 10 };
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
     int result = CarbonUtil
         .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, 
compareValue, false);
     assertEquals(0, result);
@@ -835,8 +835,8 @@ public class CarbonUtilTest {
   @Test public void testToGetFirstIndexUsingBinarySearchWithMatchUpLimitTrue() 
{
     byte[] dataChunks = { 10, 10, 10, 40, 50, 60 };
     byte[] compareValue = { 10 };
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
-        new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+        new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
     int result = CarbonUtil
         .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, 
compareValue, true);
     assertEquals(2, result);
@@ -846,13 +846,13 @@ public class CarbonUtilTest {
   public void testBinaryRangeSearch() {
 
     byte[] dataChunk = new byte[10];
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk;
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk;
     byte[] keyWord = new byte[1];
     int[] range;
 
     dataChunk = "abbcccddddeffgggh".getBytes();
     byte[][] dataArr = new byte[dataChunk.length / 
keyWord.length][keyWord.length];
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) {
@@ -884,7 +884,7 @@ public class CarbonUtilTest {
     assertRangeIndex(dataArr, dataChunk, fixedLengthDimensionDataChunk, 
keyWord, expectRangeIndex);
 
     dataChunk = "ab".getBytes();
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     keyWord[0] = Byte.valueOf("97");
@@ -898,7 +898,7 @@ public class CarbonUtilTest {
     assertEquals(1, range[1]);
 
     dataChunk = "aabb".getBytes();
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     keyWord[0] = Byte.valueOf("97");
@@ -912,7 +912,7 @@ public class CarbonUtilTest {
     assertEquals(3, range[1]);
 
     dataChunk = "a".getBytes();
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     keyWord[0] = Byte.valueOf("97");
@@ -921,7 +921,7 @@ public class CarbonUtilTest {
     assertEquals(0, range[1]);
 
     dataChunk = "aa".getBytes();
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     keyWord[0] = Byte.valueOf("97");
@@ -930,7 +930,7 @@ public class CarbonUtilTest {
     assertEquals(1, range[1]);
 
     dataChunk = "aabbbbbbbbbbcc".getBytes();
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
     keyWord[0] = Byte.valueOf("98");
     range = 
CarbonUtil.getRangeIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 0, 
dataChunk.length - 1, keyWord);
@@ -943,14 +943,14 @@ public class CarbonUtilTest {
   public void IndexUsingBinarySearchLengthTwo() {
 
     byte[] dataChunk = new byte[10];
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk;
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk;
 
     byte[] keyWord = new byte[2];
 
     dataChunk = "aabbbbbbbbbbcc".getBytes();
     byte[][] dataArr = new byte[dataChunk.length / 
keyWord.length][keyWord.length];
 
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) {
@@ -985,14 +985,14 @@ public class CarbonUtilTest {
   public void IndexUsingBinarySearchLengthThree() {
 
     byte[] dataChunk = new byte[10];
-    FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk;
+    FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk;
 
     byte[] keyWord = new byte[3];
 
     dataChunk = "aaabbbbbbbbbccc".getBytes();
     byte[][] dataArr = new byte[dataChunk.length / 
keyWord.length][keyWord.length];
 
-    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionDataChunk(dataChunk, null, null,
+    fixedLengthDimensionDataChunk = new 
FixedLengthDimensionColumnPage(dataChunk, null, null,
         dataChunk.length / keyWord.length, keyWord.length);
 
     for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) {
@@ -1100,7 +1100,7 @@ public class CarbonUtilTest {
   }
 
   private void assertRangeIndex(byte[][] dataArr, byte[] dataChunk,
-      FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk, byte[] 
keyWord, int[] expectRangeIndex) {
+      FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk, byte[] 
keyWord, int[] expectRangeIndex) {
     int[] range;
     range = 
CarbonUtil.getRangeIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 0,
         (dataChunk.length - 1) / keyWord.length, keyWord);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index de64c0a..e506994 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileHolderImpl;
+import org.apache.carbondata.core.datastore.impl.FileReaderImpl;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
@@ -229,13 +229,13 @@ public class DataFileFooterConverterTest {
       }
 
       @SuppressWarnings("unused") @Mock
-      public FileHolder getFileHolder(FileFactory.FileType fileType) {
-        return new FileHolderImpl();
+      public FileReader getFileHolder(FileFactory.FileType fileType) {
+        return new FileReaderImpl();
       }
 
     };
 
-    new MockUp<FileHolderImpl>() {
+    new MockUp<FileReaderImpl>() {
       @SuppressWarnings("unused") @Mock public long readLong(String filePath, 
long offset) {
         return 1;
       }
@@ -249,7 +249,6 @@ public class DataFileFooterConverterTest {
     SegmentInfo segmentInfo = new SegmentInfo();
     int[] arr = { 1, 2, 3 };
     segmentInfo.setColumnCardinality(arr);
-    segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
     dataFileFooter.setNumberOfRows(3);
     dataFileFooter.setSegmentInfo(segmentInfo);
     TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new 
String[0], 1, ColumnarFormatVersion.V1, null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
index 4c9a784..4fb5dcc 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
@@ -36,7 +36,6 @@ import 
org.apache.carbondata.core.scan.expression.logical.RangeExpression;
 import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import 
org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 
 import mockit.Deencapsulation;
@@ -102,7 +101,7 @@ public class RangeFilterProcessorTest {
         new LessThanEqualToExpression(new ColumnExpression("a", 
DataTypes.STRING),
             new LiteralExpression("20", DataTypes.STRING))), new 
TrueExpression(null));
     FilterOptimizer rangeFilterOptimizer =
-        new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+        new RangeFilterOptmizer(inputFilter);
     rangeFilterOptimizer.optimizeFilter();
     result = checkBothTrees(inputFilter, output);
     Assert.assertTrue(result);
@@ -143,7 +142,7 @@ public class RangeFilterProcessorTest {
         new LessThanEqualToExpression(new ColumnExpression("a", 
DataTypes.STRING),
             new LiteralExpression("05", DataTypes.STRING)));
     FilterOptimizer rangeFilterOptimizer =
-        new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+        new RangeFilterOptmizer(inputFilter);
     rangeFilterOptimizer.optimizeFilter();
     result = checkBothTrees(inputFilter, output);
     // no change
@@ -218,7 +217,7 @@ public class RangeFilterProcessorTest {
     Expression Andb3 = new AndExpression(Andb2, new TrueExpression(null));
 
     FilterOptimizer rangeFilterOptimizer =
-        new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+        new RangeFilterOptmizer(inputFilter);
     rangeFilterOptimizer.optimizeFilter();
     result = checkBothTrees(inputFilter, new AndExpression(Andb3, new 
TrueExpression(null)));
     // no change
@@ -302,7 +301,7 @@ public class RangeFilterProcessorTest {
     Expression Orb3 = new OrExpression(Orb2, lessThanb2);
 
     FilterOptimizer rangeFilterOptimizer =
-        new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+        new RangeFilterOptmizer(inputFilter);
     rangeFilterOptimizer.optimizeFilter();
     result = checkBothTrees(inputFilter, new OrExpression(Orb3, lessThanb1));
     // no change

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java 
b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
deleted file mode 100644
index 94c3f68..0000000
--- 
a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.scanner.impl;
-
-import static junit.framework.TestCase.assertEquals;
-
-public class FilterScannerTest {
-//
-//  private static FilterScanner filterScanner;
-//  private static BlockletIndex blockletIndex;
-//  private static BlockletMinMaxIndex blockletMinMaxIndex;
-//  private static BTreeBuilderInfo bTreeBuilderInfo;
-//  private static DataFileFooter dataFileFooter;
-//
-//  @BeforeClass public static void setUp() {
-//    BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
-//    FilterExecuter filterExecutor = new AndFilterExecuterImpl(null, null);
-//    blockExecutionInfo.setFilterExecuterTree(filterExecutor);
-//    blockExecutionInfo.setFixedLengthKeySize(1);
-//    blockExecutionInfo.setNoDictionaryBlockIndexes(new int[] { 1, 2 });
-//    blockExecutionInfo.setDictionaryColumnBlockIndex(new int[] { 1 });
-//    blockExecutionInfo.setColumnGroupToKeyStructureInfo(new HashMap<Integer, 
KeyStructureInfo>());
-//    blockExecutionInfo.setComplexDimensionInfoMap(new HashMap<Integer, 
GenericQueryType>());
-//    blockExecutionInfo.setComplexColumnParentBlockIndexes(new int[] { 1 });
-//    blockExecutionInfo.setQueryDimensions(new QueryDimension[] { new 
QueryDimension("Col1") });
-//    blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[][] { { 
0, 0 } });
-//    blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[][] { { 0, 
0 } });
-//    blockExecutionInfo.setTotalNumberOfMeasureBlock(1);
-//    blockExecutionInfo.setTotalNumberDimensionBlock(1);
-//    QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel();
-//    QueryStatistic queryStatistic = new QueryStatistic();
-//    
queryStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, 
1);
-//    Map<String, QueryStatistic> statisticsTypeAndObjMap = new HashMap<>();
-//    statisticsTypeAndObjMap.put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, 
queryStatistic);
-//    
statisticsTypeAndObjMap.put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, 
queryStatistic);
-//    queryStatisticsModel.setStatisticsTypeAndObjMap(statisticsTypeAndObjMap);
-//    QueryStatisticsRecorder queryStatisticsRecorder = new 
QueryStatisticsRecorderImpl("1");
-//    queryStatisticsModel.setRecorder(queryStatisticsRecorder);
-//    filterScanner = new FilterScanner(blockExecutionInfo, 
queryStatisticsModel);
-//    blockletIndex = new BlockletIndex();
-//    blockletMinMaxIndex = new BlockletMinMaxIndex();
-//    blockletMinMaxIndex.setMinValues(new byte[][] { { 1, 2 } });
-//    blockletMinMaxIndex.setMaxValues(new byte[][] { { 10, 12 } });
-//    blockletIndex.setMinMaxIndex(blockletMinMaxIndex);
-//    dataFileFooter = new DataFileFooter();
-//    dataFileFooter.setBlockletIndex(blockletIndex);
-//    bTreeBuilderInfo = new BTreeBuilderInfo(Arrays.asList(dataFileFooter), 
new int[] { 1 });
-//  }
-//
-//  @Test public void testToScanBlockletWithEmptyBitSet() throws 
QueryExecutionException {
-//    new MockUp<AndFilterExecuterImpl>() {
-//      @SuppressWarnings("unused") @Mock
-//      public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] 
blockMinValue) {
-//        return new BitSet();
-//      }
-//    };
-//    BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1);
-//    DataRefNode dataRefNode = new BlockBTreeLeafNode(bTreeBuilderInfo, 0, 1);
-//    blocksChunkHolder.setDataBlock(dataRefNode);
-//    AbstractScannedResult abstractScannedResult = 
filterScanner.scanBlocklet(blocksChunkHolder);
-//    assertEquals(0, abstractScannedResult.numberOfOutputRows());
-//  }
-//
-//  @Test public void testToScanBlockletWithNonEmptyBitSet() throws 
QueryExecutionException {
-//    new MockUp<AndFilterExecuterImpl>() {
-//      @SuppressWarnings("unused") @Mock
-//      public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] 
blockMinValue) {
-//        BitSet bitSet = new BitSet();
-//        bitSet.set(1);
-//        bitSet.set(2);
-//        bitSet.set(1);
-//        return bitSet;
-//      }
-//
-//      @SuppressWarnings("unused") @Mock
-//      public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
-//          throws FilterUnsupportedException {
-//        BitSet bitSet = new BitSet();
-//        bitSet.set(1);
-//        bitSet.set(2);
-//        bitSet.set(1);
-//        return bitSet;
-//      }
-//    };
-//    DataRefNode dataRefNode = new MockUp<DataRefNode>() {
-//      @Mock @SuppressWarnings("unused") DimensionColumnDataChunk[] 
getDimensionChunks(
-//          FileHolder fileReader, int[][] blockIndexes) {
-//        DimensionColumnDataChunk[] dimensionChunkAttributes =
-//            { new ColumnGroupDimensionDataChunk(null, null) };
-//        return dimensionChunkAttributes;
-//      }
-//
-//      @Mock @SuppressWarnings("unused") ColumnPage[] getMeasureChunks(
-//          FileHolder fileReader, int[][] blockIndexes) {
-//
-//        ColumnPage[] ColumnPages = { new ColumnPage() };
-//        return ColumnPages;
-//      }
-//    }.getMockInstance();
-//
-//    BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1);
-//    blocksChunkHolder.setDataBlock(dataRefNode);
-//    DimensionChunkAttributes dimensionChunkAttributes = new 
DimensionChunkAttributes();
-//    DimensionColumnDataChunk dimensionColumnDataChunk =
-//        new FixedLengthDimensionDataChunk(new byte[] { 0, 1 }, 
dimensionChunkAttributes);
-//    blocksChunkHolder.setDimensionRawDataChunk(new DimensionColumnDataChunk[]
-//
-//        { dimensionColumnDataChunk });
-//    ColumnPage ColumnPage = new ColumnPage();
-//    blocksChunkHolder.setMeasureDataChunk(new ColumnPage[]
-//
-//        { ColumnPage });
-//    FileHolder fileHolder = new DFSFileHolderImpl();
-//    blocksChunkHolder.setFileReader(fileHolder);
-//    AbstractScannedResult abstractScannedResult = 
filterScanner.scanBlocklet(blocksChunkHolder);
-//
-//    assertEquals(2, abstractScannedResult.numberOfOutputRows());
-//  }
-//
-//  @Test(expected = QueryExecutionException.class) public void 
testToScanBlockletWithException()
-//      throws QueryExecutionException {
-//    new MockUp<AndFilterExecuterImpl>() {
-//      @SuppressWarnings("unused") @Mock
-//      public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] 
blockMinValue) {
-//        BitSet bitSet = new BitSet();
-//        bitSet.set(1);
-//        bitSet.set(2);
-//        bitSet.set(1);
-//        return bitSet;
-//      }
-//
-//      @SuppressWarnings("unused") @Mock
-//      public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
-//          throws FilterUnsupportedException {
-//        throw new FilterUnsupportedException("Filter unsupported");
-//      }
-//    };
-//    BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1);
-//    BTreeBuilderInfo bTreeBuilderInfo =
-//        new BTreeBuilderInfo(Arrays.asList(dataFileFooter), new int[] { 1 });
-//    DataRefNode dataRefNode = new BlockBTreeLeafNode(bTreeBuilderInfo, 0, 1);
-//    blocksChunkHolder.setDataBlock(dataRefNode);
-//    filterScanner.scanBlocklet(blocksChunkHolder);
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/dev/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index 1520cd4..b19db85 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -31,7 +31,7 @@
     <Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"/>
   </Match>
   <Match>
-    <Class name="org.apache.carbondata.core.datastore.impl.FileHolderImpl"/>
+    <Class name="org.apache.carbondata.core.datastore.impl.FileReaderImpl"/>
     <Method name="getDataInputStream"/>
     <Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"/>
   </Match>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 76afcbf..7a15327 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -62,7 +62,7 @@ object CarbonSessionExample {
 
     spark.sql(
       s"""
-         | SELECT *
+         | SELECT charField, stringField, intField
          | FROM carbon_table
          | WHERE stringfield = 'spark' AND decimalField > 40
       """.stripMargin).show()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 6f1e123..b1faa6a 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -57,7 +57,6 @@ import 
org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -110,11 +109,11 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   // comma separated list of input segment numbers
   public static final String INPUT_SEGMENT_NUMBERS =
       "mapreduce.input.carboninputformat.segmentnumbers";
-  public static final String VALIDATE_INPUT_SEGMENT_IDs =
+  private static final String VALIDATE_INPUT_SEGMENT_IDs =
       "mapreduce.input.carboninputformat.validsegments";
   // comma separated list of input files
   public static final String INPUT_FILES = 
"mapreduce.input.carboninputformat.files";
-  public static final String ALTER_PARTITION_ID = 
"mapreduce.input.carboninputformat.partitionid";
+  private static final String ALTER_PARTITION_ID = 
"mapreduce.input.carboninputformat.partitionid";
   private static final Log LOG = 
LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
@@ -125,7 +124,7 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   private static final String DATA_MAP_DSTR = 
"mapreduce.input.carboninputformat.datamapdstr";
   public static final String DATABASE_NAME = 
"mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = 
"mapreduce.input.carboninputformat.tableName";
-  public static final String PARTITIONS_TO_PRUNE =
+  private static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
   public static final String UPADTE_T =
       "mapreduce.input.carboninputformat.partitions.to.prune";
@@ -307,7 +306,7 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   /**
    * get list of partitions to prune
    */
-  public static List<String> getPartitionsToPrune(Configuration configuration) 
throws IOException {
+  private static List<String> getPartitionsToPrune(Configuration 
configuration) throws IOException {
     String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
     if (partitionString != null) {
       return (List<String>) 
ObjectSerializationUtil.convertStringToObject(partitionString);
@@ -335,7 +334,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
    * @return List<InputSplit> list of CarbonInputSplit
    * @throws IOException
    */
-  @Override public List<InputSplit> getSplits(JobContext job) throws 
IOException {
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = 
getAbsoluteTableIdentifier(job.getConfiguration());
     SegmentUpdateStatusManager updateStatusManager = new 
SegmentUpdateStatusManager(identifier);
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -789,28 +789,29 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     return split;
   }
 
-  @Override public RecordReader<Void, T> createRecordReader(InputSplit 
inputSplit,
+  @Override
+  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
     return new CarbonRecordReader<T>(queryModel, readSupport);
   }
 
-  public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
     TableProvider tableProvider = new SingleTableProvider(carbonTable);
-    // getting the table absoluteTableIdentifier from the carbonTable
-    // to avoid unnecessary deserialization
-    AbsoluteTableIdentifier identifier = 
carbonTable.getAbsoluteTableIdentifier();
 
     // query plan includes projection column
-    String projection = getColumnProjection(configuration);
-    CarbonQueryPlan queryPlan = 
CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, 
carbonTable,
-        getDataTypeConverter(configuration));
+    String projectionString = getColumnProjection(configuration);
+    String[] projectionColumnNames = null;
+    if (projectionString != null) {
+      projectionColumnNames = projectionString.split(",");
+    }
+    QueryModel queryModel = carbonTable.createQueryWithProjection(
+        projectionColumnNames, getDataTypeConverter(configuration));
 
     // set the filter to the query model in order to filter blocklet before 
scan
     Expression filter = getFilterPredicates(configuration);
@@ -865,7 +866,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     return readSupport;
   }
 
-  @Override protected boolean isSplitable(JobContext context, Path filename) {
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
     try {
       // Don't split the file if it is local file system
       FileSystem fileSystem = 
filename.getFileSystem(context.getConfiguration());
@@ -879,19 +881,9 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   }
 
   /**
-   * required to be moved to core
-   *
-   * @return updateExtension
-   */
-  private String getUpdateExtension() {
-    // TODO: required to modify when supporting update, mostly will be update 
timestamp
-    return "update";
-  }
-
-  /**
    * return valid segment to access
    */
-  public String[] getSegmentsToAccess(JobContext job) {
+  private String[] getSegmentsToAccess(JobContext job) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, 
"");
     if (segmentString.trim().isEmpty()) {
       return new String[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index a590a5b..0fe0cbf 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -176,9 +176,7 @@ class InMemoryBTreeIndex implements Index {
         filterredBlocks = filterExpressionProcessor.getFilterredBlocks(
             abstractIndex.getDataRefNode(),
             resolver,
-            abstractIndex,
-            identifier
-        );
+            abstractIndex);
       }
       resultFilterredBlocks.addAll(filterredBlocks);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 19626f0..e7c6dda 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -152,7 +152,7 @@ public class CarbonStreamRecordReader extends 
RecordReader<Void, Object> {
     hadoopConf = context.getConfiguration();
     if (model == null) {
       CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-      model = format.getQueryModel(split, context);
+      model = format.createQueryModel(split, context);
     }
     carbonTable = model.getTable();
     List<CarbonDimension> dimensions =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
index 89a4a9a..2f28861 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
@@ -67,7 +67,7 @@ public class BlockLevelTraverser {
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);
       blockName = blockName + CarbonTablePath.getCarbonDataExtension();
 
-      long rowCount = currentBlock.nodeSize();
+      long rowCount = currentBlock.numRows();
 
       String key = CarbonUpdateUtil.getSegmentBlockNameKey(segId, blockName);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 056c27b..9f8c5ec 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -26,18 +26,12 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
@@ -52,45 +46,14 @@ import 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public class CarbonInputFormatUtil {
 
-  public static CarbonQueryPlan createQueryPlan(CarbonTable carbonTable, 
String columnString) {
-    String[] columns = null;
-    if (columnString != null) {
-      columns = columnString.split(",");
-    }
-    String factTableName = carbonTable.getTableName();
-    CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), 
factTableName);
-    // fill dimensions
-    // If columns are null, set all dimensions and measures
-    int i = 0;
-    if (columns != null) {
-      for (String column : columns) {
-        CarbonDimension dimensionByName = 
carbonTable.getDimensionByName(factTableName, column);
-        if (dimensionByName != null) {
-          addQueryDimension(plan, i, dimensionByName);
-          i++;
-        } else {
-          CarbonMeasure measure = carbonTable.getMeasureByName(factTableName, 
column);
-          if (measure == null) {
-            throw new RuntimeException(column + " column not found in the 
table " + factTableName);
-          }
-          addQueryMeasure(plan, i, measure);
-          i++;
-        }
-      }
-    }
-
-    plan.setQueryId(System.nanoTime() + "");
-    return plan;
-  }
-
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
     CarbonTableInputFormat<V> carbonInputFormat = new 
CarbonTableInputFormat<>();
-    carbonInputFormat.setDatabaseName(job.getConfiguration(),
-        identifier.getCarbonTableIdentifier().getDatabaseName());
-    carbonInputFormat
-        .setTableName(job.getConfiguration(), 
identifier.getCarbonTableIdentifier().getTableName());
+    CarbonTableInputFormat.setDatabaseName(
+        job.getConfiguration(), 
identifier.getCarbonTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat.setTableName(
+        job.getConfiguration(), 
identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
     return carbonInputFormat;
   }
@@ -98,30 +61,16 @@ public class CarbonInputFormatUtil {
   public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
       AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) 
throws IOException {
     CarbonTableInputFormat<V> carbonTableInputFormat = new 
CarbonTableInputFormat<>();
-    carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), 
partitionId);
-    carbonTableInputFormat.setDatabaseName(job.getConfiguration(),
-        identifier.getCarbonTableIdentifier().getDatabaseName());
-    carbonTableInputFormat
-        .setTableName(job.getConfiguration(), 
identifier.getCarbonTableIdentifier().getTableName());
+    CarbonTableInputFormat.setPartitionIdList(
+        job.getConfiguration(), partitionId);
+    CarbonTableInputFormat.setDatabaseName(
+        job.getConfiguration(), 
identifier.getCarbonTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat.setTableName(
+        job.getConfiguration(), 
identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
     return carbonTableInputFormat;
   }
 
-  private static void addQueryMeasure(CarbonQueryPlan plan, int order, 
CarbonMeasure measure) {
-    QueryMeasure queryMeasure = new QueryMeasure(measure.getColName());
-    queryMeasure.setQueryOrder(order);
-    queryMeasure.setMeasure(measure);
-    plan.addMeasure(queryMeasure);
-  }
-
-  private static void addQueryDimension(CarbonQueryPlan plan, int order,
-      CarbonDimension dimension) {
-    QueryDimension queryDimension = new QueryDimension(dimension.getColName());
-    queryDimension.setQueryOrder(order);
-    queryDimension.setDimension(dimension);
-    plan.addDimension(queryDimension);
-  }
-
   public static void processFilterExpression(Expression filterExpression, 
CarbonTable carbonTable,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     QueryModel.processFilterExpression(carbonTable, filterExpression, 
isFilterDimensions,
@@ -130,7 +79,7 @@ public class CarbonInputFormatUtil {
     if (null != filterExpression) {
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
       FilterOptimizer rangeFilterOptimizer =
-          new RangeFilterOptmizer(new FilterOptimizerBasic(), 
filterExpression);
+          new RangeFilterOptmizer(filterExpression);
       rangeFilterOptimizer.optimizeFilter();
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
 
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index f109e1c..1b57f93 100644
--- 
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ 
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -140,11 +139,11 @@ public class MapredCarbonInputFormat extends 
CarbonTableInputFormat<ArrayWritabl
 
     AbsoluteTableIdentifier identifier = 
carbonTable.getAbsoluteTableIdentifier();
 
-    String projection = getProjection(configuration, carbonTable,
+    String projectionString = getProjection(configuration, carbonTable,
         identifier.getCarbonTableIdentifier().getTableName());
-    CarbonQueryPlan queryPlan = 
CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel =
-        QueryModel.createModel(identifier, queryPlan, carbonTable, new 
DataTypeConverterImpl());
+    String[] projectionColumns = projectionString.split(",");
+    QueryModel queryModel = carbonTable.createQueryWithProjection(
+        projectionColumns, new DataTypeConverterImpl());
     // set the filter to the query model in order to filter blocklet before 
scan
     Expression filter = getFilterPredicates(configuration);
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, 
null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
deleted file mode 100644
index 9a8f8c5..0000000
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.datatype.StructField;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import 
org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.AbstractRecordReader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * A specialized RecordReader that reads into InternalRows or ColumnarBatches 
directly using the
- * carbondata column APIs and fills the data directly into columns.
- */
-class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
-
-  private int batchIdx = 0;
-
-  private int numBatched = 0;
-
-  private CarbonVectorBatch columnarBatch;
-
-  private CarbonColumnarBatch carbonColumnarBatch;
-
-  /**
-   * If true, this class returns batches instead of rows.
-   */
-  private boolean returnColumnarBatch;
-
-  private QueryModel queryModel;
-
-  private AbstractDetailQueryResultIterator iterator;
-
-  private QueryExecutor queryExecutor;
-
-  public CarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel 
queryModel, AbstractDetailQueryResultIterator iterator) {
-    this.queryModel = queryModel;
-    this.iterator = iterator;
-    this.queryExecutor = queryExecutor;
-    enableReturningBatches();
-  }
-
-  /**
-   * Implementation of RecordReader API.
-   */
-  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
-      throws IOException, InterruptedException, UnsupportedOperationException {
-    // The input split can contain single HDFS block or multiple blocks, so 
firstly get all the
-    // blocks and then set them in the query model.
-    List<CarbonInputSplit> splitList;
-    if (inputSplit instanceof CarbonInputSplit) {
-      splitList = new ArrayList<>(1);
-      splitList.add((CarbonInputSplit) inputSplit);
-    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
-      // contains multiple blocks, this is an optimization for concurrent 
query.
-      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) 
inputSplit;
-      splitList = multiBlockSplit.getAllSplits();
-    } else {
-      throw new RuntimeException("unsupported input split type: " + 
inputSplit);
-    }
-    List<TableBlockInfo> tableBlockInfoList = 
CarbonInputSplit.createBlocks(splitList);
-    queryModel.setTableBlockInfos(tableBlockInfoList);
-    queryModel.setVectorReader(true);
-    try {
-      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-      iterator = (AbstractDetailQueryResultIterator) 
queryExecutor.execute(queryModel);
-    } catch (QueryExecutionException e) {
-      throw new InterruptedException(e.getMessage());
-    }
-  }
-
-  @Override public void close() throws IOException {
-    logStatistics(rowCount, queryModel.getStatisticsRecorder());
-    if (columnarBatch != null) {
-      columnarBatch = null;
-    }
-    // clear dictionary cache
-    Map<String, Dictionary> columnToDictionaryMapping = 
queryModel.getColumnToDictionaryMapping();
-    if (null != columnToDictionaryMapping) {
-      for (Map.Entry<String, Dictionary> entry : 
columnToDictionaryMapping.entrySet()) {
-        CarbonUtil.clearDictionaryCache(entry.getValue());
-      }
-    }
-    try {
-      queryExecutor.finish();
-    } catch (QueryExecutionException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override public boolean nextKeyValue() throws IOException, 
InterruptedException {
-    resultBatch();
-
-    if (returnColumnarBatch) return nextBatch();
-
-    if (batchIdx >= numBatched) {
-      if (!nextBatch()) return false;
-    }
-    ++batchIdx;
-    return true;
-  }
-
-  @Override public Object getCurrentValue() throws IOException, 
InterruptedException {
-    if (returnColumnarBatch) {
-      rowCount += columnarBatch.numValidRows();
-      return columnarBatch;
-    } else {
-      return null;
-    }
-  }
-
-  @Override public Void getCurrentKey() throws IOException, 
InterruptedException {
-    return null;
-  }
-
-  @Override public float getProgress() throws IOException, 
InterruptedException {
-    // TODO : Implement it based on total number of rows it is going to 
retrive.
-    return 0;
-  }
-
-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned 
by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This 
should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-
-  private void initBatch() {
-    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
-    List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
-    StructField[] fields = new StructField[queryDimension.size() + 
queryMeasures.size()];
-    for (int i = 0; i < queryDimension.size(); i++) {
-      QueryDimension dim = queryDimension.get(i);
-      if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        DirectDictionaryGenerator generator = 
DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-           generator.getReturnType());
-      } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            dim.getDimension().getDataType());
-      } else if (dim.getDimension().isComplex()) {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-           dim.getDimension().getDataType());
-      } else {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            DataTypes.INT);
-      }
-    }
-
-    for (QueryMeasure msr : queryMeasures) {
-      DataType dataType = msr.getMeasure().getDataType();
-      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || 
dataType == DataTypes.INT
-          || dataType == DataTypes.LONG) {
-        fields[msr.getQueryOrder()] =
-            new StructField(msr.getColumnName(), 
msr.getMeasure().getDataType());
-      } else if (DataTypes.isDecimal(dataType)) {
-        fields[msr.getQueryOrder()] =
-            new StructField(msr.getColumnName(), 
msr.getMeasure().getDataType());
-      } else {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), 
DataTypes.DOUBLE);
-      }
-    }
-
-    columnarBatch = CarbonVectorBatch.allocate(fields);
-    CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
-    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
-    for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), 
filteredRows);
-    }
-    carbonColumnarBatch = new CarbonColumnarBatch(vectors, 
columnarBatch.capacity(), filteredRows);
-  }
-
-
-  private CarbonVectorBatch resultBatch() {
-    if (columnarBatch == null) initBatch();
-    return columnarBatch;
-  }
-
-  /*
-   * Can be called before any rows are returned to enable returning columnar 
batches directly.
-   */
-  private void enableReturningBatches() {
-    returnColumnarBatch = true;
-  }
-
-  /**
-   * Advances to the next batch of rows. Returns false if there are no more.
-   */
-  private boolean nextBatch() {
-    columnarBatch.reset();
-    carbonColumnarBatch.reset();
-    if (iterator.hasNext()) {
-      iterator.processNextBatch(carbonColumnarBatch);
-      int actualSize = carbonColumnarBatch.getActualSize();
-      columnarBatch.setNumRows(actualSize);
-      numBatched = actualSize;
-      batchIdx = 0;
-      return true;
-    }
-    return false;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 1679f29..5f1f90a 100644
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -54,7 +54,7 @@ class CarbondataPageSource implements ConnectorPageSource {
   private final List<Type> types;
   private final PageBuilder pageBuilder;
   private boolean closed;
-  private CarbonVectorizedRecordReader vectorReader;
+  private PrestoCarbonVectorizedRecordReader vectorReader;
   private CarbonDictionaryDecodeReadSupport<Object[]> readSupport;
   private long sizeOfData = 0;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index c614fa9..5772fbf 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -54,7 +54,7 @@ public class CarbondataRecordCursor implements RecordCursor {
   private CarbondataSplit split;
   private CarbonDictionaryDecodeReadSupport readSupport;
   private Tuple3<DataType, Dictionary, Int>[] dictionary;
-  CarbonVectorizedRecordReader vectorizedRecordReader;
+  PrestoCarbonVectorizedRecordReader vectorizedRecordReader;
 
   private long totalBytes;
   private long nanoStart;
@@ -63,7 +63,7 @@ public class CarbondataRecordCursor implements RecordCursor {
 
 
   public CarbondataRecordCursor(CarbonDictionaryDecodeReadSupport readSupport,
-       CarbonVectorizedRecordReader vectorizedRecordReader,
+       PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
       List<CarbondataColumnHandle> columnHandles,
       CarbondataSplit split) {
     this.vectorizedRecordReader = vectorizedRecordReader;
@@ -194,7 +194,7 @@ public class CarbondataRecordCursor implements RecordCursor 
{
     //todo  delete cache from readSupport
   }
 
-  public CarbonVectorizedRecordReader getVectorizedRecordReader() {
+  public PrestoCarbonVectorizedRecordReader getVectorizedRecordReader() {
     return vectorizedRecordReader;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index 0f8fe87..286ff0e 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -78,8 +78,8 @@ public class CarbondataRecordSet implements RecordSet {
       readSupport
           .initialize(queryModel.getProjectionColumns(), 
queryModel.getTable());
       CarbonIterator iterator = queryExecutor.execute(queryModel);
-      CarbonVectorizedRecordReader vectorReader =
-          new CarbonVectorizedRecordReader(queryExecutor, queryModel,
+      PrestoCarbonVectorizedRecordReader vectorReader =
+          new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
               (AbstractDetailQueryResultIterator) iterator);
       return new CarbondataRecordCursor(readSupport, vectorReader, columns, 
split);
     } catch (QueryExecutionException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f039daf..5a2f831 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -105,7 +105,7 @@ public class CarbondataRecordSetProvider implements 
ConnectorRecordSetProvider {
           new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, 
TaskType.MAP, 0, 0));
       CarbonInputSplit carbonInputSplit =
           
CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
-      queryModel = carbonTableInputFormat.getQueryModel(carbonInputSplit, 
hadoopAttemptContext);
+      queryModel = carbonTableInputFormat.createQueryModel(carbonInputSplit, 
hadoopAttemptContext);
       queryModel.setVectorReader(true);
     } catch (IOException e) {
       throw new RuntimeException("Unable to get the Query Model ", e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
new file mode 100644
index 0000000..a1907db
--- /dev/null
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import 
org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches 
directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
+
+  private int batchIdx = 0;
+
+  private int numBatched = 0;
+
+  private CarbonVectorBatch columnarBatch;
+
+  private CarbonColumnarBatch carbonColumnarBatch;
+
+  /**
+   * If true, this class returns batches instead of rows.
+   */
+  private boolean returnColumnarBatch;
+
+  private QueryModel queryModel;
+
+  private AbstractDetailQueryResultIterator iterator;
+
+  private QueryExecutor queryExecutor;
+
+  public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, 
QueryModel queryModel, AbstractDetailQueryResultIterator iterator) {
+    this.queryModel = queryModel;
+    this.iterator = iterator;
+    this.queryExecutor = queryExecutor;
+    enableReturningBatches();
+  }
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+      throws IOException, InterruptedException, UnsupportedOperationException {
+    // The input split can contain single HDFS block or multiple blocks, so 
firstly get all the
+    // blocks and then set them in the query model.
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+      // contains multiple blocks, this is an optimization for concurrent 
query.
+      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) 
inputSplit;
+      splitList = multiBlockSplit.getAllSplits();
+    } else {
+      throw new RuntimeException("unsupported input split type: " + 
inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = 
CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    queryModel.setVectorReader(true);
+    try {
+      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      iterator = (AbstractDetailQueryResultIterator) 
queryExecutor.execute(queryModel);
+    } catch (QueryExecutionException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+
+  @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
+    if (columnarBatch != null) {
+      columnarBatch = null;
+    }
+    // clear dictionary cache
+    Map<String, Dictionary> columnToDictionaryMapping = 
queryModel.getColumnToDictionaryMapping();
+    if (null != columnToDictionaryMapping) {
+      for (Map.Entry<String, Dictionary> entry : 
columnToDictionaryMapping.entrySet()) {
+        CarbonUtil.clearDictionaryCache(entry.getValue());
+      }
+    }
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, 
InterruptedException {
+    resultBatch();
+
+    if (returnColumnarBatch) return nextBatch();
+
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, 
InterruptedException {
+    if (returnColumnarBatch) {
+      rowCount += columnarBatch.numValidRows();
+      return columnarBatch;
+    } else {
+      return null;
+    }
+  }
+
+  @Override public Void getCurrentKey() throws IOException, 
InterruptedException {
+    return null;
+  }
+
+  @Override public float getProgress() throws IOException, 
InterruptedException {
+    // TODO : Implement it based on total number of rows it is going to 
retrive.
+    return 0;
+  }
+
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned 
by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This 
should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+
+  private void initBatch() {
+    List<ProjectionDimension> queryDimension = 
queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+    StructField[] fields = new StructField[queryDimension.size() + 
queryMeasures.size()];
+    for (int i = 0; i < queryDimension.size(); i++) {
+      ProjectionDimension dim = queryDimension.get(i);
+      if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator generator = 
DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+           generator.getReturnType());
+      } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+            dim.getDimension().getDataType());
+      } else if (dim.getDimension().isComplex()) {
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+           dim.getDimension().getDataType());
+      } else {
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+            DataTypes.INT);
+      }
+    }
+
+    for (ProjectionMeasure msr : queryMeasures) {
+      DataType dataType = msr.getMeasure().getDataType();
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || 
dataType == DataTypes.INT
+          || dataType == DataTypes.LONG) {
+        fields[msr.getOrdinal()] =
+            new StructField(msr.getColumnName(), 
msr.getMeasure().getDataType());
+      } else if (DataTypes.isDecimal(dataType)) {
+        fields[msr.getOrdinal()] =
+            new StructField(msr.getColumnName(), 
msr.getMeasure().getDataType());
+      } else {
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), 
DataTypes.DOUBLE);
+      }
+    }
+
+    columnarBatch = CarbonVectorBatch.allocate(fields);
+    CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), 
filteredRows);
+    }
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, 
columnarBatch.capacity(), filteredRows);
+  }
+
+
+  private CarbonVectorBatch resultBatch() {
+    if (columnarBatch == null) initBatch();
+    return columnarBatch;
+  }
+
+  /*
+   * Can be called before any rows are returned to enable returning columnar 
batches directly.
+   */
+  private void enableReturningBatches() {
+    returnColumnarBatch = true;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  private boolean nextBatch() {
+    columnarBatch.reset();
+    carbonColumnarBatch.reset();
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(carbonColumnarBatch);
+      int actualSize = carbonColumnarBatch.getActualSize();
+      columnarBatch.setNumRows(actualSize);
+      numBatched = actualSize;
+      batchIdx = 0;
+      return true;
+    }
+    return false;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index a517bf4..7eff227 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -161,7 +161,8 @@ class CarbonMergerRDD[K, V](
         LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
         exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
-          carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
+          carbonTable, dataFileMetadataSegMapping, restructuredBlockExists,
+          new SparkDataTypeConverterImpl)
 
         // fire a query and get the results.
         var result2: java.util.List[RawResultIterator] = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e554a58..3dffbf6 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -335,7 +335,7 @@ class CarbonScanRDD(
     TaskMetricsMap.getInstance().registerThreadCallback()
     inputMetricsStats.initBytesReadCallback(context, inputSplit)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
-      val model = format.getQueryModel(inputSplit, attemptContext)
+      val model = format.createQueryModel(inputSplit, attemptContext)
       // get RecordReader by FileFormat
       val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
         case FileFormat.ROW_V1 =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 7d42130..432d50a 100644
--- 
a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -40,7 +40,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private DataType blockDataType;
 
-  public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] 
filteredRows) {
+  ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
     this.columnVector = columnVector;
     this.filteredRows = filteredRows;
     this.dataType = 
CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 5d927df..73da878 100644
--- 
a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import 
org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -100,7 +100,8 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
   /**
    * Implementation of RecordReader API.
    */
-  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
       throws IOException, InterruptedException, UnsupportedOperationException {
     // The input split can contain single HDFS block or multiple blocks, so 
firstly get all the
     // blocks and then set them in the query model.
@@ -145,7 +146,8 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
     }
   }
 
-  @Override public void close() throws IOException {
+  @Override
+  public void close() throws IOException {
     logStatistics(rowCount, queryModel.getStatisticsRecorder());
     if (columnarBatch != null) {
       columnarBatch.close();
@@ -165,10 +167,13 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
     }
   }
 
-  @Override public boolean nextKeyValue() throws IOException, 
InterruptedException {
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
     resultBatch();
 
-    if (returnColumnarBatch) return nextBatch();
+    if (returnColumnarBatch) {
+      return nextBatch();
+    }
 
     if (batchIdx >= numBatched) {
       if (!nextBatch()) return false;
@@ -177,7 +182,8 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
     return true;
   }
 
-  @Override public Object getCurrentValue() throws IOException, 
InterruptedException {
+  @Override
+  public Object getCurrentValue() throws IOException, InterruptedException {
     if (returnColumnarBatch) {
       int value = columnarBatch.numValidRows();
       rowCount += value;
@@ -190,11 +196,13 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
     return columnarBatch.getRow(batchIdx - 1);
   }
 
-  @Override public Void getCurrentKey() throws IOException, 
InterruptedException {
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
     return null;
   }
 
-  @Override public float getProgress() throws IOException, 
InterruptedException {
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
     // TODO : Implement it based on total number of rows it is going to 
retrive.
     return 0;
   }
@@ -206,44 +214,44 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
    */
 
   private void initBatch(MemoryMode memMode) {
-    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
-    List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+    List<ProjectionDimension> queryDimension = 
queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
     StructField[] fields = new StructField[queryDimension.size() + 
queryMeasures.size()];
     for (int i = 0; i < queryDimension.size(); i++) {
-      QueryDimension dim = queryDimension.get(i);
+      ProjectionDimension dim = queryDimension.get(i);
       if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         DirectDictionaryGenerator generator = 
DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(dim.getDimension().getDataType());
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             
CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, 
null);
       } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             
CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), 
true,
             null);
       } else if (dim.getDimension().isComplex()) {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             
CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), 
true,
             null);
       } else {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.INT), true, 
null);
       }
     }
 
     for (int i = 0; i < queryMeasures.size(); i++) {
-      QueryMeasure msr = queryMeasures.get(i);
+      ProjectionMeasure msr = queryMeasures.get(i);
       DataType dataType = msr.getMeasure().getDataType();
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
           dataType == DataTypes.INT || dataType == DataTypes.LONG) {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             
CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), 
true,
             null);
       } else if (DataTypes.isDecimal(dataType)) {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             new DecimalType(msr.getMeasure().getPrecision(), 
msr.getMeasure().getScale()), true,
             null);
       } else {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), 
true, null);
       }
     }
@@ -261,9 +269,8 @@ class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
     initBatch(DEFAULT_MEMORY_MODE);
   }
 
-  private ColumnarBatch resultBatch() {
+  private void resultBatch() {
     if (columnarBatch == null) initBatch();
-    return columnarBatch;
   }
 
   /*

Reply via email to