Repository: carbondata
Updated Branches:
  refs/heads/master 03a735bf7 -> 26607fb9c


Compaction performance is slow compared to data load

During compaction, result filling is done in row format. Because of this,
the dimension and measure data filling time grows as the number of columns
increases: with row-wise filling we cannot take advantage of OS cacheable
buffers, since we continuously read data for the next column. Implement a
columnar-format filling data structure for the compaction process to fill
dimension and measure data.
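As an editorial illustration of the access-pattern difference, here is a
minimal sketch with a hypothetical Page type (not the actual CarbonData API;
the real types are ColumnPage and BlockletScannedResult). Only the loop order
differs, which is what this commit changes:

public class FillOrderSketch {
  interface Page {
    Object get(int rowId);
  }

  // Row-wise: each row touches every column page in turn, so reads keep
  // jumping between pages and gain little from OS read-ahead buffers.
  static Object[][] rowWiseFill(Page[] pages, int rowCount) {
    Object[][] rows = new Object[rowCount][pages.length];
    for (int r = 0; r < rowCount; r++) {
      for (int c = 0; c < pages.length; c++) {
        rows[r][c] = pages[c].get(r);
      }
    }
    return rows;
  }

  // Column-wise: all rows of one column are filled before moving to the
  // next column, so each page is read sequentially and stays cache-hot.
  static Object[][] columnWiseFill(Page[] pages, int rowCount) {
    Object[][] rows = new Object[rowCount][pages.length];
    for (int c = 0; c < pages.length; c++) {
      for (int r = 0; r < rowCount; r++) {
        rows[r][c] = pages[c].get(r);
      }
    }
    return rows;
  }
}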

This closes #2210


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/26607fb9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/26607fb9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/26607fb9

Branch: refs/heads/master
Commit: 26607fb9c5507faa92bafec33f0fabab2e47935d
Parents: 03a735b
Author: manishgupta88 <[email protected]>
Authored: Mon Apr 23 12:36:39 2018 +0530
Committer: kumarvishal09 <[email protected]>
Committed: Mon Apr 30 15:16:43 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +-
 .../impl/AbstractScannedResultCollector.java    |  49 ++++++
 .../collector/impl/RawBasedResultCollector.java | 131 ++++++++++++----
 .../RestructureBasedRawResultCollector.java     | 153 ++++++++++---------
 .../executor/impl/AbstractQueryExecutor.java    |   9 +-
 .../scan/executor/infos/BlockExecutionInfo.java |  14 ++
 .../core/scan/model/QueryModelBuilder.java      |   3 +-
 .../core/scan/processor/BlockScan.java          |   1 +
 .../core/scan/processor/DataBlockIterator.java  |   1 +
 .../core/scan/result/BlockletScannedResult.java | 129 +++++++++++++++-
 .../result/impl/FilterQueryScannedResult.java   |  20 ++-
 .../impl/NonFilterQueryScannedResult.java       |  68 ++++++++-
 .../AbstractDetailQueryResultIterator.java      |  21 +++
 .../AbstractSearchModeResultIterator.java       |  21 +++
 .../scanner/impl/BlockletFilterScanner.java     |   3 +-
 .../scan/scanner/impl/BlockletFullScanner.java  |   5 +-
 .../core/scan/wrappers/ByteArrayWrapper.java    |  19 +++
 .../core/stats/QueryStatisticsConstants.java    |  22 +++
 .../carbondata/core/stats/TaskStatistics.java   |   6 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   5 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +
 .../merger/CarbonCompactionExecutor.java        |  39 ++++-
 22 files changed, 602 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c4b0507..b902359 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1568,10 +1568,12 @@ public final class CarbonCommonConstants {
    * memory.
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION =
       "carbon.enable.page.level.reader.in.compaction";
 
-  public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "true";
+  // Note: If this property is set to true it can impact compaction performance as IO will increase
+  public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "false";
 
   @CarbonProperty
   public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB =

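As a usage note, the property can still be turned back on through
CarbonProperties, the standard CarbonData configuration entry point; a hedged
sketch (the call site shown here is illustrative):

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class EnablePageLevelReader {
  public static void main(String[] args) {
    // Opt back into page-level reading during compaction, accepting the
    // extra IO noted above; "false" is the default after this change.
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION,
        "true");
  }
}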
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 9ac5a06..a160778 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.collector.impl;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.util.List;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -30,6 +31,7 @@ import org.apache.carbondata.core.scan.executor.infos.MeasureInfo;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -52,10 +54,17 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
    */
   DimensionInfo dimensionInfo;
 
+  /**
+   * model object to be used for collecting query statistics during normal query execution,
+   * compaction and other flows that use the query flow
+   */
+  QueryStatisticsModel queryStatisticsModel;
+
   AbstractScannedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     this.executionInfo = blockExecutionInfos;
     measureInfo = blockExecutionInfos.getMeasureInfo();
     dimensionInfo = blockExecutionInfos.getDimensionInfo();
+    this.queryStatisticsModel = blockExecutionInfos.getQueryStatisticsModel();
   }
 
   protected void fillMeasureData(Object[] msrValues, int offset,
@@ -83,6 +92,46 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
     }
   }
 
+  /**
+   * This method will be used to fill measure data column wise
+   *
+   * @param rows
+   * @param offset
+   * @param scannedResult
+   */
+  protected void fillMeasureDataBatch(List<Object[]> rows, int offset,
+      BlockletScannedResult scannedResult) {
+    int measureExistIndex = 0;
+    for (short i = 0; i < measureInfo.getMeasureDataTypes().length; i++) {
+      // if measure exists in block then pass measure column
+      // data chunk to the collector
+      if (measureInfo.getMeasureExists()[i]) {
+        ProjectionMeasure queryMeasure = executionInfo.getProjectionMeasures()[measureExistIndex];
+        ColumnPage measureChunk =
+            scannedResult.getMeasureChunk(measureInfo.getMeasureOrdinals()[measureExistIndex]);
+        for (short j = 0; j < rows.size(); j++) {
+          Object[] rowValues = rows.get(j);
+          rowValues[i + offset] =
+              getMeasureData(measureChunk, scannedResult.getValidRowIds().get(j),
+                  queryMeasure.getMeasure());
+        }
+        measureExistIndex++;
+      } else {
+        // if not present then get the default value and fill it into each row
+        Object defaultValue = measureInfo.getDefaultValues()[i];
+        if (null != defaultValue && DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) {
+          // convert data type as per the computing engine
+          defaultValue =
+              DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(defaultValue);
+        }
+        for (short j = 0; j < rows.size(); j++) {
+          Object[] rowValues = rows.get(j);
+          rowValues[i + offset] = defaultValue;
+        }
+      }
+    }
+  }
+
   Object getMeasureData(ColumnPage dataChunk, int index, CarbonMeasure carbonMeasure) {
     if (!dataChunk.getNullBits().get(index)) {
       DataType dataType = carbonMeasure.getDataType();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 0780675..d28df0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -16,25 +16,23 @@
  */
 package org.apache.carbondata.core.scan.collector.impl;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 
 /**
  * It is not a collector; it is just a scanned result holder.
  */
 public class RawBasedResultCollector extends AbstractScannedResultCollector {
 
-  byte[] dictionaryKeyArray;
-
-  byte[][] noDictionaryKeyArray;
-
-  private byte[][] complexTypeKeyArray;
-
   public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
   }
@@ -44,37 +42,114 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
   * it will keep track of how many records are processed, to handle the limit scenario
    */
   @Override
-  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
+  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult,
+      int batchSize) {
+    long startTime = System.currentTimeMillis();
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     ProjectionMeasure[] queryMeasures = executionInfo.getProjectionMeasures();
     // scan the record and add to list
-    int rowCounter = 0;
-    while (scannedResult.hasNext() && rowCounter < batchSize) {
-      scanResultAndGetData(scannedResult);
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
+    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
+    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
+        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
+    return listBasedResult;
+  }
+
+  /**
+   * This method will scan and fill dimension and measure data
+   *
+   * @param scannedResult
+   * @param batchSize
+   * @param listBasedResult
+   * @param queryMeasures
+   */
+  protected void scanAndFillData(BlockletScannedResult scannedResult, int batchSize,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures) {
+    int numberOfPages = scannedResult.numberOfpages();
+    // loop will exit once batchSize rows have been read or the pages have been exhausted
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter();
         continue;
       }
-      prepareRow(scannedResult, listBasedResult, queryMeasures);
-      rowCounter++;
+      int rowCounter = scannedResult.getRowCounter();
+      // getRowCounter holds the total number of rows processed. Calculate the
+      // left over space through getRowCounter only.
+      int availableRows = currentPageRowCount - rowCounter;
+      // rows available in current page that can be processed from current page
+      int availableBatchRowCount = Math.min(batchSize, availableRows);
+      // this condition will be true if no data is left in the current block/blocklet to be scanned
+      if (availableBatchRowCount < 1) {
+        break;
+      }
+      if (batchSize > availableRows) {
+        batchSize = batchSize - availableRows;
+      } else {
+        // this is done because in IUD cases the actual rows fetched can be less than the
+        // batch size, as some of the rows could have been deleted. So in those cases
+        // batchSize needs to be re-initialized with the left over value
+        batchSize = 0;
+      }
+      // fill dimension data
+      fillDimensionData(scannedResult, listBasedResult, queryMeasures, availableBatchRowCount);
+      fillMeasureData(scannedResult, listBasedResult);
+      // increment the number of rows scanned in scanned result statistics
+      incrementScannedResultRowCounter(scannedResult, availableBatchRowCount);
+      // assign the left over rows to batch size if the number of rows fetched is less
+      // than batchSize
+      if (listBasedResult.size() < availableBatchRowCount) {
+        batchSize += availableBatchRowCount - listBasedResult.size();
+      }
     }
-    return listBasedResult;
   }
 
-  void prepareRow(BlockletScannedResult scannedResult, List<Object[]> listBasedResult,
-      ProjectionMeasure[] queryMeasures) {
-    Object[] row = new Object[1 + queryMeasures.length];
-    ByteArrayWrapper wrapper = new ByteArrayWrapper();
-    wrapper.setDictionaryKey(dictionaryKeyArray);
-    wrapper.setNoDictionaryKeys(noDictionaryKeyArray);
-    wrapper.setComplexTypesKeys(complexTypeKeyArray);
-    row[0] = wrapper;
-    fillMeasureData(row, 1, scannedResult);
-    listBasedResult.add(row);
+  private void fillDimensionData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures, int batchSize) {
+    long startTime = System.currentTimeMillis();
+    List<byte[]> dictionaryKeyArrayBatch = scannedResult.getDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> noDictionaryKeyArrayBatch =
+        scannedResult.getNoDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> complexTypeKeyArrayBatch = scannedResult.getComplexTypeKeyArrayBatch(batchSize);
+    // it will be the same for one blocklet so it can be computed only once
+    byte[] implicitColumnByteArray = scannedResult.getBlockletId()
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    // Note: the size check in the for loop uses dictionaryKeyArrayBatch, as its size can be
+    // less than the batch size in case of IUD scenarios
+    for (int i = 0; i < dictionaryKeyArrayBatch.size(); i++) {
+      // 1 for ByteArrayWrapper object which will contain dictionary and no dictionary data
+      Object[] row = new Object[1 + queryMeasures.length];
+      ByteArrayWrapper wrapper = new ByteArrayWrapper();
+      wrapper.setDictionaryKey(dictionaryKeyArrayBatch.get(i));
+      wrapper.setNoDictionaryKeys(noDictionaryKeyArrayBatch.get(i));
+      wrapper.setComplexTypesKeys(complexTypeKeyArrayBatch.get(i));
+      wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+      row[0] = wrapper;
+      listBasedResult.add(row);
+    }
+    QueryStatistic keyColumnFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME);
+    keyColumnFillingTime.addCountStatistic(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME,
+        keyColumnFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  private void fillMeasureData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult) {
+    long startTime = System.currentTimeMillis();
+    // fill the measure data only if the list is not empty after filling the dimension data
+    if (!listBasedResult.isEmpty()) {
+      fillMeasureDataBatch(listBasedResult, 1, scannedResult);
+    }
+    QueryStatistic measureFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.MEASURE_FILLING_TIME);
+    measureFillingTime.addCountStatistic(QueryStatisticsConstants.MEASURE_FILLING_TIME,
+        measureFillingTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
-  void scanResultAndGetData(BlockletScannedResult scannedResult) {
-    dictionaryKeyArray = scannedResult.getDictionaryKeyArray();
-    noDictionaryKeyArray = scannedResult.getNoDictionaryKeyArray();
-    complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+  private void incrementScannedResultRowCounter(BlockletScannedResult scannedResult,
+      int batchSize) {
+    // increment row counter by batch size as that many rows have been processed at once
+    scannedResult.incrementCounter(batchSize);
   }
 }
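The batch-size bookkeeping in scanAndFillData above is the subtle part of this
change. A simplified standalone sketch of the same carry-over invariant
(hypothetical names, not the CarbonData classes; deleted-row compensation is
omitted for brevity):

import java.util.ArrayList;
import java.util.List;

public class BatchAccountingSketch {
  // Walk pages, taking at most `batchSize` rows in total; whatever one page
  // cannot satisfy is carried over to the next page.
  public static List<Integer> collect(int[] pageRowCounts, int batchSize) {
    List<Integer> collected = new ArrayList<>(batchSize);
    for (int page = 0; page < pageRowCounts.length && batchSize > 0; page++) {
      int take = Math.min(batchSize, pageRowCounts[page]);
      for (int r = 0; r < take; r++) {
        collected.add(page * 1000 + r); // stand-in for a filled row
      }
      batchSize -= take;
    }
    return collected;
  }

  public static void main(String[] args) {
    // pages of 5, 3 and 10 rows with batchSize 7 -> 5 rows + 2 rows
    System.out.println(collect(new int[] {5, 3, 10}, 7).size()); // prints 7
  }
}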

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index d776b5e..1c440cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -33,6 +33,9 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -151,106 +154,112 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
    */
   @Override
   public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
+    long startTime = System.currentTimeMillis();
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     ProjectionMeasure[] queryMeasures = executionInfo.getActualQueryMeasures();
     // scan the record and add to list
-    int rowCounter = 0;
-    while (scannedResult.hasNext() && rowCounter < batchSize) {
-      scanResultAndGetData(scannedResult);
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
-        continue;
-      }
-      // re-fill dictionary and no dictionary key arrays for the newly added columns
-      if (dimensionInfo.isDictionaryColumnAdded()) {
-        dictionaryKeyArray = fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray);
-      }
-      if (dimensionInfo.isNoDictionaryColumnAdded()) {
-        noDictionaryKeyArray = fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray);
-      }
-      prepareRow(scannedResult, listBasedResult, queryMeasures);
-      rowCounter++;
+    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
+    // re-fill dictionary and no dictionary key arrays for the newly added columns
+    if (dimensionInfo.isDictionaryColumnAdded()) {
+      fillDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
+    }
+    if (dimensionInfo.isNoDictionaryColumnAdded()) {
+      fillNoDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
     }
+    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
+    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
+        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
     return listBasedResult;
   }
 
   /**
   * This method will fill the dictionary key array with newly added dictionary columns if any
    *
-   * @param dictionaryKeyArray
+   * @param rows
    * @return
    */
-  private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
-    ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
-    int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount();
-    long[] keyArray = null;
-    if (null != updatedCurrentBlockKeyGenerator) {
-      keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray);
-      newKeyArrayLength += keyArray.length;
-    }
-    long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
-    int existingColumnKeyArrayIndex = 0;
-    int newKeyArrayIndex = 0;
-    for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
-      if (CarbonUtil
-          .hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(), Encoding.DICTIONARY)) {
-        // if dimension exists then add the key array value else add the default value
-        if (dimensionInfo.getDimensionExists()[i]) {
-          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = keyArray[existingColumnKeyArrayIndex++];
-        } else {
-          long defaultValueAsLong;
-          Object defaultValue = dimensionInfo.getDefaultValues()[i];
-          if (null != defaultValue) {
-            defaultValueAsLong = ((Integer) defaultValue).longValue();
+  private void fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
+      byte[] dictKeyArray = byteArrayWrapper.getDictionaryKey();
+      ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
+      int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount();
+      long[] keyArray = null;
+      if (null != updatedCurrentBlockKeyGenerator) {
+        keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictKeyArray);
+        newKeyArrayLength += keyArray.length;
+      }
+      long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
+      int existingColumnKeyArrayIndex = 0;
+      int newKeyArrayIndex = 0;
+      for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
+        if (CarbonUtil.hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(),
+            Encoding.DICTIONARY)) {
+          // if dimension exists then add the key array value else add the default value
+          if (dimensionInfo.getDimensionExists()[i]) {
+            keyArrayWithNewAddedColumns[newKeyArrayIndex++] =
+                keyArray[existingColumnKeyArrayIndex++];
           } else {
-            defaultValueAsLong = (long)CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+            long defaultValueAsLong;
+            Object defaultValue = dimensionInfo.getDefaultValues()[i];
+            if (null != defaultValue) {
+              defaultValueAsLong = ((Integer) defaultValue).longValue();
+            } else {
+              defaultValueAsLong = (long) CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+            }
+            keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
           }
-          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
         }
       }
+      try {
+        dictKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
+        byteArrayWrapper.setDictionaryKey(dictKeyArray);
+      } catch (KeyGenException e) {
+        throw new RuntimeException(e);
+      }
     }
-    try {
-      dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
-    } catch (KeyGenException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return dictionaryKeyArray;
   }
 
   /**
   * This method will fill the no dictionary byte array with newly added no dictionary columns
    *
-   * @param noDictionaryKeyArray
+   * @param rows
    * @return
    */
-  private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) {
-    ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
-    byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
-        new byte[noDictionaryKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
-    int existingColumnValueIndex = 0;
-    int newKeyArrayIndex = 0;
-    for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
-      if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)
-          && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
-        // if dimension exists then add the byte array value else add the default value
-        if (dimensionInfo.getDimensionExists()[i]) {
-          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-              noDictionaryKeyArray[existingColumnValueIndex++];
-        } else {
-          byte[] newColumnDefaultValue = null;
-          Object defaultValue = dimensionInfo.getDefaultValues()[i];
-          if (null != defaultValue) {
-            newColumnDefaultValue = (byte[]) defaultValue;
-          } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
-            newColumnDefaultValue =
-                DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
-                    CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
+      byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
+      byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
+          new byte[noDictKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
+      int existingColumnValueIndex = 0;
+      int newKeyArrayIndex = 0;
+      for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
+        if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)
+            && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          // if dimension exists then add the byte array value else add the default value
+          if (dimensionInfo.getDimensionExists()[i]) {
+            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                noDictKeyArray[existingColumnValueIndex++];
           } else {
-            newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+            byte[] newColumnDefaultValue = null;
+            Object defaultValue = dimensionInfo.getDefaultValues()[i];
+            if (null != defaultValue) {
+              newColumnDefaultValue = (byte[]) defaultValue;
+            } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
+              newColumnDefaultValue =
+                  DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
+                      CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else {
+              newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+            }
+            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue;
           }
-          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue;
         }
       }
+      byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
     }
-    return noDictionaryKeyArrayWithNewlyAddedColumns;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index bc40be8..ed0350f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -116,9 +116,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .getCarbonTableIdentifier().getTableName());
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
-    queryProperties.queryStatisticsRecorder =
-        CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
-    queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+    queryProperties.queryStatisticsRecorder = queryModel.getStatisticsRecorder();
+    if (null == queryProperties.queryStatisticsRecorder) {
+      queryProperties.queryStatisticsRecorder =
+          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
+      queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+    }
     QueryStatistic queryStatistic = new QueryStatistic();
     // sort the block info
     // so block will be loaded in sorted order this will be required for

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index 6633195..3165bfd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
 /**
  * Below class will have all the properties which needed during query execution
@@ -216,6 +217,11 @@ public class BlockExecutionInfo {
   private boolean requiredRowId;
 
   /**
+   * model for collecting query stats
+   */
+  private QueryStatisticsModel queryStatisticsModel;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -631,4 +637,12 @@ public class BlockExecutionInfo {
   public void setRequiredRowId(boolean requiredRowId) {
     this.requiredRowId = requiredRowId;
   }
+
+  public QueryStatisticsModel getQueryStatisticsModel() {
+    return queryStatisticsModel;
+  }
+
+  public void setQueryStatisticsModel(QueryStatisticsModel queryStatisticsModel) {
+    this.queryStatisticsModel = queryStatisticsModel;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index f7b828e..31f5d0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -96,9 +96,8 @@ public class QueryModelBuilder {
     return this;
   }
 
-  public QueryModelBuilder enableReadPageByPage() {
+  public void enableReadPageByPage() {
     this.readPageByPage = true;
-    return this;
   }
 
   public QueryModel build() {

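Since enableReadPageByPage() no longer returns the builder, call sites cannot
chain it anymore. A hedged sketch of the adjusted usage (assuming a CarbonTable
`table` in scope; the projectAllColumns() chaining shown is illustrative):

// Before (no longer compiles after this change):
// QueryModel model = new QueryModelBuilder(table)
//     .projectAllColumns().enableReadPageByPage().build();

// After: enableReadPageByPage() is void, so invoke it on the builder
// separately before build().
QueryModelBuilder builder = new QueryModelBuilder(table).projectAllColumns();
builder.enableReadPageByPage();
QueryModel model = builder.build();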
http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
index 4d46b3b..88f1503 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
@@ -45,6 +45,7 @@ public class BlockScan {
   public BlockScan(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
       QueryStatisticsModel queryStatisticsModel) {
     this.blockExecutionInfo = blockExecutionInfo;
+    this.blockExecutionInfo.setQueryStatisticsModel(queryStatisticsModel);
     this.fileReader = fileReader;
     this.blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
index fde4e55..1eeb579 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -85,6 +85,7 @@ public class DataBlockIterator extends CarbonIterator<List<Object[]>> {
   public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
       int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) {
     this.blockExecutionInfo = blockExecutionInfo;
+    this.blockExecutionInfo.setQueryStatisticsModel(queryStatisticsModel);
     this.fileReader = fileReader;
     blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 33caa98..35d4f51 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -21,6 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -39,6 +41,9 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -62,7 +67,7 @@ public abstract class BlockletScannedResult {
   /**
    * key size of the fixed length column
    */
-  private int fixedLengthKeySize;
+  protected int fixedLengthKeySize;
   /**
    * total number of filtered rows for each page
    */
@@ -142,7 +147,12 @@ public abstract class BlockletScannedResult {
    */
   private String blockletNumber;
 
-  public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo) {
+  protected List<Integer> validRowIds;
+
+  protected QueryStatisticsModel queryStatisticsModel;
+
+  public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
     this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
     this.noDictionaryColumnChunkIndexes = blockExecutionInfo.getNoDictionaryColumnChunkIndexes();
     this.dictionaryColumnChunkIndexes = blockExecutionInfo.getDictionaryColumnChunkIndex();
@@ -151,6 +161,8 @@ public abstract class BlockletScannedResult {
     this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes();
     this.totalDimensionsSize = blockExecutionInfo.getProjectionDimensions().length;
     this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
+    this.queryStatisticsModel = queryStatisticsModel;
+    validRowIds = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
 
   /**
@@ -324,6 +336,16 @@ public abstract class BlockletScannedResult {
   }
 
   /**
+   * This method will add the delta to row counter
+   *
+   * @param delta
+   */
+  public void incrementCounter(int delta) {
+    rowCounter += delta;
+    currentRow += delta;
+  }
+
+  /**
    * Just increment the page counter and reset the remaining counters.
    */
   public void incrementPageCounter() {
@@ -344,6 +366,7 @@ public abstract class BlockletScannedResult {
     if (pageCounter >= pageFilteredRowCount.length) {
       return;
     }
+    long startTime = System.currentTimeMillis();
     for (int i = 0; i < dimensionColumnPages.length; i++) {
       if (dimensionColumnPages[i][pageCounter] == null && dimRawColumnChunks[i] != null) {
         dimensionColumnPages[i][pageCounter] =
@@ -357,6 +380,10 @@ public abstract class BlockletScannedResult {
             msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter);
       }
     }
+    QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
+    pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
+        pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
   // free the memory for the last page chunk
@@ -373,6 +400,7 @@ public abstract class BlockletScannedResult {
         measureColumnPages[i][pageCounter - 1] = null;
       }
     }
+    clearValidRowIdList();
   }
 
   public int numberOfpages() {
@@ -417,6 +445,75 @@ public abstract class BlockletScannedResult {
   }
 
   /**
+   * This method will fill the list of valid row ids to be scanned
+   *
+   * @param rowId
+   * @param batchSize
+   */
+  protected void fillValidRowIdsBatchFilling(int rowId, int batchSize) {
+    // row id will be different for every batch so clear it before filling
+    clearValidRowIdList();
+    int startPosition = rowId;
+    for (int i = 0; i < batchSize; i++) {
+      if (!containsDeletedRow(startPosition)) {
+        validRowIds.add(startPosition);
+      }
+      startPosition++;
+    }
+  }
+
+  private void clearValidRowIdList() {
+    if (null != validRowIds && !validRowIds.isEmpty()) {
+      validRowIds.clear();
+    }
+  }
+
+  public List<Integer> getValidRowIds() {
+    return validRowIds;
+  }
+
+  /**
+   * Below method will be used to get the complex type keys array based
+   * on row id for all the complex type dimension selected in query.
+   * This method will be used to fill the data column wise
+   *
+   * @return complex type key array for all the complex dimension selected in query
+   */
+  protected List<byte[][]> getComplexTypeKeyArrayBatch() {
+    List<byte[][]> complexTypeArrayList = new ArrayList<>(validRowIds.size());
+    byte[][] complexTypeData = null;
+    // it is initialized fresh every time, as in case of prefetch it can modify the data
+    for (int i = 0; i < validRowIds.size(); i++) {
+      complexTypeData = new byte[complexParentBlockIndexes.length][];
+      complexTypeArrayList.add(complexTypeData);
+    }
+    for (int i = 0; i < complexParentBlockIndexes.length; i++) {
+      // get the genericQueryType for the current complex column
+      GenericQueryType genericQueryType =
+          complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
+      for (int j = 0; j < validRowIds.size(); j++) {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutput = new DataOutputStream(byteStream);
+        try {
+          genericQueryType
+              .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, validRowIds.get(j),
+                  pageCounter, dataOutput);
+          // get the key array in columnar way
+          byte[][] complexKeyArray = complexTypeArrayList.get(j);
+          complexKeyArray[i] = byteStream.toByteArray();
+        } catch (IOException e) {
+          LOGGER.error(e);
+        } finally {
+          CarbonUtil.closeStreams(dataOutput);
+          CarbonUtil.closeStreams(byteStream);
+        }
+      }
+    }
+    return complexTypeArrayList;
+  }
+
+  /**
    * @return blockletId
    */
   public String getBlockletId() {
@@ -527,6 +624,8 @@ public abstract class BlockletScannedResult {
         }
       }
     }
+    clearValidRowIdList();
+    validRowIds = null;
   }
 
   /**
@@ -568,6 +667,14 @@ public abstract class BlockletScannedResult {
   public abstract int[] getDictionaryKeyIntegerArray();
 
   /**
+   * Method to fill each dictionary column data column wise
+   *
+   * @param batchSize
+   * @return
+   */
+  public abstract List<byte[]> getDictionaryKeyArrayBatch(int batchSize);
+
+  /**
    * Below method will be used to get the complex type key array
    *
    * @return complex type key array
@@ -575,6 +682,15 @@ public abstract class BlockletScannedResult {
   public abstract byte[][] getComplexTypeKeyArray();
 
   /**
+   * Below method will be used to get the complex type key array
+   * This method will fill the data column wise for the given batch size
+   *
+   * @param batchSize
+   * @return complex type key array
+   */
+  public abstract List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize);
+
+  /**
    * Below method will be used to get the no dictionary key
    * array for all the no dictionary dimension selected in query
    *
@@ -583,6 +699,15 @@ public abstract class BlockletScannedResult {
   public abstract byte[][] getNoDictionaryKeyArray();
 
   /**
+   * Below method will be used to get the dimension key array
+   * for all the no dictionary dimension present in the query
+   * This method will fill the data column wise for the given batch size
+   *
+   * @return no dictionary keys for all no dictionary dimension
+   */
+  public abstract List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize);
+
+  /**
   * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
    * @param columnarBatch
    * @param startRow

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index bcc5634..7de3e71 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -16,9 +16,12 @@
  */
 package org.apache.carbondata.core.scan.result.impl;
 
+import java.util.List;
+
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
 /**
  * Result provider class in case of filter query
@@ -27,8 +30,9 @@ import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
  */
 public class FilterQueryScannedResult extends BlockletScannedResult {
 
-  public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) {
-    super(tableBlockExecutionInfos);
+  public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(tableBlockExecutionInfos, queryStatisticsModel);
   }
 
   /**
@@ -49,6 +53,10 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
+  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * Below method will be used to get the complex type key array
    *
@@ -58,6 +66,10 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getComplexTypeKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
+  @Override public List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * Below method will be used to get the no dictionary key
    * array for all the no dictionary dimension selected in query
@@ -68,6 +80,10 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getNoDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
+  @Override public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * will return the current valid row id
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index 06687c2..5956595 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -16,8 +16,12 @@
  */
 package org.apache.carbondata.core.scan.result.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
 /**
  * Result provide class for non filter query
@@ -26,8 +30,9 @@ import org.apache.carbondata.core.scan.result.BlockletScannedResult;
  */
 public class NonFilterQueryScannedResult extends BlockletScannedResult {
 
-  public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
-    super(blockExecutionInfo);
+  public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo, queryStatisticsModel);
   }
 
   /**
@@ -48,6 +53,32 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
     return getDictionaryKeyIntegerArray(currentRow);
   }
 
+  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    // rowId from where computing need to start
+    int startRowId = currentRow + 1;
+    fillValidRowIdsBatchFilling(startRowId, batchSize);
+    List<byte[]> dictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
+    int[] columnDataOffsets = null;
+    byte[] completeKey = null;
+    // it is initialized fresh every time, as in case of prefetch it can modify the data
+    for (int i = 0; i < validRowIds.size(); i++) {
+      completeKey = new byte[fixedLengthKeySize];
+      dictionaryKeyArrayList.add(completeKey);
+    }
+    // initialize offset array only if data is present
+    if (this.dictionaryColumnChunkIndexes.length > 0) {
+      columnDataOffsets = new int[validRowIds.size()];
+    }
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      for (int j = 0; j < validRowIds.size(); j++) {
+        columnDataOffsets[j] += dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+            .fillRawData(validRowIds.get(j), columnDataOffsets[j], dictionaryKeyArrayList.get(j),
+                columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+      }
+    }
+    return dictionaryKeyArrayList;
+  }
+
   /**
    * Below method will be used to get the complex type key array
    *
@@ -57,6 +88,10 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
     return getComplexTypeKeyArray(currentRow);
   }
 
+  @Override public List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize) {
+    return getComplexTypeKeyArrayBatch();
+  }
+
   /**
    * Below method will be used to get the no dictionary key array for all the
    * no dictionary dimension selected in query
@@ -68,6 +103,35 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
   }
 
   /**
+   * Below method will be used to get the dimension key array
+   * for all the no dictionary dimension present in the query
+   * This method will fill the data column wise for the given batch size
+   *
+   * @return no dictionary keys for all no dictionary dimension
+   */
+  @Override public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
+    List<byte[][]> noDictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
+    byte[][] noDictionaryColumnsKeys = null;
+    // it is initialized fresh every time, as in case of prefetch it can modify the data
+    for (int i = 0; i < validRowIds.size(); i++) {
+      noDictionaryColumnsKeys = new byte[noDictionaryColumnChunkIndexes.length][];
+      noDictionaryKeyArrayList.add(noDictionaryColumnsKeys);
+    }
+    int columnPosition = 0;
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      for (int j = 0; j < validRowIds.size(); j++) {
+        byte[][] noDictionaryArray = noDictionaryKeyArrayList.get(j);
+        noDictionaryArray[columnPosition] =
+            dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
+                .getChunkData(validRowIds.get(j));
+      }
+      columnPosition++;
+    }
+    return noDictionaryKeyArrayList;
+  }
+
+
+  /**
    * will return the current valid row id
    *
    * @return valid row id

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index bb23ff3..d975c20 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -294,6 +294,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     queryStatisticsModel.getStatisticsTypeAndObjMap()
         .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime);
     queryStatisticsModel.getRecorder().recordStatistics(readTime);
+
+    // dimension filling time
+    QueryStatistic keyColumnFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME, keyColumnFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(keyColumnFilingTime);
+    // measure filling time
+    QueryStatistic measureFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.MEASURE_FILLING_TIME, measureFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(measureFilingTime);
+    // page Io Time
+    QueryStatistic pageUncompressTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME, pageUncompressTime);
+    queryStatisticsModel.getRecorder().recordStatistics(pageUncompressTime);
+    // result preparation time
+    QueryStatistic resultPreparationTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.RESULT_PREP_TIME, resultPreparationTime);
+    queryStatisticsModel.getRecorder().recordStatistics(resultPreparationTime);
   }
 
   public void processNextBatch(CarbonColumnarBatch columnarBatch) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
index 1b52e55..c7f5c51 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
@@ -113,6 +113,27 @@ public abstract class AbstractSearchModeResultIterator
     queryStatisticsModel.getStatisticsTypeAndObjMap()
         .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime);
     queryStatisticsModel.getRecorder().recordStatistics(readTime);
+
+    // dimension filling time
+    QueryStatistic keyColumnFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME, keyColumnFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(keyColumnFilingTime);
+    // measure filling time
+    QueryStatistic measureFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.MEASURE_FILLING_TIME, measureFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(measureFilingTime);
+    // page Io Time
+    QueryStatistic pageUncompressTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME, pageUncompressTime);
+    queryStatisticsModel.getRecorder().recordStatistics(pageUncompressTime);
+    // result preparation time
+    QueryStatistic resultPreparationTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.RESULT_PREP_TIME, resultPreparationTime);
+    queryStatisticsModel.getRecorder().recordStatistics(resultPreparationTime);
     return queryStatisticsModel;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index ec49c77..37df0e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -185,7 +185,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
       return createEmptyResult();
     }
 
-    BlockletScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    BlockletScannedResult scannedResult =
+        new FilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
     scannedResult.setBlockletId(
        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
             rawBlockletColumnChunks.getDataBlock().blockletIndex());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index 0cb4059..a48804c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -61,7 +61,8 @@ public class BlockletFullScanner implements BlockletScanner {
       RawBlockletColumnChunks rawBlockletColumnChunks)
       throws IOException, FilterUnsupportedException {
     long startTime = System.currentTimeMillis();
-    BlockletScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+    BlockletScannedResult scannedResult =
+        new NonFilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
     QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
@@ -163,7 +164,7 @@ public class BlockletFullScanner implements BlockletScanner {
 
   BlockletScannedResult createEmptyResult() {
     if (emptyResult == null) {
-      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
       emptyResult.setPageFilteredRowCount(new int[0]);
       emptyResult.setPageFilteredRowId(new int[0][]);
     }
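
Both scanner hunks make the same change: the scanned result, including the cached empty
result, now receives the queryStatisticsModel at construction so that page-level timings
can be recorded where pages are actually decoded. A condensed, illustrative view of that
wiring (class and field shapes simplified, not the actual class hierarchy):

    import org.apache.carbondata.core.stats.QueryStatisticsModel;

    // illustrative only: the scanned result keeps the model handed over by the scanner
    abstract class ScannedResultSketch {
      protected final QueryStatisticsModel queryStatisticsModel;

      ScannedResultSketch(QueryStatisticsModel queryStatisticsModel) {
        this.queryStatisticsModel = queryStatisticsModel;
      }
    }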

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 6faae03..3d62d9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -42,6 +42,11 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
    */
   private byte[][] noDictionaryKeys;
 
+  /**
+   * contains the implicit column value in byte array format
+   */
+  private byte[] implicitColumnByteArray;
+
   public ByteArrayWrapper() {
   }
 
@@ -192,4 +197,18 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
     this.complexTypesKeys = complexTypesKeys;
   }
 
+  /**
+   * @return the implicit column value in byte array format
+   */
+  public byte[] getImplicitColumnByteArray() {
+    return implicitColumnByteArray;
+  }
+
+  /**
+   * @param implicitColumnByteArray the implicit column value to set, in byte array format
+   */
+  public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
+    this.implicitColumnByteArray = implicitColumnByteArray;
+  }
+
 }
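
The new field gives the sort-key wrapper a slot for implicit column values alongside the
dictionary, no-dictionary and complex keys. A small usage sketch of the accessor pair (the
value itself is invented for illustration):

    import java.nio.charset.StandardCharsets;
    import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;

    public final class ImplicitColumnSketch {
      public static void main(String[] args) {
        ByteArrayWrapper key = new ByteArrayWrapper();
        // invented value; real implicit column values are produced during the scan
        key.setImplicitColumnByteArray("0/1/100".getBytes(StandardCharsets.UTF_8));
        String restored = new String(key.getImplicitColumnByteArray(), StandardCharsets.UTF_8);
        System.out.println("implicit column value: " + restored);
      }
    }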

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
index c2cda7c..8a52294 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
@@ -58,6 +58,28 @@ public interface QueryStatisticsConstants {
 
   String PAGE_SCANNED = "The number of page scanned";
 
+  /**
+   * Measure filling time includes the time taken to read all measure data from a given offset
+   * and add each column's data to an array. Covers the total time for one query result iterator.
+   */
+  String MEASURE_FILLING_TIME = "measure filling time";
+
+  /**
+   * Key column filling time includes the time taken to read all dimension data from a given
+   * offset and fill each column's data into a byte array. Covers the total time for one query
+   * result iterator.
+   */
+  String KEY_COLUMN_FILLING_TIME = "key column filling time";
+
+  /**
+   * Time taken to uncompress page data and decode the dimension and measure data in that page
+   */
+  String PAGE_UNCOMPRESS_TIME = "page uncompress time";
+
+  /**
+   * Total of measure filling time, dimension (key column) filling time and page uncompress time
+   */
+  String RESULT_PREP_TIME = "result preparation time";
+
   // clear no-use statistics timeout
   long CLEAR_STATISTICS_TIMEOUT = 60 * 1000 * 1000000L;
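
Per the javadoc for RESULT_PREP_TIME above, result preparation time is the total of the
three new component timings; a trivial numeric sketch of that relationship (all values
invented for illustration):

    public final class ResultPrepTimeSketch {
      public static void main(String[] args) {
        long keyColumnFillingMs = 120;  // "key column filling time"
        long measureFillingMs = 80;     // "measure filling time"
        long pageUncompressMs = 200;    // "page uncompress time"
        // "result preparation time" is the sum of the three components
        long resultPrepMs = keyColumnFillingMs + measureFillingMs + pageUncompressMs;
        System.out.println("result preparation time: " + resultPrepMs + " ms"); // 400 ms
      }
    }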
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
index 25e5542..9197196 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
@@ -45,7 +45,11 @@ public class TaskStatistics implements Serializable {
       new Column("total_pages", QueryStatisticsConstants.TOTAL_PAGE_SCANNED),
       new Column("scanned_pages", QueryStatisticsConstants.PAGE_SCANNED),
       new Column("valid_pages", QueryStatisticsConstants.VALID_PAGE_SCANNED),
-      new Column("result_size", QueryStatisticsConstants.RESULT_SIZE)
+      new Column("result_size", QueryStatisticsConstants.RESULT_SIZE),
+      new Column("key_column_filling_time", 
QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME),
+      new Column("measure_filling_time", 
QueryStatisticsConstants.MEASURE_FILLING_TIME),
+      new Column("page_uncompress_time", 
QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME),
+      new Column("result_preparation_time", 
QueryStatisticsConstants.RESULT_PREP_TIME)
   };
 
   private static final int numOfColumns = columns.length;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index ee40aa9..d29284f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -80,6 +80,7 @@ class CarbonMergerRDD[K, V](
   val tableId = carbonMergerMapping.tableId
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val queryStartTime = System.currentTimeMillis()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -102,8 +103,6 @@ class CarbonMergerRDD[K, V](
       var processor: AbstractResultProcessor = null
       var rawResultIteratorList: java.util.List[RawResultIterator] = null
       try {
-
-
         // sorting the table block info List.
         val splitList = carbonSparkPartition.split.value.getAllSplits
         val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
@@ -234,7 +233,7 @@ class CarbonMergerRDD[K, V](
        // close all the query executor services and clean up memory acquired during query processing
         if (null != exec) {
           LOGGER.info("Cleaning up query resources acquired during compaction")
-          exec.close(rawResultIteratorList)
+          exec.close(rawResultIteratorList, queryStartTime)
         }
         // clean up the resources for processor
         if (null != processor) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index c668f7d..0d960d0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -465,6 +465,9 @@ class CarbonScanRDD[T: ClassTag](
         close()
        logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
       }
+      // create a statistics recorder
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
+      model.setStatisticsRecorder(recorder)
       // initialize the reader
       reader.initialize(inputSplit, attemptContext)
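
The two added lines give each task attempt a fresh executor-side recorder keyed by the
query id before the reader is initialized. The same wiring, sketched in Java against the
QueryModel calls used elsewhere in this patch:

    import org.apache.carbondata.core.scan.model.QueryModel;
    import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
    import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;

    final class RecorderWiringSketch {
      // one executor recorder per query id; the model carries it into the scan
      static void attachRecorder(QueryModel model) {
        QueryStatisticsRecorder recorder =
            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId());
        model.setStatisticsRecorder(recorder);
      }
    }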
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 20103b1..a347313 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -42,7 +42,11 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
@@ -58,6 +62,8 @@ public class CarbonCompactionExecutor {
   private final SegmentProperties destinationSegProperties;
   private final Map<String, TaskBlockInfo> segmentMapping;
   private List<QueryExecutor> queryExecutorList;
+  private List<QueryStatisticsRecorder> queryStatisticsRecorders =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   private CarbonTable carbonTable;
   private QueryModel queryModel;
 
@@ -120,10 +126,13 @@ public class CarbonCompactionExecutor {
       for (String task : taskBlockListMapping) {
         list = taskBlockInfo.getTableBlockInfoList(task);
         Collections.sort(list);
-        LOGGER.info("for task -" + task + "-block size is -" + list.size());
+        LOGGER.info(
+            "for task -" + task + "- in segment id -" + segmentId + "- block size is -" + list
+                .size());
         queryModel.setTableBlockInfos(list);
-        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
-            destinationSegProperties, false));
+        resultList.add(
+            new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties,
+                destinationSegProperties, false));
       }
     }
     return resultList;
@@ -164,9 +173,14 @@ public class CarbonCompactionExecutor {
   * @param blockList list of table blocks to be scanned
   * @return iterator over the scanned row batches
    */
-  private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
+  private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList,
+      String segmentId, String taskId)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
+    QueryStatisticsRecorder executorRecorder = CarbonTimeStatisticsFactory
+        .createExecutorRecorder(queryModel.getQueryId() + "_" + segmentId + "_" + taskId);
+    queryStatisticsRecorders.add(executorRecorder);
+    queryModel.setStatisticsRecorder(executorRecorder);
     QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
     queryExecutorList.add(queryExecutor);
     return queryExecutor.execute(queryModel);
@@ -176,7 +190,7 @@ public class CarbonCompactionExecutor {
    * Below method will be used
    * for cleanup
    */
-  public void close(List<RawResultIterator> rawResultIteratorList) {
+  public void close(List<RawResultIterator> rawResultIteratorList, long queryStartTime) {
     try {
      // close all the iterators. Iterators might not be closed in case of compaction failure
       // or if process is killed
@@ -188,12 +202,26 @@ public class CarbonCompactionExecutor {
       for (QueryExecutor queryExecutor : queryExecutorList) {
         queryExecutor.finish();
       }
+      logStatistics(queryStartTime);
     } catch (QueryExecutionException e) {
       LOGGER.error(e, "Problem while close. Ignoring the exception");
     }
     clearDictionaryFromQueryModel();
   }
 
+  private void logStatistics(long queryStartTime) {
+    if (!queryStatisticsRecorders.isEmpty()) {
+      QueryStatistic queryStatistic = new QueryStatistic();
+      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+          System.currentTimeMillis() - queryStartTime);
+      for (QueryStatisticsRecorder recorder : queryStatisticsRecorders) {
+        recorder.recordStatistics(queryStatistic);
+        // print executor query statistics for each task_id
+        recorder.logStatistics();
+      }
+    }
+  }
+
   /**
   * This method will clear the dictionary access count after its usage is complete so
   * that the column can be deleted from the LRU cache whenever memory reaches the threshold
@@ -223,6 +251,7 @@ public class CarbonCompactionExecutor {
      enablePageReader = Boolean.parseBoolean(
          CarbonCommonConstants.CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT);
     }
+    LOGGER.info("Page level reader is set to: " + enablePageReader);
     return enablePageReader;
   }
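
Taken together, executeBlockList creates one recorder per (segment, task) pair, named
queryId_segmentId_taskId, and close() charges the elapsed executor time to every recorder
before printing it. A condensed, runnable sketch of that flow with invented ids, using only
the calls that appear in this patch:

    import org.apache.carbondata.core.stats.QueryStatistic;
    import org.apache.carbondata.core.stats.QueryStatisticsConstants;
    import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
    import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;

    public final class CompactionStatsSketch {
      public static void main(String[] args) throws InterruptedException {
        long queryStartTime = System.currentTimeMillis();
        // invented ids; real values come from the query model and the task mapping
        QueryStatisticsRecorder recorder =
            CarbonTimeStatisticsFactory.createExecutorRecorder("query1_2_0");
        Thread.sleep(10); // stand-in for the actual block scan work
        QueryStatistic executorPart = new QueryStatistic();
        executorPart.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
            System.currentTimeMillis() - queryStartTime);
        recorder.recordStatistics(executorPart);
        recorder.logStatistics(); // prints executor statistics for this segment/task pair
      }
    }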
 
