http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
new file mode 100644
index 0000000..fde4e55
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
+
+/**
+ * Iterator that scans the blocklets supplied by a {@link BlockletIterator} and
+ * hands the scanned data out either as batches of rows ({@link #next()}) or by
+ * filling a columnar batch ({@link #processNextBatch(CarbonColumnarBatch)}).
+ *
+ * When {@code blockExecutionInfo.isPrefetchBlocklet()} is true, reading and
+ * scanning of the following blocklet are submitted to the executor service
+ * while the current result is being consumed; otherwise each blocklet is read
+ * and scanned on the calling thread.
+ */
+public class DataBlockIterator extends CarbonIterator<List<Object[]>> {
+
+  /**
+   * iterator which will be used to iterate over blocklets
+   */
+  private final BlockletIterator blockletIterator;
+
+  /**
+   * result collector which will be used to aggregate the scanned result
+   */
+  private final ScannedResultCollector scannerResultAggregator;
+
+  /**
+   * scanner used to process a blocklet; a filter scanner when a filter
+   * executer tree is present, a full scanner otherwise
+   */
+  private final BlockletScanner blockletScanner;
+
+  /**
+   * batch size of result
+   */
+  private final int batchSize;
+
+  /**
+   * executor on which the prefetch read and scan tasks are submitted
+   */
+  private final ExecutorService executorService;
+
+  /**
+   * pending scan task of the prefetch flow, null until the first submission
+   */
+  private Future<BlockletScannedResult> future;
+
+  /**
+   * pending read task of the prefetch flow, null when no read is outstanding
+   */
+  private Future<RawBlockletColumnChunks> futureIo;
+
+  /**
+   * result of the blocklet which is currently being consumed
+   */
+  private BlockletScannedResult scannedResult;
+
+  private final BlockExecutionInfo blockExecutionInfo;
+
+  private final FileReader fileReader;
+
+  /**
+   * true while a scan task for a following blocklet has been submitted and its
+   * result has not been taken yet
+   */
+  private final AtomicBoolean nextBlock;
+
+  /**
+   * true while a read task for a following blocklet has been submitted and its
+   * result has not been taken yet
+   */
+  private final AtomicBoolean nextRead;
+
+  /**
+   * @param blockExecutionInfo   execution info of the block to scan
+   * @param fileReader           reader passed on to every created RawBlockletColumnChunks
+   * @param batchSize            maximum number of rows returned by one call of next()
+   * @param queryStatisticsModel statistics model handed to the blocklet scanner
+   * @param executorService      executor used for the prefetch tasks
+   */
+  public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
+      int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) {
+    this.blockExecutionInfo = blockExecutionInfo;
+    this.fileReader = fileReader;
+    this.blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+        blockExecutionInfo.getNumberOfBlockToScan());
+    if (blockExecutionInfo.getFilterExecuterTree() != null) {
+      this.blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel);
+    } else {
+      this.blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel);
+    }
+    this.scannerResultAggregator =
+        ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+    this.batchSize = batchSize;
+    this.executorService = executorService;
+    this.nextBlock = new AtomicBoolean(false);
+    this.nextRead = new AtomicBoolean(false);
+  }
+
+  /**
+   * Collects rows from the current and, if required, the following blocklets
+   * until batchSize rows are collected or no scanned result is left.
+   *
+   * @return the collected rows, an empty list when nothing is left to scan
+   */
+  @Override
+  public List<Object[]> next() {
+    if (!updateScanner()) {
+      return new ArrayList<>();
+    }
+    List<Object[]> collectedResult =
+        this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize);
+    // keep pulling from the following blocklets until the batch is full
+    while (collectedResult.size() < batchSize && updateScanner()) {
+      List<Object[]> data = this.scannerResultAggregator
+          .collectResultInRow(scannedResult, batchSize - collectedResult.size());
+      collectedResult.addAll(data);
+    }
+    return collectedResult;
+  }
+
+  /**
+   * @return true if the current result still has data, or if a blocklet or a
+   * pending prefetch task is still left to be processed
+   */
+  @Override
+  public boolean hasNext() {
+    if (scannedResult != null && scannedResult.hasNext()) {
+      return true;
+    }
+    // the current result is exhausted, release its memory before moving on
+    if (null != scannedResult) {
+      scannedResult.freeMemory();
+    }
+    return blockletIterator.hasNext() || nextBlock.get() || nextRead.get();
+  }
+
+  /**
+   * Makes sure scannedResult points to a result which still has data, moving
+   * on to the following blocklets when the current one is exhausted.
+   *
+   * @return true if a non-empty scanned result is available
+   * @throws RuntimeException wrapping any exception raised while reading or scanning
+   */
+  private boolean updateScanner() {
+    try {
+      if (scannedResult != null && scannedResult.hasNext()) {
+        return true;
+      }
+      scannedResult = processNextBlocklet();
+      while (scannedResult != null) {
+        if (scannedResult.hasNext()) {
+          return true;
+        }
+        scannedResult = processNextBlocklet();
+      }
+      // nothing left, reset the prefetch flags so that hasNext() returns false
+      nextBlock.set(false);
+      nextRead.set(false);
+      return false;
+    } catch (Exception ex) {
+      if (ex instanceof InterruptedException) {
+        // keep the interrupt status visible to the caller, it is lost on wrapping
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Returns the scanned result of the next blocklet. In the prefetch flow the
+   * result is taken from the pending scan task and the scan of the following
+   * blocklet is submitted right away.
+   *
+   * @return scanned result, or null when no blocklet was scanned
+   */
+  private BlockletScannedResult processNextBlocklet() throws Exception {
+    BlockletScannedResult result = null;
+    if (blockExecutionInfo.isPrefetchBlocklet()) {
+      if (blockletIterator.hasNext() || nextBlock.get() || nextRead.get()) {
+        if (future == null) {
+          future = scanNextBlockletAsync();
+        }
+        result = future.get();
+        nextBlock.set(false);
+        if (blockletIterator.hasNext() || nextRead.get()) {
+          nextBlock.set(true);
+          future = scanNextBlockletAsync();
+        }
+      }
+    } else {
+      if (blockletIterator.hasNext()) {
+        RawBlockletColumnChunks rawChunks = readNextBlockletColumnChunks();
+        if (rawChunks != null) {
+          result = blockletScanner.scanBlocklet(rawChunks);
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Picks the next blocklet which requires a scan and lets the scanner read it.
+   *
+   * @return the chunks holder after the read, or null when no remaining blocklet
+   * requires a scan
+   */
+  private RawBlockletColumnChunks readNextBlockletColumnChunks() throws IOException {
+    RawBlockletColumnChunks rawBlockletColumnChunks = getNextBlockletColumnChunks();
+    if (rawBlockletColumnChunks == null) {
+      return null;
+    }
+    blockletScanner.readBlocklet(rawBlockletColumnChunks);
+    return rawBlockletColumnChunks;
+  }
+
+  /**
+   * Advances the blocklet iterator until a blocklet is found which either has
+   * no column max values or for which the scanner reports that a scan is
+   * required. Callers check blockletIterator.hasNext() before calling.
+   *
+   * @return chunks holder for that blocklet, or null if the iterator got exhausted
+   */
+  private RawBlockletColumnChunks getNextBlockletColumnChunks() {
+    RawBlockletColumnChunks rawBlockletColumnChunks = null;
+    do {
+      DataRefNode dataBlock = blockletIterator.next();
+      if (dataBlock.getColumnsMaxValue() == null || blockletScanner.isScanRequired(dataBlock)) {
+        rawBlockletColumnChunks = RawBlockletColumnChunks.newInstance(
+            blockExecutionInfo.getTotalNumberDimensionToRead(),
+            blockExecutionInfo.getTotalNumberOfMeasureToRead(), fileReader, dataBlock);
+      }
+    } while (rawBlockletColumnChunks == null && blockletIterator.hasNext());
+    return rawBlockletColumnChunks;
+  }
+
+  /**
+   * Submits a task which waits for the pending read, submits the read of the
+   * following blocklet and then scans the chunks which were read.
+   */
+  private Future<BlockletScannedResult> scanNextBlockletAsync() {
+    return executorService.submit(new Callable<BlockletScannedResult>() {
+      @Override public BlockletScannedResult call() throws Exception {
+        if (futureIo == null) {
+          futureIo = readNextBlockletAsync();
+        }
+        RawBlockletColumnChunks rawBlockletColumnChunks = futureIo.get();
+        futureIo = null;
+        nextRead.set(false);
+        if (rawBlockletColumnChunks != null) {
+          if (blockletIterator.hasNext()) {
+            // start reading the following blocklet before scanning this one
+            nextRead.set(true);
+            futureIo = readNextBlockletAsync();
+          }
+          return blockletScanner.scanBlocklet(rawBlockletColumnChunks);
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Submits a task which reads the next blocklet that requires a scan. The
+   * task returns null when the blocklet iterator has nothing left.
+   */
+  private Future<RawBlockletColumnChunks> readNextBlockletAsync() {
+    return executorService.submit(new Callable<RawBlockletColumnChunks>() {
+      @Override public RawBlockletColumnChunks call() throws Exception {
+        try {
+          TaskMetricsMap.getInstance().registerThreadCallback();
+          if (blockletIterator.hasNext()) {
+            return readNextBlockletColumnChunks();
+          } else {
+            return null;
+          }
+        } finally {
+          // update read bytes metrics for this thread
+          TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId());
+        }
+      }
+    });
+  }
+
+  /**
+   * Fills the given columnar batch from the current scanned result. Nothing is
+   * written to the batch when no scanned result is left.
+   *
+   * @param columnarBatch batch to be filled
+   */
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    if (updateScanner()) {
+      this.scannerResultAggregator.collectResultInColumnarBatch(scannedResult, columnarBatch);
+    }
+  }
+
+
+  /**
+   * Close the resources
+   *
+   * @throws RuntimeException if waiting for the pre-fetched result fails or is interrupted
+   */
+  public void close() {
+    // free the current scanned result
+    if (null != scannedResult && !scannedResult.hasNext()) {
+      scannedResult.freeMemory();
+    }
+    // free any pre-fetched memory if present
+    if (null != future) {
+      try {
+        BlockletScannedResult blockletScannedResult = future.get();
+        if (blockletScannedResult != null) {
+          blockletScannedResult.freeMemory();
+        }
+      } catch (InterruptedException e) {
+        // restore the interrupt status, wrapping the exception would hide it
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
new file mode 100644
index 0000000..6b7e880
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * Contains dimension and measure raw column chunks of one blocklet
+ */
+public class RawBlockletColumnChunks {
+
+  /**
+   * dimension column data chunk
+   */
+  private DimensionRawColumnChunk[] dimensionRawColumnChunks;
+
+  /**
+   * measure column data chunk
+   */
+  private MeasureRawColumnChunk[] measureRawColumnChunks;
+
+  /**
+   * file reader which will use to read the block from file
+   */
+  private FileReader fileReader;
+
+  /**
+   * data block
+   */
+  private DataRefNode dataBlock;
+
+  private BitSetGroup bitSetGroup;
+
+  private RawBlockletColumnChunks() { }
+
+  public static RawBlockletColumnChunks newInstance(int numberOfDimensionChunk,
+      int numberOfMeasureChunk, FileReader fileReader, DataRefNode dataBlock) {
+    RawBlockletColumnChunks instance = new RawBlockletColumnChunks();
+    instance.dimensionRawColumnChunks = new 
DimensionRawColumnChunk[numberOfDimensionChunk];
+    instance.measureRawColumnChunks = new 
MeasureRawColumnChunk[numberOfMeasureChunk];
+    instance.fileReader = fileReader;
+    instance.dataBlock = dataBlock;
+    return instance;
+  }
+
+  /**
+   * @return the dimensionRawColumnChunks
+   */
+  public DimensionRawColumnChunk[] getDimensionRawColumnChunks() {
+    return dimensionRawColumnChunks;
+  }
+
+  /**
+   * @param dimensionRawColumnChunks the dimensionRawColumnChunks to set
+   */
+  public void setDimensionRawColumnChunks(DimensionRawColumnChunk[] 
dimensionRawColumnChunks) {
+    this.dimensionRawColumnChunks = dimensionRawColumnChunks;
+  }
+
+  /**
+   * @return the measureRawColumnChunks
+   */
+  public MeasureRawColumnChunk[] getMeasureRawColumnChunks() {
+    return measureRawColumnChunks;
+  }
+
+  /**
+   * @param measureRawColumnChunks the measureRawColumnChunks to set
+   */
+  public void setMeasureRawColumnChunks(MeasureRawColumnChunk[] 
measureRawColumnChunks) {
+    this.measureRawColumnChunks = measureRawColumnChunks;
+  }
+
+  /**
+   * @return the fileReader
+   */
+  public FileReader getFileReader() {
+    return fileReader;
+  }
+
+  /**
+   * @return the dataBlock
+   */
+  public DataRefNode getDataBlock() {
+    return dataBlock;
+  }
+
+  public BitSetGroup getBitSetGroup() {
+    return bitSetGroup;
+  }
+
+  public void setBitSetGroup(BitSetGroup bitSetGroup) {
+    this.bitSetGroup = bitSetGroup;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
deleted file mode 100644
index 1c97725..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.processor.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-
-/**
- * Below class will be used to process the block for detail query
- */
-public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
-  /**
-   * DataBlockIteratorImpl Constructor
-   *
-   * @param blockExecutionInfo execution information
-   */
-  public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, 
FileHolder fileReader,
-      int batchSize, QueryStatisticsModel queryStatisticsModel, 
ExecutorService executorService) {
-    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, 
executorService);
-  }
-
-  /**
-   * It scans the block and returns the result with @batchSize
-   *
-   * @return Result of @batchSize
-   */
-  public List<Object[]> next() {
-    List<Object[]> collectedResult = null;
-    if (updateScanner()) {
-      collectedResult = 
this.scannerResultAggregator.collectData(scannedResult, batchSize);
-      while (collectedResult.size() < batchSize && updateScanner()) {
-        List<Object[]> data = this.scannerResultAggregator
-            .collectData(scannedResult, batchSize - collectedResult.size());
-        collectedResult.addAll(data);
-      }
-    } else {
-      collectedResult = new ArrayList<>();
-    }
-    return collectedResult;
-  }
-
-  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
-    if (updateScanner()) {
-      this.scannerResultAggregator.collectVectorBatch(scannedResult, 
columnarBatch);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
deleted file mode 100644
index b089fad..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ /dev/null
@@ -1,698 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.result;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.mutate.DeleteDeltaVo;
-import org.apache.carbondata.core.mutate.TupleIdEnum;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
-import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * Scanned result class which will store and provide the result on request
- */
-public abstract class AbstractScannedResult {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractScannedResult.class.getName());
-  /**
-   * current row number
-   */
-  protected int currentRow = -1;
-
-  protected int pageCounter;
-  /**
-   * row mapping indexes
-   */
-  protected int[][] rowMapping;
-  /**
-   * key size of the fixed length column
-   */
-  private int fixedLengthKeySize;
-  /**
-   * total number of rows per page
-   */
-  private int[] numberOfRows;
-
-  /**
-   * Total number of rows.
-   */
-  private int totalNumberOfRows;
-  /**
-   * to keep track of number of rows process
-   */
-  protected int rowCounter;
-  /**
-   * dimension column data chunk
-   */
-  protected DimensionColumnDataChunk[][] dimensionDataChunks;
-
-  /**
-   * Raw dimension chunks;
-   */
-  protected DimensionRawColumnChunk[] dimRawColumnChunks;
-
-  /**
-   * Raw dimension chunks;
-   */
-  protected MeasureRawColumnChunk[] msrRawColumnChunks;
-  /**
-   * measure column data chunk
-   */
-  protected ColumnPage[][] measureDataChunks;
-  /**
-   * dictionary column block index in file
-   */
-  protected int[] dictionaryColumnBlockIndexes;
-
-  /**
-   * no dictionary column block index in file
-   */
-  protected int[] noDictionaryColumnBlockIndexes;
-
-  /**
-   * column group to is key structure info
-   * which will be used to get the key from the complete
-   * column group key
-   * For example if only one dimension of the column group is selected
-   * then from complete column group key it will be used to mask the key and
-   * get the particular column key
-   */
-  protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
-
-  /**
-   *
-   */
-  private Map<Integer, GenericQueryType> complexParentIndexToQueryMap;
-
-  private int totalDimensionsSize;
-
-  /**
-   * blockedId which will be blockId + blocklet number in the block
-   */
-  private String blockletId;
-
-  private long rowId;
-
-  /**
-   * parent block indexes
-   */
-  private int[] complexParentBlockIndexes;
-
-  /**
-   * blockletid+pageumber to deleted reocrd map
-   */
-  private Map<String, DeleteDeltaVo> deletedRecordMap;
-
-  /**
-   * current page delete delta vo
-   */
-  private DeleteDeltaVo currentDeleteDeltaVo;
-
-  /**
-   * actual blocklet number
-   */
-  private String blockletNumber;
-
-  public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
-    this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
-    this.noDictionaryColumnBlockIndexes = 
blockExecutionInfo.getNoDictionaryBlockIndexes();
-    this.dictionaryColumnBlockIndexes = 
blockExecutionInfo.getDictionaryColumnBlockIndex();
-    this.columnGroupKeyStructureInfo = 
blockExecutionInfo.getColumnGroupToKeyStructureInfo();
-    this.complexParentIndexToQueryMap = 
blockExecutionInfo.getComlexDimensionInfoMap();
-    this.complexParentBlockIndexes = 
blockExecutionInfo.getComplexColumnParentBlockIndexes();
-    this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length;
-    this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
-  }
-
-  /**
-   * Below method will be used to set the dimension chunks
-   * which will be used to create a row
-   *
-   * @param dataChunks dimension chunks used in query
-   */
-  public void setDimensionChunks(DimensionColumnDataChunk[][] dataChunks) {
-    this.dimensionDataChunks = dataChunks;
-  }
-
-  /**
-   * Below method will be used to set the measure column chunks
-   *
-   * @param measureDataChunks measure data chunks
-   */
-  public void setMeasureChunks(ColumnPage[][] measureDataChunks) {
-    this.measureDataChunks = measureDataChunks;
-  }
-
-  public void setDimRawColumnChunks(DimensionRawColumnChunk[] 
dimRawColumnChunks) {
-    this.dimRawColumnChunks = dimRawColumnChunks;
-  }
-
-  public void setMsrRawColumnChunks(MeasureRawColumnChunk[] 
msrRawColumnChunks) {
-    this.msrRawColumnChunks = msrRawColumnChunks;
-  }
-
-  /**
-   * Below method will be used to get the chunk based in measure ordinal
-   *
-   * @param ordinal measure ordinal
-   * @return measure column chunk
-   */
-  public ColumnPage getMeasureChunk(int ordinal) {
-    return measureDataChunks[ordinal][pageCounter];
-  }
-
-  /**
-   * Below method will be used to get the key for all the dictionary dimensions
-   * which is present in the query
-   *
-   * @param rowId row id selected after scanning
-   * @return return the dictionary key
-   */
-  protected byte[] getDictionaryKeyArray(int rowId) {
-    byte[] completeKey = new byte[fixedLengthKeySize];
-    int offset = 0;
-    for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      offset += 
dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
-          .fillChunkData(completeKey, offset, rowId,
-              
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
-    }
-    rowCounter++;
-    return completeKey;
-  }
-
-  /**
-   * Below method will be used to get the key for all the dictionary dimensions
-   * in integer array format which is present in the query
-   *
-   * @param rowId row id selected after scanning
-   * @return return the dictionary key
-   */
-  protected int[] getDictionaryKeyIntegerArray(int rowId) {
-    int[] completeKey = new int[totalDimensionsSize];
-    int column = 0;
-    for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = 
dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
-          .fillConvertedChunkData(rowId, column, completeKey,
-              
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
-    }
-    rowCounter++;
-    return completeKey;
-  }
-
-  /**
-   * Fill the column data of dictionary to vector
-   */
-  public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
-    int column = 0;
-    for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = 
dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
-          .fillConvertedChunkData(vectorInfo, column,
-              
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
-    }
-  }
-
-  /**
-   * Fill the column data to vector
-   */
-  public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
-    int column = 0;
-    for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      column = 
dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
-          .fillConvertedChunkData(vectorInfo, column,
-              
columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
-    }
-  }
-
-  /**
-   * Fill the measure column data to vector
-   */
-  public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] 
measuresOrdinal) {
-    for (int i = 0; i < measuresOrdinal.length; i++) {
-      vectorInfo[i].measureVectorFiller
-          
.fillMeasureVector(measureDataChunks[measuresOrdinal[i]][pageCounter], 
vectorInfo[i]);
-    }
-  }
-
-  public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
-    for (int i = 0; i < vectorInfos.length; i++) {
-      int offset = vectorInfos[i].offset;
-      int len = offset + vectorInfos[i].size;
-      int vectorOffset = vectorInfos[i].vectorOffset;
-      CarbonColumnVector vector = vectorInfos[i].vector;
-      for (int j = offset; j < len; j++) {
-        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-        DataOutputStream dataOutput = new DataOutputStream(byteStream);
-        try {
-          vectorInfos[i].genericQueryType
-              .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks,
-                  rowMapping == null ? j : rowMapping[pageCounter][j], 
pageCounter, dataOutput);
-          Object data = vectorInfos[i].genericQueryType
-              
.getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
-          vector.putObject(vectorOffset++, data);
-        } catch (IOException e) {
-          LOGGER.error(e);
-        } finally {
-          CarbonUtil.closeStreams(dataOutput);
-          CarbonUtil.closeStreams(byteStream);
-        }
-      }
-    }
-  }
-
-  /**
-   * Fill the column data to vector
-   */
-  public void fillColumnarImplicitBatch(ColumnVectorInfo[] vectorInfo) {
-    for (int i = 0; i < vectorInfo.length; i++) {
-      ColumnVectorInfo columnVectorInfo = vectorInfo[i];
-      CarbonColumnVector vector = columnVectorInfo.vector;
-      int offset = columnVectorInfo.offset;
-      int vectorOffset = columnVectorInfo.vectorOffset;
-      int len = offset + columnVectorInfo.size;
-      for (int j = offset; j < len; j++) {
-        // Considering only String case now as we support only
-        String data = getBlockletId();
-        if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
-            .equals(columnVectorInfo.dimension.getColumnName())) {
-          data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter
-              + CarbonCommonConstants.FILE_SEPARATOR + (rowMapping == null ?
-              j :
-              rowMapping[pageCounter][j]);
-        }
-        vector.putBytes(vectorOffset++,
-            
data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-      }
-    }
-  }
-
-  /**
-   * Just increment the counter incase of query only on measures.
-   */
-  public void incrementCounter() {
-    rowCounter++;
-    currentRow++;
-  }
-
-  /**
-   * Just increment the page counter and reset the remaining counters.
-   */
-  public void incrementPageCounter() {
-    rowCounter = 0;
-    currentRow = -1;
-    pageCounter++;
-    fillDataChunks();
-    if (null != deletedRecordMap) {
-      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + 
pageCounter);
-    }
-  }
-
-  /**
-   * This case is used only in case of compaction, since it does not use 
filter flow.
-   */
-  public void fillDataChunks() {
-    freeDataChunkMemory();
-    if (pageCounter >= numberOfRows.length) {
-      return;
-    }
-    for (int i = 0; i < dimensionDataChunks.length; i++) {
-      if (dimensionDataChunks[i][pageCounter] == null && dimRawColumnChunks[i] 
!= null) {
-        dimensionDataChunks[i][pageCounter] =
-            
dimRawColumnChunks[i].convertToDimColDataChunkWithOutCache(pageCounter);
-      }
-    }
-
-    for (int i = 0; i < measureDataChunks.length; i++) {
-      if (measureDataChunks[i][pageCounter] == null && msrRawColumnChunks[i] 
!= null) {
-        measureDataChunks[i][pageCounter] =
-            msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter);
-      }
-    }
-  }
-
-  // free the memory for the last page chunk
-  private void freeDataChunkMemory() {
-    for (int i = 0; i < dimensionDataChunks.length; i++) {
-      if (pageCounter > 0 && dimensionDataChunks[i][pageCounter - 1] != null) {
-        dimensionDataChunks[i][pageCounter - 1].freeMemory();
-        dimensionDataChunks[i][pageCounter - 1] = null;
-      }
-    }
-    for (int i = 0; i < measureDataChunks.length; i++) {
-      if (pageCounter > 0 && measureDataChunks[i][pageCounter - 1] != null) {
-        measureDataChunks[i][pageCounter - 1].freeMemory();
-        measureDataChunks[i][pageCounter - 1] = null;
-      }
-    }
-  }
-
-  public int numberOfpages() {
-    return numberOfRows.length;
-  }
-
-  /**
-   * Get total rows in the current page
-   *
-   * @return
-   */
-  public int getCurrentPageRowCount() {
-    return numberOfRows[pageCounter];
-  }
-
-  public int getCurrentPageCounter() {
-    return pageCounter;
-  }
-
-  /**
-   * increment the counter.
-   */
-  public void setRowCounter(int rowCounter) {
-    this.rowCounter = rowCounter;
-  }
-
-  /**
-   * Below method will be used to get the dimension data based on dimension
-   * ordinal and index
-   *
-   * @param dimOrdinal dimension ordinal present in the query
-   * @param rowId      row index
-   * @return dimension data based on row id
-   */
-  protected byte[] getDimensionData(int dimOrdinal, int rowId) {
-    return dimensionDataChunks[dimOrdinal][pageCounter].getChunkData(rowId);
-  }
-
-  /**
-   * Below method will be used to get the dimension key array
-   * for all the no dictionary dimension present in the query
-   *
-   * @param rowId row number
-   * @return no dictionary keys for all no dictionary dimension
-   */
-  protected byte[][] getNoDictionaryKeyArray(int rowId) {
-    byte[][] noDictionaryColumnsKeys = new 
byte[noDictionaryColumnBlockIndexes.length][];
-    int position = 0;
-    for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      noDictionaryColumnsKeys[position++] =
-          
dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId);
-    }
-    return noDictionaryColumnsKeys;
-  }
-
-  /**
-   * Below method will be used to get the dimension key array
-   * for all the no dictionary dimension present in the query
-   *
-   * @param rowId row number
-   * @return no dictionary keys for all no dictionary dimension
-   */
-  protected String[] getNoDictionaryKeyStringArray(int rowId) {
-    String[] noDictionaryColumnsKeys = new 
String[noDictionaryColumnBlockIndexes.length];
-    int position = 0;
-    for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      noDictionaryColumnsKeys[position++] = new String(
-          
dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId),
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-    }
-    return noDictionaryColumnsKeys;
-  }
-
-  /**
-   * @return blockletId
-   */
-  public String getBlockletId() {
-    return blockletId;
-  }
-
-  /**
-   * @param blockletId
-   */
-  public void setBlockletId(String blockletId) {
-    this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
-    blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, 
TupleIdEnum.BLOCKLET_ID);
-    // if deleted recors map is present for this block
-    // then get the first page deleted vo
-    if (null != deletedRecordMap) {
-      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + 
pageCounter);
-    }
-  }
-
-  /**
-   * @return blockletId
-   */
-  public long getRowId() {
-    return rowId;
-  }
-
-  /**
-   * @param rowId
-   */
-  public void setRowId(long rowId) {
-    this.rowId = rowId;
-  }
-
-  /**
-   * Below method will be used to get the complex type keys array based
-   * on row id for all the complex type dimension selected in query
-   *
-   * @param rowId row number
-   * @return complex type key array for all the complex dimension selected in 
query
-   */
-  protected byte[][] getComplexTypeKeyArray(int rowId) {
-    byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][];
-    for (int i = 0; i < complexTypeData.length; i++) {
-      GenericQueryType genericQueryType =
-          complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
-      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-      DataOutputStream dataOutput = new DataOutputStream(byteStream);
-      try {
-        genericQueryType
-            .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, 
rowId, pageCounter,
-                dataOutput);
-        complexTypeData[i] = byteStream.toByteArray();
-      } catch (IOException e) {
-        LOGGER.error(e);
-      } finally {
-        CarbonUtil.closeStreams(dataOutput);
-        CarbonUtil.closeStreams(byteStream);
-      }
-    }
-    return complexTypeData;
-  }
-
-  /**
-   * @return return the total number of row after scanning
-   */
-  public int numberOfOutputRows() {
-    return this.totalNumberOfRows;
-  }
-
-  /**
-   * to check whether any more row is present in the result
-   *
-   * @return
-   */
-  public boolean hasNext() {
-    if (pageCounter < numberOfRows.length && rowCounter < 
this.numberOfRows[pageCounter]) {
-      return true;
-    } else if (pageCounter < numberOfRows.length) {
-      pageCounter++;
-      fillDataChunks();
-      rowCounter = 0;
-      currentRow = -1;
-      if (null != deletedRecordMap) {
-        currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + 
pageCounter);
-      }
-      return hasNext();
-    }
-    return false;
-  }
-
-  /**
-   * Below method will be used to free the occupied memory
-   */
-  public void freeMemory() {
-    // first free the dimension chunks
-    if (null != dimensionDataChunks) {
-      for (int i = 0; i < dimensionDataChunks.length; i++) {
-        if (null != dimensionDataChunks[i]) {
-          for (int j = 0; j < dimensionDataChunks[i].length; j++) {
-            if (null != dimensionDataChunks[i][j]) {
-              dimensionDataChunks[i][j].freeMemory();
-            }
-          }
-        }
-      }
-    }
-    // free the measure data chunks
-    if (null != measureDataChunks) {
-      for (int i = 0; i < measureDataChunks.length; i++) {
-        if (null != measureDataChunks[i]) {
-          for (int j = 0; j < measureDataChunks[i].length; j++) {
-            if (null != measureDataChunks[i][j]) {
-              measureDataChunks[i][j].freeMemory();
-            }
-          }
-        }
-      }
-    }
-    // free the raw chunks
-    if (null != dimRawColumnChunks) {
-      for (int i = 0; i < dimRawColumnChunks.length; i++) {
-        if (null != dimRawColumnChunks[i]) {
-          dimRawColumnChunks[i].freeMemory();
-        }
-      }
-    }
-  }
-
-  /**
-   * As this class will be a flyweight object so
-   * for one block all the blocklet scanning will use same result object
-   * in that case we need to reset the counter to zero so
-   * for new result it will give the result from zero
-   */
-  public void reset() {
-    rowCounter = 0;
-    currentRow = -1;
-    pageCounter = 0;
-  }
-
-  /**
-   * @param numberOfRows set total of number rows valid after scanning
-   */
-  public void setNumberOfRows(int[] numberOfRows) {
-    this.numberOfRows = numberOfRows;
-
-    for (int count : numberOfRows) {
-      totalNumberOfRows += count;
-    }
-  }
-
-  /**
-   * After applying filter it will return the  bit set with the valid row 
indexes
-   * so below method will be used to set the row indexes
-   *
-   * @param indexes
-   */
-  public void setIndexes(int[][] indexes) {
-    this.rowMapping = indexes;
-  }
-
-  public int getRowCounter() {
-    return rowCounter;
-  }
-
-  /**
-   * will return the current valid row id
-   *
-   * @return valid row id
-   */
-  public abstract int getCurrentRowId();
-
-  /**
-   * @return dictionary key array for all the dictionary dimension
-   * selected in query
-   */
-  public abstract byte[] getDictionaryKeyArray();
-
-  /**
-   * @return dictionary key array for all the dictionary dimension in integer 
array forat
-   * selected in query
-   */
-  public abstract int[] getDictionaryKeyIntegerArray();
-
-  /**
-   * Below method will be used to get the complex type key array
-   *
-   * @return complex type key array
-   */
-  public abstract byte[][] getComplexTypeKeyArray();
-
-  /**
-   * Below method will be used to get the no dictionary key
-   * array for all the no dictionary dimension selected in query
-   *
-   * @return no dictionary key array for all the no dictionary dimension
-   */
-  public abstract byte[][] getNoDictionaryKeyArray();
-
-  /**
-   * Below method will be used to get the no dictionary key
-   * array in string array format for all the no dictionary dimension selected 
in query
-   *
-   * @return no dictionary key array for all the no dictionary dimension
-   */
-  public abstract String[] getNoDictionaryKeyStringArray();
-
-  /**
-   * Mark the filtered rows in columnar batch. These rows will not be added to 
vector batches later.
-   * @param columnarBatch
-   * @param startRow
-   * @param size
-   * @param vectorOffset
-   */
-  public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, 
int size,
-      int vectorOffset) {
-    int rowsFiltered = 0;
-    if (currentDeleteDeltaVo != null) {
-      int len = startRow + size;
-      for (int i = startRow; i < len; i++) {
-        int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
-        if (currentDeleteDeltaVo.containsRow(rowId)) {
-          columnarBatch.markFiltered(vectorOffset);
-          rowsFiltered++;
-        }
-        vectorOffset++;
-      }
-    }
-    return rowsFiltered;
-  }
-
-  /**
-   * Below method will be used to check row got deleted
-   *
-   * @param rowId
-   * @return is present in deleted row
-   */
-  public boolean containsDeletedRow(int rowId) {
-    if (null != currentDeleteDeltaVo) {
-      return currentDeleteDeltaVo.containsRow(rowId);
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java
deleted file mode 100644
index 56ca2ac..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.scan.result;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.carbondata.common.CarbonIterator;
-
-/**
- * Below class holds the query result
- */
-public class BatchResult extends CarbonIterator<Object[]> {
-
-  /**
-   * list of keys
-   */
-  protected List<Object[]> rows;
-
-  /**
-   * counter to check whether all the records are processed or not
-   */
-  protected int counter;
-
-  public BatchResult() {
-    this.rows = new ArrayList<>();
-  }
-
-  /**
-   * Below method will be used to get the rows
-   *
-   * @return
-   */
-  public List<Object[]> getRows() {
-    return rows;
-  }
-
-  /**
-   * Below method will be used to get the set the values
-   *
-   * @param rows
-   */
-  public void setRows(List<Object[]> rows) {
-    this.rows = rows;
-  }
-
-  /**
-   * This method will return one row at a time based on the counter given.
-   * @param counter
-   * @return
-   */
-  public Object[] getRawRow(int counter) {
-    return rows.get(counter);
-  }
-
-  /**
-   * For getting the total size.
-   * @return
-   */
-  public int getSize() {
-    return rows.size();
-  }
-
-
-  /**
-   * Returns {@code true} if the iteration has more elements.
-   *
-   * @return {@code true} if the iteration has more elements
-   */
-  @Override public boolean hasNext() {
-    return counter < rows.size();
-  }
-
-  /**
-   * Returns the next element in the iteration.
-   *
-   * @return the next element in the iteration
-   */
-  @Override public Object[] next() {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
-    }
-    Object[] row = rows.get(counter);
-    counter++;
-    return row;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
new file mode 100644
index 0000000..29404b4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Scanned result class which will store and provide the result on request
+ */
+public abstract class BlockletScannedResult {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletScannedResult.class.getName());
+  /**
+   * current row number
+   */
+  protected int currentRow = -1;
+
+  protected int pageCounter;
+  /**
+   * matched rowId for each page
+   */
+  protected int[][] pageFilteredRowId;
+  /**
+   * key size of the fixed length column
+   */
+  private int fixedLengthKeySize;
+  /**
+   * total number of filtered rows for each page
+   */
+  private int[] pageFilteredRowCount;
+
+  /**
+   * to keep track of the number of rows processed
+   */
+  protected int rowCounter;
+  /**
+   * dimension column data chunk
+   */
+  protected DimensionColumnPage[][] dimensionColumnPages;
+
+  /**
+   * Raw dimension chunks;
+   */
+  protected DimensionRawColumnChunk[] dimRawColumnChunks;
+
+  /**
+   * Raw measure chunks;
+   */
+  protected MeasureRawColumnChunk[] msrRawColumnChunks;
+  /**
+   * measure column data chunk
+   */
+  protected ColumnPage[][] measureColumnPages;
+  /**
+   * dictionary column block index in file
+   */
+  protected int[] dictionaryColumnChunkIndexes;
+
+  /**
+   * no dictionary column chunk index in file
+   */
+  protected int[] noDictionaryColumnChunkIndexes;
+
+  /**
+   * column group to its key structure info
+   * which will be used to get the key from the complete
+   * column group key
+   * For example if only one dimension of the column group is selected
+   * then from complete column group key it will be used to mask the key and
+   * get the particular column key
+   */
+  protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
+
+  /**
+   * complex parent block index to the query type used to parse that complex column
+   */
+  private Map<Integer, GenericQueryType> complexParentIndexToQueryMap;
+
+  private int totalDimensionsSize;
+
+  /**
+   * blockletId which will be blockId + blocklet number in the block
+   */
+  private String blockletId;
+
+  /**
+   * parent block indexes
+   */
+  private int[] complexParentBlockIndexes;
+
+  /**
+   * blocklet number + "_" + page number to deleted record map
+   */
+  private Map<String, DeleteDeltaVo> deletedRecordMap;
+
+  /**
+   * current page delete delta vo
+   */
+  private DeleteDeltaVo currentDeleteDeltaVo;
+
+  /**
+   * actual blocklet number
+   */
+  private String blockletNumber;
+
+  public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo) {
+    this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
+    this.noDictionaryColumnChunkIndexes = 
blockExecutionInfo.getNoDictionaryColumnChunkIndexes();
+    this.dictionaryColumnChunkIndexes = 
blockExecutionInfo.getDictionaryColumnChunkIndex();
+    this.columnGroupKeyStructureInfo = 
blockExecutionInfo.getColumnGroupToKeyStructureInfo();
+    this.complexParentIndexToQueryMap = 
blockExecutionInfo.getComlexDimensionInfoMap();
+    this.complexParentBlockIndexes = 
blockExecutionInfo.getComplexColumnParentBlockIndexes();
+    this.totalDimensionsSize = 
blockExecutionInfo.getProjectionDimensions().length;
+    this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
+  }
+
+  /**
+   * Below method will be used to set the dimension chunks
+   * which will be used to create a row
+   *
+   * @param columnPages dimension chunks used in query
+   */
+  public void setDimensionColumnPages(DimensionColumnPage[][] columnPages) {
+    this.dimensionColumnPages = columnPages;
+  }
+
+  /**
+   * Below method will be used to set the measure column chunks
+   *
+   * @param columnPages measure data chunks
+   */
+  public void setMeasureColumnPages(ColumnPage[][] columnPages) {
+    this.measureColumnPages = columnPages;
+  }
+
+  public void setDimRawColumnChunks(DimensionRawColumnChunk[] 
dimRawColumnChunks) {
+    this.dimRawColumnChunks = dimRawColumnChunks;
+  }
+
+  public void setMsrRawColumnChunks(MeasureRawColumnChunk[] 
msrRawColumnChunks) {
+    this.msrRawColumnChunks = msrRawColumnChunks;
+  }
+
+  /**
+   * Below method will be used to get the chunk based on measure ordinal
+   *
+   * @param ordinal measure ordinal
+   * @return measure column chunk
+   */
+  public ColumnPage getMeasureChunk(int ordinal) {
+    return measureColumnPages[ordinal][pageCounter];
+  }
+
+  /**
+   * Below method will be used to get the key for all the dictionary dimensions
+   * which is present in the query
+   *
+   * @param rowId row id selected after scanning
+   * @return return the dictionary key
+   */
+  protected byte[] getDictionaryKeyArray(int rowId) {
+    byte[] completeKey = new byte[fixedLengthKeySize];
+    int offset = 0;
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      offset += 
dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter].fillRawData(
+          rowId, offset, completeKey,
+          columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+    }
+    rowCounter++;
+    return completeKey;
+  }
+
+  /**
+   * Below method will be used to get the key for all the dictionary dimensions
+   * in integer array format which is present in the query
+   *
+   * @param rowId row id selected after scanning
+   * @return return the dictionary key
+   */
+  protected int[] getDictionaryKeyIntegerArray(int rowId) {
+    int[] completeKey = new int[totalDimensionsSize];
+    int column = 0;
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      column = 
dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+          .fillSurrogateKey(rowId, column, completeKey,
+              
columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+    }
+    rowCounter++;
+    return completeKey;
+  }
+
+  /**
+   * Fill the column data of dictionary to vector
+   */
+  public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+    int column = 0;
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      column = 
dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+          .fillVector(vectorInfo, column,
+              
columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+    }
+  }
+
+  /**
+   * Fill the column data to vector
+   */
+  public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+    int column = 0;
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      column = 
dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
+          .fillVector(vectorInfo, column,
+              
columnGroupKeyStructureInfo.get(noDictionaryColumnChunkIndexes[i]));
+    }
+  }
+
+  /**
+   * Fill the measure column data to vector
+   */
+  public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] 
measuresOrdinal) {
+    for (int i = 0; i < measuresOrdinal.length; i++) {
+      vectorInfo[i].measureVectorFiller
+          
.fillMeasureVector(measureColumnPages[measuresOrdinal[i]][pageCounter], 
vectorInfo[i]);
+    }
+  }
+
+  public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
+    for (int i = 0; i < vectorInfos.length; i++) {
+      int offset = vectorInfos[i].offset;
+      int len = offset + vectorInfos[i].size;
+      int vectorOffset = vectorInfos[i].vectorOffset;
+      CarbonColumnVector vector = vectorInfos[i].vector;
+      for (int j = offset; j < len; j++) {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutput = new DataOutputStream(byteStream);
+        try {
+          
vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(
+              dimRawColumnChunks,
+              pageFilteredRowId == null ? j : 
pageFilteredRowId[pageCounter][j], pageCounter,
+              dataOutput);
+          Object data = vectorInfos[i].genericQueryType
+              
.getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+          vector.putObject(vectorOffset++, data);
+        } catch (IOException e) {
+          LOGGER.error(e);
+        } finally {
+          CarbonUtil.closeStreams(dataOutput);
+          CarbonUtil.closeStreams(byteStream);
+        }
+      }
+    }
+  }
+
+  /**
+   * Fill the column data to vector
+   */
+  public void fillColumnarImplicitBatch(ColumnVectorInfo[] vectorInfo) {
+    for (int i = 0; i < vectorInfo.length; i++) {
+      ColumnVectorInfo columnVectorInfo = vectorInfo[i];
+      CarbonColumnVector vector = columnVectorInfo.vector;
+      int offset = columnVectorInfo.offset;
+      int vectorOffset = columnVectorInfo.vectorOffset;
+      int len = offset + columnVectorInfo.size;
+      for (int j = offset; j < len; j++) {
+        // Considering only String case now as we support only
+        String data = getBlockletId();
+        if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
+            .equals(columnVectorInfo.dimension.getColumnName())) {
+          data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter
+              + CarbonCommonConstants.FILE_SEPARATOR + (pageFilteredRowId == 
null ?
+              j :
+              pageFilteredRowId[pageCounter][j]);
+        }
+        vector.putBytes(vectorOffset++,
+            
data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      }
+    }
+  }
+
+  /**
+   * Just increment the counters in case of a query only on measures.
+   */
+  public void incrementCounter() {
+    rowCounter++;
+    currentRow++;
+  }
+
+  /**
+   * Just increment the page counter and reset the remaining counters.
+   */
+  public void incrementPageCounter() {
+    rowCounter = 0;
+    currentRow = -1;
+    pageCounter++;
+    fillDataChunks();
+    if (null != deletedRecordMap) {
+      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + 
pageCounter);
+    }
+  }
+
+  /**
+   * This case is used only in case of compaction, since it does not use 
filter flow.
+   */
+  public void fillDataChunks() {
+    freeDataChunkMemory();
+    if (pageCounter >= pageFilteredRowCount.length) {
+      return;
+    }
+    for (int i = 0; i < dimensionColumnPages.length; i++) {
+      if (dimensionColumnPages[i][pageCounter] == null && 
dimRawColumnChunks[i] != null) {
+        dimensionColumnPages[i][pageCounter] =
+            
dimRawColumnChunks[i].convertToDimColDataChunkWithOutCache(pageCounter);
+      }
+    }
+
+    for (int i = 0; i < measureColumnPages.length; i++) {
+      if (measureColumnPages[i][pageCounter] == null && msrRawColumnChunks[i] 
!= null) {
+        measureColumnPages[i][pageCounter] =
+            msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter);
+      }
+    }
+  }
+
+  // free the memory held by the previous page's chunks (pageCounter - 1)
+  private void freeDataChunkMemory() {
+    for (int i = 0; i < dimensionColumnPages.length; i++) {
+      if (pageCounter > 0 && dimensionColumnPages[i][pageCounter - 1] != null) 
{
+        dimensionColumnPages[i][pageCounter - 1].freeMemory();
+        dimensionColumnPages[i][pageCounter - 1] = null;
+      }
+    }
+    for (int i = 0; i < measureColumnPages.length; i++) {
+      if (pageCounter > 0 && measureColumnPages[i][pageCounter - 1] != null) {
+        measureColumnPages[i][pageCounter - 1].freeMemory();
+        measureColumnPages[i][pageCounter - 1] = null;
+      }
+    }
+  }
+
+  public int numberOfpages() {
+    return pageFilteredRowCount.length;
+  }
+
+  /**
+   * Get total rows in the current page
+   *
+   * @return
+   */
+  public int getCurrentPageRowCount() {
+    return pageFilteredRowCount[pageCounter];
+  }
+
+  public int getCurrentPageCounter() {
+    return pageCounter;
+  }
+
+  /**
+   * set the row counter.
+   */
+  public void setRowCounter(int rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+
+  /**
+   * Below method will be used to get the dimension key array
+   * for all the no dictionary dimension present in the query
+   *
+   * @param rowId row number
+   * @return no dictionary keys for all no dictionary dimension
+   */
+  protected byte[][] getNoDictionaryKeyArray(int rowId) {
+    byte[][] noDictionaryColumnsKeys = new 
byte[noDictionaryColumnChunkIndexes.length][];
+    int position = 0;
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      noDictionaryColumnsKeys[position++] =
+          
dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter].getChunkData(rowId);
+    }
+    return noDictionaryColumnsKeys;
+  }
+
+  /**
+   * @return blockletId
+   */
+  public String getBlockletId() {
+    return blockletId;
+  }
+
+  /**
+   * Set blocklet id, which looks like
+   * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0"
+   */
+  public void setBlockletId(String blockletId) {
+    this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
+    blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, 
TupleIdEnum.BLOCKLET_ID);
+    // if deleted records map is present for this block
+    // then get the first page deleted vo
+    if (null != deletedRecordMap) {
+      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + 
pageCounter);
+    }
+  }
+
+  /**
+   * Below method will be used to get the complex type keys array based
+   * on row id for all the complex type dimension selected in query
+   *
+   * @param rowId row number
+   * @return complex type key array for all the complex dimension selected in 
query
+   */
+  protected byte[][] getComplexTypeKeyArray(int rowId) {
+    byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][];
+    for (int i = 0; i < complexTypeData.length; i++) {
+      GenericQueryType genericQueryType =
+          complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream dataOutput = new DataOutputStream(byteStream);
+      try {
+        genericQueryType
+            .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, 
rowId, pageCounter,
+                dataOutput);
+        complexTypeData[i] = byteStream.toByteArray();
+      } catch (IOException e) {
+        LOGGER.error(e);
+      } finally {
+        CarbonUtil.closeStreams(dataOutput);
+        CarbonUtil.closeStreams(byteStream);
+      }
+    }
+    return complexTypeData;
+  }
+
+  /**
+   * to check whether any more row is present in the result
+   *
+   * @return
+   */
+  public boolean hasNext() {
+    if (pageCounter
+        < pageFilteredRowCount.length && rowCounter < 
this.pageFilteredRowCount[pageCounter]) {
+      return true;
+    } else if (pageCounter < pageFilteredRowCount.length) {
+      pageCounter++;
+      fillDataChunks();
+      rowCounter = 0;
+      currentRow = -1;
+      if (null != deletedRecordMap) {
+        currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + 
pageCounter);
+      }
+      return hasNext();
+    }
+    return false;
+  }
+
+  /**
+   * Below method will be used to free the occupied memory
+   */
+  public void freeMemory() {
+    // first free the dimension chunks
+    if (null != dimensionColumnPages) {
+      for (int i = 0; i < dimensionColumnPages.length; i++) {
+        if (null != dimensionColumnPages[i]) {
+          for (int j = 0; j < dimensionColumnPages[i].length; j++) {
+            if (null != dimensionColumnPages[i][j]) {
+              dimensionColumnPages[i][j].freeMemory();
+            }
+          }
+        }
+      }
+    }
+    // free the measure data chunks
+    if (null != measureColumnPages) {
+      for (int i = 0; i < measureColumnPages.length; i++) {
+        if (null != measureColumnPages[i]) {
+          for (int j = 0; j < measureColumnPages[i].length; j++) {
+            if (null != measureColumnPages[i][j]) {
+              measureColumnPages[i][j].freeMemory();
+            }
+          }
+        }
+      }
+    }
+    // free the raw chunks
+    if (null != dimRawColumnChunks) {
+      for (int i = 0; i < dimRawColumnChunks.length; i++) {
+        if (null != dimRawColumnChunks[i]) {
+          dimRawColumnChunks[i].freeMemory();
+        }
+      }
+    }
+  }
+
+  /**
+   * @param pageFilteredRowCount number of valid (filtered) rows in each page after scanning
+   */
+  public void setPageFilteredRowCount(int[] pageFilteredRowCount) {
+    this.pageFilteredRowCount = pageFilteredRowCount;
+  }
+
+  /**
+   * After applying filter it will return the  bit set with the valid row 
indexes
+   * so below method will be used to set the row indexes
+   */
+  public void setPageFilteredRowId(int[][] pageFilteredRowId) {
+    this.pageFilteredRowId = pageFilteredRowId;
+  }
+
+  public int getRowCounter() {
+    return rowCounter;
+  }
+
+  /**
+   * will return the current valid row id
+   *
+   * @return valid row id
+   */
+  public abstract int getCurrentRowId();
+
+  /**
+   * @return dictionary key array for all the dictionary dimension
+   * selected in query
+   */
+  public abstract byte[] getDictionaryKeyArray();
+
+  /**
+   * @return dictionary key array for all the dictionary dimension in integer 
array format
+   * selected in query
+   */
+  public abstract int[] getDictionaryKeyIntegerArray();
+
+  /**
+   * Below method will be used to get the complex type key array
+   *
+   * @return complex type key array
+   */
+  public abstract byte[][] getComplexTypeKeyArray();
+
+  /**
+   * Below method will be used to get the no dictionary key
+   * array for all the no dictionary dimension selected in query
+   *
+   * @return no dictionary key array for all the no dictionary dimension
+   */
+  public abstract byte[][] getNoDictionaryKeyArray();
+
+  /**
+   * Marks deleted rows as filtered so they stay out of vector batches later; returns the count.
+   * @param columnarBatch batch on which markFiltered is invoked for every deleted row
+   * @param startRow first position to examine; mapped via pageFilteredRowId when that is set
+   * @param size number of consecutive positions to examine
+   * @param vectorOffset batch position matching startRow; advanced once per examined position
+   */
+  public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, 
int size,
+      int vectorOffset) {
+    int rowsFiltered = 0;
+    if (currentDeleteDeltaVo != null) { // without a delete delta nothing is marked and 0 is returned
+      int len = startRow + size;
+      for (int i = startRow; i < len; i++) {
+        int rowId = pageFilteredRowId != null ? 
pageFilteredRowId[pageCounter][i] : i;
+        if (currentDeleteDeltaVo.containsRow(rowId)) {
+          columnarBatch.markFiltered(vectorOffset);
+          rowsFiltered++;
+        }
+        vectorOffset++; // moves on for every examined position, deleted or not
+      }
+    }
+    return rowsFiltered;
+  }
+
+  /**
+   * Checks whether the given row is recorded as deleted in the current delete delta.
+   *
+   * @param rowId id of the row to look up
+   * @return {@code true} if the delete delta contains the row; {@code false} if not or if none is set
+   */
+  public boolean containsDeletedRow(int rowId) {
+    if (null != currentDeleteDeltaVo) {
+      return currentDeleteDeltaVo.containsRow(rowId);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java 
b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
new file mode 100644
index 0000000..c129161
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.carbondata.common.CarbonIterator;
+
+/**
+ * Holds one batch of query result rows and iterates over them
+ */
+public class RowBatch extends CarbonIterator<Object[]> {
+
+  /**
+   * rows held by this batch
+   */
+  protected List<Object[]> rows;
+
+  /**
+   * index of the next row handed out by next(); hasNext() compares it with rows.size()
+   */
+  protected int counter;
+
+  public RowBatch() {
+    this.rows = new ArrayList<>();
+  }
+
+  /**
+   * Returns the rows held by this batch.
+   *
+   * @return the stored list itself, not a copy
+   */
+  public List<Object[]> getRows() {
+    return rows;
+  }
+
+  /**
+   * Replaces the rows held by this batch; the iteration counter is left unchanged.
+   *
+   * @param rows list to hold; stored by reference
+   */
+  public void setRows(List<Object[]> rows) {
+    this.rows = rows;
+  }
+
+  /**
+   * Returns the row at the given position without moving the iteration counter.
+   * @param counter index into the row list (distinct from the iteration counter field)
+   * @return row stored at that index
+   */
+  public Object[] getRawRow(int counter) {
+    return rows.get(counter);
+  }
+
+  /**
+   * For getting the total size.
+   * @return number of rows currently held
+   */
+  public int getSize() {
+    return rows.size();
+  }
+
+
+  /**
+   * Returns {@code true} if the iteration has more elements.
+   *
+   * @return {@code true} if the iteration has more elements
+   */
+  @Override public boolean hasNext() {
+    return counter < rows.size();
+  }
+
+  /**
+   * Returns the next element in the iteration and advances the counter.
+   * @return the next element in the iteration
+   * @throws NoSuchElementException if no rows remain
+   */
+  @Override public Object[] next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    Object[] row = rows.get(counter);
+    counter++;
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 8120310..bcc5634 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -17,7 +17,7 @@
 package org.apache.carbondata.core.scan.result.impl;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
@@ -25,7 +25,7 @@ import 
org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
  * In case of filter query data will be send
  * based on filtered row index
  */
-public class FilterQueryScannedResult extends AbstractScannedResult {
+public class FilterQueryScannedResult extends BlockletScannedResult {
 
   public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) 
{
     super(tableBlockExecutionInfos);
@@ -37,7 +37,7 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    */
   @Override public byte[] getDictionaryKeyArray() {
     ++currentRow;
-    return getDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
+    return getDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
   /**
@@ -46,7 +46,7 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    */
   @Override public int[] getDictionaryKeyIntegerArray() {
     ++currentRow;
-    return getDictionaryKeyIntegerArray(rowMapping[pageCounter][currentRow]);
+    return 
getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
   /**
@@ -55,7 +55,7 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    * @return complex type key array
    */
   @Override public byte[][] getComplexTypeKeyArray() {
-    return getComplexTypeKeyArray(rowMapping[pageCounter][currentRow]);
+    return getComplexTypeKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
   /**
@@ -65,17 +65,7 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    * @return no dictionary key array for all the no dictionary dimension
    */
   @Override public byte[][] getNoDictionaryKeyArray() {
-    return getNoDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
-  }
-
-  /**
-   * Below method will be used to get the no dictionary key
-   * string array for all the no dictionary dimension selected in query
-   *
-   * @return no dictionary key array for all the no dictionary dimension
-   */
-  @Override public String[] getNoDictionaryKeyStringArray() {
-    return getNoDictionaryKeyStringArray(rowMapping[pageCounter][currentRow]);
+    return getNoDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
   /**
@@ -84,7 +74,7 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    * @return valid row id
    */
   @Override public int getCurrentRowId() {
-    return rowMapping[pageCounter][currentRow];
+    return pageFilteredRowId[pageCounter][currentRow];
   }
 
   /**
@@ -92,10 +82,12 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    */
   public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
-    for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = 
dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
-          .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
-              
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
+    for (int chunkIndex : this.dictionaryColumnChunkIndexes) {
+      column = dimensionColumnPages[chunkIndex][pageCounter].fillVector(
+          pageFilteredRowId[pageCounter],
+          vectorInfo,
+          column,
+          columnGroupKeyStructureInfo.get(chunkIndex));
     }
   }
 
@@ -104,10 +96,12 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    */
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
-    for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      column = 
dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
-          .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
-              
columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
+    for (int chunkIndex : this.noDictionaryColumnChunkIndexes) {
+      column = dimensionColumnPages[chunkIndex][pageCounter].fillVector(
+          pageFilteredRowId[pageCounter],
+          vectorInfo,
+          column,
+          columnGroupKeyStructureInfo.get(chunkIndex));
     }
   }
 
@@ -116,8 +110,10 @@ public class FilterQueryScannedResult extends 
AbstractScannedResult {
    */
   public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] 
measuresOrdinal) {
     for (int i = 0; i < measuresOrdinal.length; i++) {
-      
vectorInfo[i].measureVectorFiller.fillMeasureVectorForFilter(rowMapping[pageCounter],
-          measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
+      vectorInfo[i].measureVectorFiller.fillMeasureVector(
+          pageFilteredRowId[pageCounter],
+          measureColumnPages[measuresOrdinal[i]][pageCounter],
+          vectorInfo[i]);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index 3978f9e..06687c2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -17,14 +17,14 @@
 package org.apache.carbondata.core.scan.result.impl;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 
 /**
  * Result provide class for non filter query
  * In case of no filter query we need to return
  * complete data
  */
-public class NonFilterQueryScannedResult extends AbstractScannedResult {
+public class NonFilterQueryScannedResult extends BlockletScannedResult {
 
   public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
     super(blockExecutionInfo);
@@ -68,16 +68,6 @@ public class NonFilterQueryScannedResult extends 
AbstractScannedResult {
   }
 
   /**
-   * Below method will be used to get the no dictionary key
-   * string array for all the no dictionary dimension selected in query
-   *
-   * @return no dictionary key array for all the no dictionary dimension
-   */
-  @Override public String[] getNoDictionaryKeyStringArray() {
-    return getNoDictionaryKeyStringArray(currentRow);
-  }
-
-  /**
    * will return the current valid row id
    *
    * @return valid row id

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 6172b40..4e628fe 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -28,18 +28,17 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.DataRefNodeFinder;
-import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.infos.DeleteDeltaInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator;
-import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl;
+import org.apache.carbondata.core.scan.processor.DataBlockIterator;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -63,23 +62,23 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
   private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap 
=
       new ConcurrentHashMap<>();
 
-  protected ExecutorService execService;
+  private ExecutorService execService;
   /**
    * execution info of the block
    */
-  protected List<BlockExecutionInfo> blockExecutionInfos;
+  private List<BlockExecutionInfo> blockExecutionInfos;
 
   /**
    * file reader which will be used to execute the query
    */
-  protected FileHolder fileReader;
+  protected FileReader fileReader;
 
-  protected AbstractDataBlockIterator dataBlockIterator;
+  DataBlockIterator dataBlockIterator;
 
   /**
    * QueryStatisticsRecorder
    */
-  protected QueryStatisticsRecorder recorder;
+  private QueryStatisticsRecorder recorder;
   /**
    * number of cores which can be used
    */
@@ -89,7 +88,7 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
    */
   private QueryStatisticsModel queryStatisticsModel;
 
-  public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, 
QueryModel queryModel,
+  AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel 
queryModel,
       ExecutorService execService) {
     String batchSizeString =
         
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
@@ -107,7 +106,6 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
     this.blockExecutionInfos = infos;
     this.fileReader = FileFactory.getFileHolder(
         
FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath()));
-    this.fileReader.setQueryId(queryModel.getQueryId());
     this.fileReader.setReadPageByPage(queryModel.isReadPageByPage());
     this.execService = execService;
     intialiseInfos();
@@ -130,22 +128,21 @@ public abstract class 
AbstractDetailQueryResultIterator<E> extends CarbonIterato
         blockInfo.setDeletedRecordsMap(deletedRowsMap);
       }
       DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
-      if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
-        BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) 
dataRefNode;
-        blockInfo.setFirstDataBlock(wrapper);
-        blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
-
+      if (dataRefNode instanceof BlockletDataRefNode) {
+        BlockletDataRefNode node = (BlockletDataRefNode) dataRefNode;
+        blockInfo.setFirstDataBlock(node);
+        blockInfo.setNumberOfBlockToScan(node.numberOfNodes());
       } else {
         DataRefNode startDataBlock =
             finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
-        while (startDataBlock.nodeNumber() < 
blockInfo.getStartBlockletIndex()) {
+        while (startDataBlock.nodeIndex() < blockInfo.getStartBlockletIndex()) 
{
           startDataBlock = startDataBlock.getNextDataRefNode();
         }
         long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
         //if number of block is less than 0 then take end block.
         if (numberOfBlockToScan <= 0) {
           DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, 
blockInfo.getEndKey());
-          numberOfBlockToScan = endDataBlock.nodeNumber() - 
startDataBlock.nodeNumber() + 1;
+          numberOfBlockToScan = endDataBlock.nodeIndex() - 
startDataBlock.nodeIndex() + 1;
         }
         blockInfo.setFirstDataBlock(startDataBlock);
         blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
@@ -230,7 +227,8 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
     }
   }
 
-  @Override public boolean hasNext() {
+  @Override
+  public boolean hasNext() {
     if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
       return true;
     } else if (blockExecutionInfos.size() > 0) {
@@ -240,7 +238,7 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
     }
   }
 
-  protected void updateDataBlockIterator() {
+  void updateDataBlockIterator() {
     if (dataBlockIterator == null || !dataBlockIterator.hasNext()) {
       dataBlockIterator = getDataBlockIterator();
       while (dataBlockIterator != null && !dataBlockIterator.hasNext()) {
@@ -249,17 +247,17 @@ public abstract class 
AbstractDetailQueryResultIterator<E> extends CarbonIterato
     }
   }
 
-  private DataBlockIteratorImpl getDataBlockIterator() {
+  private DataBlockIterator getDataBlockIterator() {
     if (blockExecutionInfos.size() > 0) {
       BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
       blockExecutionInfos.remove(executionInfo);
-      return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, 
queryStatisticsModel,
+      return new DataBlockIterator(executionInfo, fileReader, batchSize, 
queryStatisticsModel,
           execService);
     }
     return null;
   }
 
-  protected void initQueryStatiticsModel() {
+  private void initQueryStatiticsModel() {
     this.queryStatisticsModel = new QueryStatisticsModel();
     this.queryStatisticsModel.setRecorder(recorder);
     QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
index 1efac30..1235789 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
@@ -18,7 +18,7 @@
 package org.apache.carbondata.core.scan.result.iterator;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 
 /**
  * Iterator over row result
@@ -28,14 +28,14 @@ public class ChunkRowIterator extends 
CarbonIterator<Object[]> {
   /**
    * iterator over chunk result
    */
-  private CarbonIterator<BatchResult> iterator;
+  private CarbonIterator<RowBatch> iterator;
 
   /**
    * currect chunk
    */
-  private BatchResult currentchunk;
+  private RowBatch currentchunk;
 
-  public ChunkRowIterator(CarbonIterator<BatchResult> iterator) {
+  public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
     this.iterator = iterator;
     if (iterator.hasNext()) {
       currentchunk = iterator.next();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b79ff18/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index 747f5a9..c073c78 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -21,14 +21,14 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 
 /**
  * In case of detail query we cannot keep all the records in memory so for
  * executing that query are returning a iterator over block and every time next
  * call will come it will execute the block and return the result
  */
-public class DetailQueryResultIterator extends 
AbstractDetailQueryResultIterator<BatchResult> {
+public class DetailQueryResultIterator extends 
AbstractDetailQueryResultIterator<RowBatch> {
 
   private final Object lock = new Object();
 
@@ -37,18 +37,18 @@ public class DetailQueryResultIterator extends 
AbstractDetailQueryResultIterator
     super(infos, queryModel, execService);
   }
 
-  @Override public BatchResult next() {
+  @Override public RowBatch next() {
     return getBatchResult();
   }
 
-  private BatchResult getBatchResult() {
-    BatchResult batchResult = new BatchResult();
+  private RowBatch getBatchResult() {
+    RowBatch rowBatch = new RowBatch();
     synchronized (lock) {
       updateDataBlockIterator();
       if (dataBlockIterator != null) {
-        batchResult.setRows(dataBlockIterator.next());
+        rowBatch.setRows(dataBlockIterator.next());
       }
     }
-    return batchResult;
+    return rowBatch;
   }
 }

Reply via email to