This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 51b37029733763bbb1ba396671f9b40d613c38ba
Author: kumarvishal09 <[email protected]>
AuthorDate: Wed Jun 12 20:52:00 2019 +0530

    [CARBONDATA-3447]Index server performance improvement
    
    Problem:
    When the number of splits is high, index server performance is slow
    compared to the old flow (driver caching). This is because more data is
    transferred over the network, which causes a performance bottleneck.
    
    Solution:
    1. If the data to be transferred is small, send it over the network; when
    it grows, write it to a file and send only the file name. The main driver
    then reads the file and constructs the input splits.
    2. Use snappy to compress the data, so that the data transferred over the
    network or written to file is smaller and IO time won't impact performance.
    3. In the main driver, pruning is done in multiple threads; the same is
    added for the index executor, since the index executor now does the pruning.
    4. In case of block cache, there is no need to send the BlockletDetailInfo
    object, as it is large and can be reconstructed in the executor from the
    file footer.
    
    This closes #3281
    
    Co-authored-by: kunal642 <[email protected]>
---
 .../core/constants/CarbonCommonConstants.java      |  45 +++-
 .../core/datamap/DataMapStoreManager.java          |  28 +++
 .../carbondata/core/datamap/DataMapUtil.java       |  19 +-
 .../core/datamap/DistributableDataMapFormat.java   |  80 +++---
 .../carbondata/core/datamap/TableDataMap.java      |  20 +-
 .../core/datastore/block/TableBlockInfo.java       |  14 ++
 .../carbondata/core/indexstore/Blocklet.java       |   6 +
 .../core/indexstore/BlockletDataMapIndexStore.java |  22 +-
 .../core/indexstore/BlockletDetailInfo.java        |  16 +-
 .../core/indexstore/ExtendedBlocklet.java          |  71 +++---
 .../core/indexstore/ExtendedBlockletWrapper.java   | 251 +++++++++++++++++++
 .../ExtendedBlockletWrapperContainer.java          | 160 ++++++++++++
 .../indexstore/blockletindex/BlockDataMap.java     |  13 +-
 .../blockletindex/BlockletDataMapModel.java        |  14 ++
 .../scan/executor/impl/AbstractQueryExecutor.java  |  15 +-
 .../core/scan/executor/util/QueryUtil.java         |  26 ++
 .../carbondata/core/statusmanager/FileFormat.java  |   4 -
 .../ExtendedByteArrayInputStream.java}             |  37 ++-
 .../stream}/ExtendedByteArrayOutputStream.java     |  19 +-
 .../ExtendedDataInputStream.java}                  |  38 ++-
 .../carbondata/core/util/BlockletDataMapUtil.java  |   4 +-
 .../carbondata/core/util/CarbonProperties.java     |  73 ++++++
 .../apache/carbondata/core/util/CarbonUtil.java    |  27 +++
 .../apache/carbondata/hadoop/CarbonInputSplit.java | 267 +++++++++++++++++++--
 .../carbondata/hadoop/CarbonMultiBlockSplit.java   |   4 +-
 .../carbondata/hadoop/CarbonRecordReader.java      |  14 +-
 .../hadoop/api/CarbonTableInputFormat.java         |   4 +-
 .../hadoop/util/CarbonVectorizedRecordReader.java  |  15 +-
 .../testsuite/datamap/FGDataMapTestCase.scala      |  14 +-
 .../org/apache/carbondata/spark/util/Util.java     |   3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |   2 +-
 .../carbondata/indexserver/DataMapJobs.scala       |  67 +++---
 .../indexserver/DistributedPruneRDD.scala          | 150 ++++++++----
 .../indexserver/DistributedRDDUtils.scala          |  17 +-
 .../indexserver/DistributedShowCacheRDD.scala      |   2 +-
 .../carbondata/indexserver/IndexServer.scala       |  12 +-
 .../indexserver/InvalidateSegmentCacheRDD.scala    |   2 +-
 .../processing/merger/CarbonCompactionUtil.java    |  18 +-
 .../carbondata/sdk/file/arrow/ArrowConverter.java  |   1 +
 39 files changed, 1283 insertions(+), 311 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1201e1a..6833c8a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1461,6 +1461,14 @@ public final class CarbonCommonConstants {
   // block prune in multi-thread if files size more than 100K files.
   public static final int 
CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 100000;
 
+  /**
+   * max executor threads used for block pruning [1 to 4 threads]
+   */
+  @CarbonProperty public static final String 
CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING =
+      "carbon.max.executor.threads.for.block.pruning";
+
+  public static final String 
CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT = "4";
+
   
//////////////////////////////////////////////////////////////////////////////////////////
   // Datamap parameter start here
   
//////////////////////////////////////////////////////////////////////////////////////////
@@ -2189,9 +2197,42 @@ public final class CarbonCommonConstants {
 
   public static final String LOAD_SYNC_TIME = "load_sync_time";
 
-  public  static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
           "carbon.index.server.max.jobname.length";
 
-  public  static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
           "50";
+
+  @CarbonProperty
+  /**
+   * Max in memory serialization size after reaching threshold data will
+   * be written to file
+   */
+  public static final String CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD =
+      "carbon.index.server.inmemory.serialization.threshold.inKB";
+
+  /**
+   * default value for in memory serialization size
+   */
+  public static final String 
CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT = "300";
+
+  /**
+   * min value for in memory serialization size
+   */
+  public static final int CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MIN = 
100;
+
+  /**
+   * max value for in memory serialization size
+   */
+  public static final int CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MAX = 
102400;
+
+  /**
+   * will be used to write split serialize data when in memory threashold 
crosses the limit
+   */
+  public static final String CARBON_INDEX_SERVER_TEMP_PATH = 
"carbon.indexserver.temp.path";
+
+  /**
+   * index server temp file name
+   */
+  public static final String INDEX_SERVER_TEMP_FOLDER_NAME = "indexservertmp";
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 4d235c5..729c419 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -31,6 +31,7 @@ import 
org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
@@ -53,6 +54,7 @@ import static 
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassPro
 import static 
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -739,4 +741,30 @@ public final class DataMapStoreManager {
     }
   }
 
+  public synchronized void clearInvalidDataMaps(CarbonTable carbonTable, 
List<String> segmentNos,
+      String dataMapToClear) throws IOException {
+    List<TableDataMap> dataMaps = getAllDataMap(carbonTable);
+    List<TableDataMap> remainingDataMaps = new ArrayList<>();
+    if (StringUtils.isNotEmpty(dataMapToClear)) {
+      Iterator<TableDataMap> dataMapIterator = dataMaps.iterator();
+      while (dataMapIterator.hasNext()) {
+        TableDataMap tableDataMap = dataMapIterator.next();
+        if 
(dataMapToClear.equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName()))
 {
+          for (String segment: segmentNos) {
+            tableDataMap.deleteSegmentDatamapData(segment);
+          }
+          tableDataMap.clear();
+        } else {
+          remainingDataMaps.add(tableDataMap);
+        }
+      }
+      getAllDataMaps().put(carbonTable.getTableUniqueName(), 
remainingDataMaps);
+    } else {
+      clearDataMaps(carbonTable.getTableUniqueName());
+      // clear the segment properties cache from executor
+      SegmentPropertiesAndSchemaHolder.getInstance()
+          .invalidate(carbonTable.getAbsoluteTableIdentifier());
+    }
+  }
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 2371a10..394a1dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -168,23 +168,28 @@ public class DataMapUtil {
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
-      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) throws 
IOException {
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
+      DataMapChooser dataMapChooser) throws IOException {
+    if (null == dataMapChooser) {
+      return blocklets;
+    }
     pruneSegments(segmentsToLoad, blocklets);
     List<ExtendedBlocklet> cgDataMaps = pruneDataMaps(table, 
filterResolverIntf, segmentsToLoad,
         partitions, blocklets,
-        DataMapLevel.CG);
+        DataMapLevel.CG, dataMapChooser);
     pruneSegments(segmentsToLoad, cgDataMaps);
     return pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
         partitions, cgDataMaps,
-        DataMapLevel.FG);
+        DataMapLevel.FG, dataMapChooser);
   }
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
-      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, 
DataMapLevel dataMapLevel)
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, 
DataMapLevel dataMapLevel,
+      DataMapChooser dataMapChooser)
       throws IOException {
     DataMapExprWrapper dataMapExprWrapper =
-        new DataMapChooser(table).chooseDataMap(dataMapLevel, 
filterResolverIntf);
+        dataMapChooser.chooseDataMap(dataMapLevel, filterResolverIntf);
     if (dataMapExprWrapper != null) {
       List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
       // Prune segments from already pruned blocklets
@@ -211,10 +216,6 @@ public class DataMapUtil {
       }
       return dataMapExprWrapper.pruneBlocklets(extendedBlocklets);
     }
-    // For all blocklets initialize the detail info so that it can be 
serialized to the driver.
-    for (ExtendedBlocklet blocklet : blocklets) {
-      blocklet.getDetailInfo();
-    }
     return blocklets;
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 0478b40..0f82f57 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -41,7 +40,6 @@ import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -85,7 +83,9 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
 
   private String taskGroupDesc = "";
 
+  private String queryId = "";
 
+  private boolean isWriteToFile = true;
   DistributableDataMapFormat() {
 
   }
@@ -141,42 +141,11 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
         
distributable.getDistributable().getSegment().setReadCommittedScope(readCommittedScope);
         List<Segment> segmentsToLoad = new ArrayList<>();
         segmentsToLoad.add(distributable.getDistributable().getSegment());
-        if (isJobToClearDataMaps) {
-          if (StringUtils.isNotEmpty(dataMapToClear)) {
-            List<TableDataMap> dataMaps =
-                DataMapStoreManager.getInstance().getAllDataMap(table);
-            int i = 0;
-            for (TableDataMap tableDataMap : dataMaps) {
-              if (tableDataMap != null && dataMapToClear
-                  
.equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
-                tableDataMap.deleteSegmentDatamapData(
-                    ((DataMapDistributableWrapper) 
inputSplit).getDistributable().getSegment()
-                        .getSegmentNo());
-                tableDataMap.clear();
-                dataMaps.remove(i);
-                break;
-              }
-              i++;
-            }
-            
DataMapStoreManager.getInstance().getAllDataMaps().put(table.getTableUniqueName(),
-                dataMaps);
-          } else {
-            // if job is to clear datamaps just clear datamaps from cache and 
return
-            DataMapStoreManager.getInstance()
-                
.clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName());
-            // clear the segment properties cache from executor
-            SegmentPropertiesAndSchemaHolder.getInstance()
-                .invalidate(table.getAbsoluteTableIdentifier());
-          }
-          List<ExtendedBlocklet> list = new ArrayList<ExtendedBlocklet>();
-          list.add(new ExtendedBlocklet());
-          blockletIterator = list.iterator();
-          return;
-        } else if (invalidSegments.size() > 0) {
-          // clear the segmentMap and from cache in executor when there are 
invalid segments
-          DataMapStoreManager.getInstance().clearInvalidSegments(table, 
invalidSegments);
-        }
         List<ExtendedBlocklet> blocklets = new ArrayList<>();
+        DataMapChooser dataMapChooser = null;
+        if (null != filterResolverIntf) {
+          dataMapChooser = new DataMapChooser(table);
+        }
         if (dataMapLevel == null) {
           TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
               .getDataMap(table, 
distributable.getDistributable().getDataMapSchema());
@@ -189,11 +158,12 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
                 partitions);
           }
           blocklets = DataMapUtil
-              .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, 
partitions, blocklets);
+              .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, 
partitions, blocklets,
+                  dataMapChooser);
         } else {
           blocklets = DataMapUtil
               .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, 
partitions, blocklets,
-                  dataMapLevel);
+                  dataMapLevel, dataMapChooser);
         }
         blockletIterator = blocklets.iterator();
       }
@@ -280,6 +250,8 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
     out.writeUTF(dataMapToClear);
     out.writeUTF(taskGroupId);
     out.writeUTF(taskGroupDesc);
+    out.writeUTF(queryId);
+    out.writeBoolean(isWriteToFile);
   }
 
   @Override
@@ -323,6 +295,8 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
     this.dataMapToClear = in.readUTF();
     this.taskGroupId = in.readUTF();
     this.taskGroupDesc = in.readUTF();
+    this.queryId = in.readUTF();
+    this.isWriteToFile = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -390,4 +364,32 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
   public void setFilterResolverIntf(FilterResolverIntf filterResolverIntf) {
     this.filterResolverIntf = filterResolverIntf;
   }
+
+  public List<String> getInvalidSegments() {
+    return invalidSegments;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
+  }
+
+  public String getDataMapToClear() {
+    return dataMapToClear;
+  }
+
+  public void setIsWriteToFile(boolean isWriteToFile) {
+    this.isWriteToFile = isWriteToFile;
+  }
+
+  public boolean isWriteToFile() {
+    return isWriteToFile;
+  }
+
+  public void setFallbackJob() {
+    isFallbackJob = true;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index bc87298..33fc3b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -124,7 +124,7 @@ public final class TableDataMap extends 
OperationEventListener {
         datamapsCount++;
       }
     }
-    int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    int numOfThreadsForPruning = CarbonProperties.getNumOfThreadsForPruning();
     if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning 
|| totalFiles
         < 
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) {
       // use multi-thread, only if the files are more than 0.1 million.
@@ -206,7 +206,7 @@ public final class TableDataMap extends 
OperationEventListener {
      
*********************************************************************************
      */
 
-    int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    int numOfThreadsForPruning = CarbonProperties.getNumOfThreadsForPruning();
     LOG.info(
         "Number of threads selected for multi-thread block pruning is " + 
numOfThreadsForPruning
             + ". total files: " + totalFiles + ". total segments: " + 
segments.size());
@@ -323,22 +323,6 @@ public final class TableDataMap extends 
OperationEventListener {
     return blocklets;
   }
 
-  private int getNumOfThreadsForPruning() {
-    int numOfThreadsForPruning = 
Integer.parseInt(CarbonProperties.getInstance()
-        
.getProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING,
-            
CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT));
-    if (numOfThreadsForPruning > Integer
-        
.parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT)
-        || numOfThreadsForPruning < 1) {
-      LOG.info("Invalid value for carbon.max.driver.threads.for.block.pruning, 
value :"
-          + numOfThreadsForPruning + " .using the default threads : "
-          + 
CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
-      numOfThreadsForPruning = Integer
-          
.parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
-    }
-    return numOfThreadsForPruning;
-  }
-
   private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> 
pruneBlocklets,
       Segment segment) {
     for (ExtendedBlocklet blocklet : pruneBlocklets) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 8ef2198..25d82f8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -105,6 +105,11 @@ public class TableBlockInfo implements Distributable, 
Serializable {
   private transient DataFileFooter dataFileFooter;
 
   /**
+   * true when index file does't have blocklet information
+   */
+  private boolean isLegacyStore;
+
+  /**
    * comparator to sort by block size in descending order.
    * Since each line is not exactly the same, the size of a InputSplit may 
differs,
    * so we allow some deviation for these splits.
@@ -210,6 +215,7 @@ public class TableBlockInfo implements Distributable, 
Serializable {
     info.deletedDeltaFilePath = deletedDeltaFilePath;
     info.detailInfo = detailInfo.copy();
     info.dataMapWriterPath = dataMapWriterPath;
+    info.isLegacyStore = isLegacyStore;
     return info;
   }
 
@@ -473,6 +479,14 @@ public class TableBlockInfo implements Distributable, 
Serializable {
     this.dataFileFooter = dataFileFooter;
   }
 
+  public boolean isLegacyStore() {
+    return isLegacyStore;
+  }
+
+  public void setLegacyStore(boolean legacyStore) {
+    isLegacyStore = legacyStore;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("TableBlockInfo{");
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 9aeb6c4..645fdd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -65,6 +65,10 @@ public class Blocklet implements Writable,Serializable {
     return filePath;
   }
 
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     if (filePath == null) {
@@ -79,6 +83,7 @@ public class Blocklet implements Writable,Serializable {
       out.writeBoolean(true);
       out.writeUTF(blockletId);
     }
+    out.writeBoolean(compareBlockletIdForObjectMatching);
   }
 
   @Override
@@ -89,6 +94,7 @@ public class Blocklet implements Writable,Serializable {
     if (in.readBoolean()) {
       blockletId = in.readUTF();
     }
+    this.compareBlockletIdForObjectMatching = in.readBoolean();
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index a9667a8..ce1e8ac 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -36,6 +36,7 @@ import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactor
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
@@ -94,7 +95,7 @@ public class BlockletDataMapIndexStore
         Set<String> filesRead = new HashSet<>();
         String segmentFilePath = identifier.getIndexFilePath();
         if (segInfoCache == null) {
-          segInfoCache = new HashMap<String, Map<String, BlockMetaInfo>>();
+          segInfoCache = new HashMap<>();
         }
         Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
             segInfoCache.get(segmentFilePath);
@@ -106,14 +107,15 @@ public class BlockletDataMapIndexStore
         }
         // if the identifier is not a merge file we can directly load the 
datamaps
         if (identifier.getMergeIndexFileName() == null) {
+          List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
               .getBlockMetaInfoMap(identifierWrapper, indexFileStore, 
filesRead,
-                  carbonDataFileBlockMetaInfoMapping);
+                  carbonDataFileBlockMetaInfoMapping, indexInfos);
           BlockDataMap blockletDataMap =
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
                   identifierWrapper.getCarbonTable(),
                   identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(),
-                  identifierWrapper.getConfiguration());
+                  identifierWrapper.getConfiguration(), indexInfos);
           dataMaps.add(blockletDataMap);
           blockletDataMapIndexWrapper =
               new BlockletDataMapIndexWrapper(identifier.getSegmentId(), 
dataMaps);
@@ -123,16 +125,17 @@ public class BlockletDataMapIndexStore
               
BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, 
indexFileStore);
           for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
               tableBlockIndexUniqueIdentifiers) {
+            List<DataFileFooter> indexInfos = new ArrayList<>();
             Map<String, BlockMetaInfo> blockMetaInfoMap = 
BlockletDataMapUtil.getBlockMetaInfoMap(
                 new 
TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
                     identifierWrapper.getCarbonTable()), indexFileStore, 
filesRead,
-                carbonDataFileBlockMetaInfoMapping);
+                carbonDataFileBlockMetaInfoMapping, indexInfos);
             if (!blockMetaInfoMap.isEmpty()) {
               BlockDataMap blockletDataMap =
                   loadAndGetDataMap(blockIndexUniqueIdentifier, 
indexFileStore, blockMetaInfoMap,
                       identifierWrapper.getCarbonTable(),
                       identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(),
-                      identifierWrapper.getConfiguration());
+                      identifierWrapper.getConfiguration(), indexInfos);
               dataMaps.add(blockletDataMap);
             }
           }
@@ -274,7 +277,8 @@ public class BlockletDataMapIndexStore
    */
   private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier 
identifier,
       SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> 
blockMetaInfoMap,
-      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration 
configuration)
+      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration 
configuration,
+      List<DataFileFooter> indexInfos)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -285,10 +289,12 @@ public class BlockletDataMapIndexStore
     BlockDataMap dataMap;
     synchronized (lock) {
       dataMap = (BlockDataMap) 
BlockletDataMapFactory.createDataMap(carbonTable);
-      dataMap.init(new BlockletDataMapModel(carbonTable,
+      final BlockletDataMapModel blockletDataMapModel = new 
BlockletDataMapModel(carbonTable,
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR 
+ identifier
               .getIndexFileName(), 
indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, 
configuration));
+          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, 
configuration);
+      blockletDataMapModel.setIndexInfos(indexInfos);
+      dataMap.init(blockletDataMapModel);
     }
     return dataMap;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index af07f09..31dcd24 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -69,10 +69,7 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
   private byte[] columnSchemaBinary;
 
   private long blockSize;
-  /**
-   * flag to check for store from 1.1 or any prior version
-   */
-  private boolean isLegacyStore;
+
   /**
    * flag to check whether to serialize min max values. The flag will be set 
to true in case
    * 1. When CACHE_LEVEL = BLOCKLET and filter column min/max in not cached in 
the driver using the
@@ -196,7 +193,6 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     out.writeInt(blockletInfoBinary.length);
     out.write(blockletInfoBinary);
     out.writeLong(blockSize);
-    out.writeBoolean(isLegacyStore);
     out.writeBoolean(useMinMaxForPruning);
   }
 
@@ -227,7 +223,6 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     in.readFully(blockletInfoBinary);
     setBlockletInfoFromBinary();
     blockSize = in.readLong();
-    isLegacyStore = in.readBoolean();
     useMinMaxForPruning = in.readBoolean();
   }
 
@@ -265,7 +260,6 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     detailInfo.columnSchemas = columnSchemas;
     detailInfo.columnSchemaBinary = columnSchemaBinary;
     detailInfo.blockSize = blockSize;
-    detailInfo.isLegacyStore = isLegacyStore;
     detailInfo.useMinMaxForPruning = useMinMaxForPruning;
     return detailInfo;
   }
@@ -301,14 +295,6 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     this.blockletInfoBinary = blockletInfoBinary;
   }
 
-  public boolean isLegacyStore() {
-    return isLegacyStore;
-  }
-
-  public void setLegacyStore(boolean legacyStore) {
-    isLegacyStore = legacyStore;
-  }
-
   public void setColumnSchemas(List<ColumnSchema> columnSchemas) {
     this.columnSchemas = columnSchemas;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 1de1ab5..a85423b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -18,13 +18,16 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 /**
@@ -150,9 +153,16 @@ public class ExtendedBlocklet extends Blocklet {
     this.inputSplit.setColumnSchema(columnSchema);
   }
 
-
-
-  @Override public void write(DataOutput out) throws IOException {
+  /**
+   * Method to serialize extended blocklet and input split for index server
+   * DataFormat
+   * <Extended Blocklet data><Carbon input split serializeData length><CarbonInputSplitData>
+   * @param out
+   * @param uniqueLocation
+   * @throws IOException
+   */
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
+      throws IOException {
     super.write(out);
     if (dataMapUniqueId == null) {
       out.writeBoolean(false);
@@ -160,41 +170,44 @@ public class ExtendedBlocklet extends Blocklet {
       out.writeBoolean(true);
       out.writeUTF(dataMapUniqueId);
     }
+    out.writeBoolean(inputSplit != null);
     if (inputSplit != null) {
-      out.writeBoolean(true);
-      inputSplit.write(out);
-      String[] locations = getLocations();
-      if (locations != null) {
-        out.writeBoolean(true);
-        out.writeInt(locations.length);
-        for (String location : locations) {
-          out.writeUTF(location);
-        }
-      } else {
-        out.writeBoolean(false);
+      // creating byte array output stream to get the size of input split 
serializeData size
+      ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(ebos);
+      inputSplit.setFilePath(null);
+      inputSplit.setBucketId(null);
+      if (inputSplit.isBlockCache()) {
+        inputSplit.updateFooteroffset();
+        inputSplit.updateBlockLength();
+        inputSplit.setWriteDetailInfo(false);
       }
-    } else {
-      out.writeBoolean(false);
+      inputSplit.serializeFields(dos, uniqueLocation);
+      out.writeInt(ebos.size());
+      out.write(ebos.getBuffer(), 0 , ebos.size());
     }
   }
 
-  @Override public void readFields(DataInput in) throws IOException {
+  /**
+   * Method to deserialize extended blocklet and input split for index server
+   * @param in
+   * @param locations
+   * @param tablePath
+   * @throws IOException
+   */
+  public void deserializeFields(DataInput in, String[] locations, String 
tablePath)
+      throws IOException {
     super.readFields(in);
     if (in.readBoolean()) {
       dataMapUniqueId = in.readUTF();
     }
-    if (in.readBoolean()) {
-      inputSplit = new CarbonInputSplit();
-      inputSplit.readFields(in);
-      if (in.readBoolean()) {
-        int numLocations = in.readInt();
-        String[] locations = new String[numLocations];
-        for (int i = 0; i < numLocations; i++) {
-          locations[i] = in.readUTF();
-        }
-        inputSplit.setLocation(locations);
-      }
+    setFilePath(tablePath + getPath());
+    boolean isSplitPresent = in.readBoolean();
+    if (isSplitPresent) {
+      // getting the length of the data
+      final int serializeLen = in.readInt();
+      this.inputSplit =
+          new CarbonInputSplit(serializeLen, in, getFilePath(), locations, 
getBlockletId());
     }
   }
-
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
new file mode 100644
index 0000000..9fd4e66
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.stream.ExtendedByteArrayInputStream;
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
+import org.apache.carbondata.core.stream.ExtendedDataInputStream;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Class used to send extended blocklet objects from the index executor to the index driver.
+ * If the serialized data is larger than the configured threshold, it is written to a file in
+ * the temp folder provided by the user and only the file name is sent; if the data is smaller,
+ * the complete data is sent to the index driver.
+ */
+public class ExtendedBlockletWrapper implements Writable, Serializable {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());
+
+  private boolean isWrittenToFile;
+
+  private int dataSize;
+
+  private byte[] bytes;
+
+  private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+  private static final int BLOCK_SIZE = 256 * 1024 * 1024;
+
+  public ExtendedBlockletWrapper() {
+
+  }
+
+  /**
+   * Serializes the given blocklets and keeps them either in a file or in memory.
+   * The blocklets are first converted to compressed bytes. When isWriteToFile is set and the
+   * compressed size exceeds the configured serialization threshold, the data is written to a
+   * randomly named file in the index server temp folder and only the file name is kept.
+   * If the folder does not exist or the write fails, and in all other cases, the data is kept
+   * in memory.
+   *
+   * @param extendedBlockletList blocklets to serialize
+   * @param tablePath table path; stripped from every blocklet file path before serialization
+   * @param queryId query id used to resolve the index server temp folder
+   * @param isWriteToFile whether data above the threshold may be written to a file
+   * @throws RuntimeException if the blocklets cannot be serialized
+   */
+  public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
+      String queryId, boolean isWriteToFile) {
+    Map<String, Short> uniqueLocations = new HashMap<>();
+    byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
+    // the configured threshold is multiplied by 1024 before it is compared with the byte count
+    int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+            CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
+    DataOutputStream stream = null;
+    // if data size is more than the threshold, data is written to a file and only the file
+    // name is sent from executor to driver; in case of any failure data is sent through network
+    if (bytes.length > serializeAllowedSize && isWriteToFile) {
+      final String fileName = UUID.randomUUID().toString();
+      String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+      try {
+        final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath);
+        if (carbonFile.isFileExist(folderPath)) {
+          stream = FileFactory.getDataOutputStream(folderPath + "/" + fileName,
+              FileFactory.getFileType(folderPath),
+                  BUFFER_SIZE, BLOCK_SIZE, (short) 1);
+          writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
+          this.dataSize = stream.size();
+          this.bytes = fileName.getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+          isWrittenToFile = true;
+        } else {
+          LOGGER.warn("Folder: " + folderPath
+              + " doesn't exist, data will be sent through network");
+        }
+      } catch (IOException e) {
+        LOGGER.error("Problem while writing to file, data will be sent through network", e);
+      } finally {
+        CarbonUtil.closeStreams(stream);
+      }
+    }
+    if (!isWrittenToFile) {
+      try {
+        ExtendedByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
+        stream = new DataOutputStream(bos);
+        writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
+        // only the first dataSize bytes of the backing buffer hold serialized data
+        this.dataSize = bos.size();
+        this.bytes = bos.getBuffer();
+      } catch (IOException e) {
+        // without the serialized data this wrapper would later be read as an empty list of
+        // blocklets, so fail here instead of silently dropping them
+        LOGGER.error("Problem while writing data to memory stream", e);
+        throw new RuntimeException("Problem while writing data to memory stream", e);
+      } finally {
+        CarbonUtil.closeStreams(stream);
+      }
+    }
+  }
+
+  /**
+   * Serializes every blocklet into one byte array and compresses it with snappy.
+   * Before a blocklet is serialized the table path is removed from its file path.
+   *
+   * @param tablePath table path to remove from the blocklet file paths
+   * @param uniqueLocations map handed to every blocklet while it is serialized
+   * @param extendedBlockletList blocklets to serialize
+   * @return compressed bytes of all serialized blocklets
+   */
+  private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
+      List<ExtendedBlocklet> extendedBlockletList) {
+    ByteArrayOutputStream rawBytes = new ExtendedByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(rawBytes);
+    try {
+      for (ExtendedBlocklet blocklet : extendedBlockletList) {
+        String relativePath = blocklet.getFilePath().replace(tablePath, "");
+        blocklet.setFilePath(relativePath);
+        blocklet.serializeData(dataOut, uniqueLocations);
+      }
+      byte[] uncompressed = rawBytes.toByteArray();
+      return new SnappyCompressor().compressByte(uncompressed);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      CarbonUtil.closeStreams(dataOut);
+    }
+  }
+
+  /**
+   * Writes the header and the already serialized blocklet data to the stream [file/memory].
+   * Data Format
+   * <number of splits><number of unique location[short]><locations><serialize data len><data>
+   *
+   * @param stream stream to write to
+   * @param data serialized blocklet data
+   * @param uniqueLocation locations mapped to the index at which each one is written
+   * @param extendedBlockletList blocklets; only their count is written here
+   * @throws IOException if writing to the stream fails
+   */
+  private void writeBlockletToStream(DataOutputStream stream, byte[] data,
+      Map<String, Short> uniqueLocation, List<ExtendedBlocklet> extendedBlockletList)
+      throws IOException {
+    stream.writeInt(extendedBlockletList.size());
+    // place every location at the index stored as its map value
+    String[] orderedLocations = new String[uniqueLocation.size()];
+    for (Iterator<Map.Entry<String, Short>> entries = uniqueLocation.entrySet().iterator();
+         entries.hasNext(); ) {
+      final Map.Entry<String, Short> entry = entries.next();
+      orderedLocations[entry.getValue()] = entry.getKey();
+    }
+    stream.writeShort((short) orderedLocations.length);
+    for (int index = 0; index < orderedLocations.length; index++) {
+      stream.writeUTF(orderedLocations[index]);
+    }
+    stream.writeInt(data.length);
+    stream.write(data);
+  }
+
+  /**
+   * Deserializes the blocklets from the file or from the in-memory bytes held by this wrapper.
+   * Data format
+   * <number of splits><number of unique location[short]><locations><serialize data len><data>
+   *
+   * @param tablePath table path; used to resolve the index server temp folder and handed to
+   *                  every blocklet while it is deserialized
+   * @param queryId query id used to resolve the index server temp folder
+   * @return the deserialized blocklets, or an empty list when this wrapper holds no data
+   * @throws IOException if the file or the serialized data cannot be read
+   */
+  public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
+    if (bytes == null) {
+      return new ArrayList<>();
+    }
+    byte[] data;
+    if (isWrittenToFile) {
+      // bytes holds only the file name, the serialized data has to be read from that file
+      DataInputStream fileStream = null;
+      try {
+        final String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+        String fileName = new String(bytes, CarbonCommonConstants.DEFAULT_CHARSET);
+        fileStream = FileFactory
+            .getDataInputStream(folderPath + "/" + fileName, FileFactory.getFileType(folderPath));
+        data = new byte[dataSize];
+        fileStream.readFully(data);
+      } finally {
+        CarbonUtil.closeStreams(fileStream);
+      }
+    } else {
+      data = bytes;
+    }
+    DataInputStream stream = null;
+    int numberOfBlocklet;
+    String[] locations;
+    int actualDataLen;
+    try {
+      stream = new DataInputStream(new ByteArrayInputStream(data));
+      numberOfBlocklet = stream.readInt();
+      short numberOfLocations = stream.readShort();
+      locations = new String[numberOfLocations];
+      for (int i = 0; i < numberOfLocations; i++) {
+        locations[i] = stream.readUTF();
+      }
+      actualDataLen = stream.readInt();
+    } finally {
+      CarbonUtil.closeStreams(stream);
+    }
+
+    // The compressed payload is the last actualDataLen bytes of the dataSize serialized bytes.
+    // The offset is derived from dataSize and not from data.length, because for in-memory data
+    // the array is the output stream's backing buffer, which can be longer than what was
+    // actually written.
+    final byte[] unCompressByte =
+        new SnappyCompressor().unCompressByte(data, dataSize - actualDataLen, actualDataLen);
+    ExtendedByteArrayInputStream ebis = new ExtendedByteArrayInputStream(unCompressByte);
+    ExtendedDataInputStream eDIS = new ExtendedDataInputStream(ebis);
+    List<ExtendedBlocklet> extendedBlockletList = new ArrayList<>();
+    try {
+      for (int i = 0; i < numberOfBlocklet; i++) {
+        ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
+        extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
+        extendedBlockletList.add(extendedBlocklet);
+      }
+    } finally {
+      CarbonUtil.closeStreams(eDIS);
+    }
+    return extendedBlockletList;
+  }
+
+  /**
+   * Writes the written-to-file flag, the optional bytes (length followed by content) and the
+   * data size, in that order.
+   *
+   * @param out output to write to
+   * @throws IOException if writing fails
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isWrittenToFile);
+    final boolean hasBytes = bytes != null;
+    out.writeBoolean(hasBytes);
+    if (hasBytes) {
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+    out.writeInt(dataSize);
+  }
+
+  /**
+   * Reads the written-to-file flag, the optional bytes and the data size, in the order in
+   * which write emits them.
+   *
+   * @param in input to read from
+   * @throws IOException if reading fails
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    this.isWrittenToFile = in.readBoolean();
+    final boolean hasBytes = in.readBoolean();
+    if (hasBytes) {
+      final int length = in.readInt();
+      this.bytes = new byte[length];
+      in.readFully(this.bytes);
+    }
+    this.dataSize = in.readInt();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
new file mode 100644
index 0000000..0c52297
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * below class will be used to send split information from index driver to
+ * main driver.
+ * Main driver will Deserialize the extended blocklet object and get the split
+ * to run the query
+ */
+public class ExtendedBlockletWrapperContainer implements Writable {
+
+  private static final Logger LOGGER =
+      
LogServiceFactory.getLogService(ExtendedBlockletWrapperContainer.class.getName());
+
+  private ExtendedBlockletWrapper[] extendedBlockletWrappers;
+
+  private boolean isFallbackJob;
+
+  public ExtendedBlockletWrapperContainer() {
+
+  }
+
+  /**
+   * Creates a container for the given wrappers.
+   *
+   * @param extendedBlockletWrappers wrappers held by this container
+   * @param isFallbackJob when true the wrappers are deserialized one after another in the
+   *                      calling thread instead of in a thread pool
+   */
+  public ExtendedBlockletWrapperContainer(ExtendedBlockletWrapper[] extendedBlockletWrappers,
+      boolean isFallbackJob) {
+    this.isFallbackJob = isFallbackJob;
+    this.extendedBlockletWrappers = extendedBlockletWrappers;
+  }
+
+  /**
+   * Deserializes all wrappers held by this container into extended blocklets.
+   * For a fallback job the wrappers are read one after another in the calling thread.
+   * Otherwise the wrappers are divided into contiguous ranges which are read in a thread
+   * pool, and the results are joined in range order.
+   *
+   * @param tablePath table path handed to every wrapper
+   * @param queryId query id handed to every wrapper
+   * @return blocklets of all wrappers
+   * @throws IOException if reading a wrapper fails in the fallback flow
+   * @throws RuntimeException if the calling thread is interrupted or a pool task fails
+   */
+  public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId)
+      throws IOException {
+    if (!isFallbackJob) {
+      int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
+      ExecutorService executorService = Executors
+          .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("SplitDeseralizerPool", true));
+      int numberOfWrapperPerThread = extendedBlockletWrappers.length / numOfThreads;
+      int leftOver = extendedBlockletWrappers.length % numOfThreads;
+      // one range per thread; with fewer wrappers than threads, one range per wrapper
+      int[] split = null;
+      if (numberOfWrapperPerThread > 0) {
+        split = new int[numOfThreads];
+      } else {
+        split = new int[leftOver];
+      }
+      Arrays.fill(split, numberOfWrapperPerThread);
+      // the first leftOver ranges take one extra wrapper each
+      for (int i = 0; i < leftOver; i++) {
+        split[i] += 1;
+      }
+      int start = 0;
+      int end = 0;
+      List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
+      for (int i = 0; i < split.length; i++) {
+        end += split[i];
+        futures.add(executorService
+            .submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId)));
+        start += split[i];
+      }
+      executorService.shutdown();
+      try {
+        executorService.awaitTermination(1, TimeUnit.HOURS);
+      } catch (InterruptedException e) {
+        // restore the interrupt status so that callers can still observe the interruption
+        Thread.currentThread().interrupt();
+        LOGGER.error(e);
+        throw new RuntimeException(e);
+      }
+      List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
+      for (int i = 0; i < futures.size(); i++) {
+        try {
+          extendedBlocklets.addAll(futures.get(i).get());
+        } catch (InterruptedException e) {
+          // restore the interrupt status so that callers can still observe the interruption
+          Thread.currentThread().interrupt();
+          LOGGER.error(e);
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          LOGGER.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+      return extendedBlocklets;
+    } else {
+      List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
+      for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
+        extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId));
+      }
+      return extendedBlocklets;
+    }
+  }
+
+  /**
+   * Task which deserializes the wrappers of the enclosing container in the index range
+   * [start, end) and returns their blocklets as one list.
+   */
+  private class ExtendedBlockletDeserializerThread implements Callable<List<ExtendedBlocklet>> {
+
+    private final int start;
+
+    private final int end;
+
+    private final String tablePath;
+
+    private final String queryId;
+
+    public ExtendedBlockletDeserializerThread(int start, int end, String tablePath,
+        String queryId) {
+      this.tablePath = tablePath;
+      this.queryId = queryId;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override public List<ExtendedBlocklet> call() throws Exception {
+      List<ExtendedBlocklet> result = new ArrayList<>();
+      int index = start;
+      while (index < end) {
+        result.addAll(extendedBlockletWrappers[index].readBlocklet(tablePath, queryId));
+        index++;
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Writes the number of wrappers followed by every wrapper.
+   *
+   * @param out output to write to
+   * @throws IOException if writing fails
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeInt(extendedBlockletWrappers.length);
+    for (ExtendedBlockletWrapper wrapper : extendedBlockletWrappers) {
+      wrapper.write(out);
+    }
+  }
+
+  /**
+   * Reads the number of wrappers and then every wrapper, in the order written by write.
+   *
+   * @param in input to read from
+   * @throws IOException if reading fails
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    final int wrapperCount = in.readInt();
+    extendedBlockletWrappers = new ExtendedBlockletWrapper[wrapperCount];
+    int index = 0;
+    while (index < wrapperCount) {
+      ExtendedBlockletWrapper wrapper = new ExtendedBlockletWrapper();
+      wrapper.readFields(in);
+      extendedBlockletWrappers[index] = wrapper;
+      index++;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 13e612d..24ad43a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -117,9 +117,16 @@ public class BlockDataMap extends CoarseGrainDataMap
     BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) 
dataMapModel;
     DataFileFooterConverter fileFooterConverter =
         new DataFileFooterConverter(dataMapModel.getConfiguration());
-    List<DataFileFooter> indexInfo = fileFooterConverter
-        .getIndexInfo(blockletDataMapInfo.getFilePath(), 
blockletDataMapInfo.getFileData(),
-            blockletDataMapInfo.getCarbonTable().isTransactionalTable());
+    List<DataFileFooter> indexInfo = null;
+    if (blockletDataMapInfo.getIndexInfos() == null || 
blockletDataMapInfo.getIndexInfos()
+        .isEmpty()) {
+      indexInfo = fileFooterConverter
+          .getIndexInfo(blockletDataMapInfo.getFilePath(), 
blockletDataMapInfo.getFileData(),
+              blockletDataMapInfo.getCarbonTable().isTransactionalTable());
+    } else {
+      // when index info is already read and converted to data file footer 
object
+      indexInfo = blockletDataMapInfo.getIndexInfos();
+    }
     Path path = new Path(blockletDataMapInfo.getFilePath());
     // store file path only in case of partition table, non transactional 
table and flat folder
     // structure
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 0a75d59..2f0c122 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -16,10 +16,12 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,10 @@ public class BlockletDataMapModel extends DataMapModel {
   private String segmentId;
 
   private boolean addToUnsafe = true;
+  /**
+   * list of index thrift object present in index file
+   */
+  private List<DataFileFooter> indexInfos;
 
   public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] 
fileData,
       Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, 
Configuration configuration) {
@@ -74,4 +80,12 @@ public class BlockletDataMapModel extends DataMapModel {
   public CarbonTable getCarbonTable() {
     return carbonTable;
   }
+
+  /**
+   * Sets the data file footers that were already read from the index file.
+   *
+   * @param indexInfos data file footers to keep in this model
+   */
+  public void setIndexInfos(List<DataFileFooter> indexInfos) {
+    this.indexInfos = indexInfos;
+  }
+
+  /**
+   * Returns the data file footers kept in this model.
+   *
+   * @return the footers given to setIndexInfos, or null when none were set
+   */
+  public List<DataFileFooter> getIndexInfos() {
+    return this.indexInfos;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6c048f3..b3d4780 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -194,9 +194,11 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
       // available so read the blocklet information from block file
       // 2. CACHE_LEVEL is set to block
       // 3. CACHE_LEVEL is BLOCKLET but filter column min/max is not cached in 
driver
-      if (blockletDetailInfo.getBlockletInfo() == null || blockletDetailInfo
-            .isUseMinMaxForPruning()) {
-        blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
+      if (null == blockletDetailInfo || blockletDetailInfo.getBlockletInfo() 
== null
+          || blockletDetailInfo.isUseMinMaxForPruning()) {
+        if (null != blockletDetailInfo) {
+          blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
+        }
         DataFileFooter fileFooter = 
filePathToFileFooterMapping.get(blockInfo.getFilePath());
         if (null != blockInfo.getDataFileFooter()) {
           fileFooter = blockInfo.getDataFileFooter();
@@ -211,6 +213,9 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
             
QueryUtil.updateColumnUniqueIdForNonTransactionTable(fileFooter.getColumnInTable());
           }
           filePathToFileFooterMapping.put(blockInfo.getFilePath(), fileFooter);
+          if (null == blockletDetailInfo) {
+            blockletDetailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, 
blockInfo);
+          }
           blockInfo.setDetailInfo(blockletDetailInfo);
         }
         if (null == segmentProperties) {
@@ -220,7 +225,7 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
           updateColumns(queryModel, fileFooter.getColumnInTable(), 
blockInfo.getFilePath());
           filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), 
segmentProperties);
         }
-        if (blockletDetailInfo.isLegacyStore()) {
+        if (blockInfo.isLegacyStore()) {
           LOGGER.warn("Skipping Direct Vector Filling as it is not Supported "
               + "for Legacy store prior to V3 store");
           queryModel.setDirectVectorFill(false);
@@ -377,7 +382,7 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
     detailInfo.setRowCount(blockletInfo.getNumberOfRows());
     byte[][] maxValues = 
blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues();
     byte[][] minValues = 
blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues();
-    if (blockletDetailInfo.isLegacyStore()) {
+    if (blockInfo.isLegacyStore()) {
       info.setDataBlockFromOldStore(true);
     }
     blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 95fbe66..ced99b4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -34,12 +34,15 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -777,4 +780,27 @@ public class QueryUtil {
       }
     }
   }
+
+  /**
+   * Creates a blocklet detail info object from the file footer and the block info.
+   * Used for the index server flow, where no detail info is serialized from the driver.
+   *
+   * @param fileFooter footer of the data file
+   * @param blockInfo block info supplying the footer offset, block length and version
+   * @return newly created blocklet detail info
+   */
+  public static BlockletDetailInfo getBlockletDetailInfo(DataFileFooter fileFooter,
+      TableBlockInfo blockInfo) {
+    final BlockletDetailInfo info = new BlockletDetailInfo();
+    info.setDimLens(fileFooter.getSegmentInfo().getColumnCardinality());
+    // no blocklet info is available here, so an empty binary and blocklet id -1 are set
+    info.setBlockletInfoBinary(new byte[0]);
+    info.setColumnSchemas(fileFooter.getColumnInTable());
+    info.setBlockletId((short) -1);
+    final int rowCount = (int) fileFooter.getNumberOfRows();
+    info.setRowCount(rowCount);
+    info.setSchemaUpdatedTimeStamp(fileFooter.getSchemaUpdatedTimeStamp());
+    info.setBlockFooterOffset(blockInfo.getBlockOffset());
+    info.setBlockSize(blockInfo.getBlockLength());
+    info.setUseMinMaxForPruning(true);
+    info.setVersionNumber(blockInfo.getVersion().number());
+    return info;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index c154c5f..05f616f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -29,10 +29,6 @@ public enum FileFormat {
   ROW_V1;
 
   public static FileFormat getByOrdinal(int ordinal) {
-    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return COLUMNAR_V3;
-    }
-
     switch (ordinal) {
       case 0:
         return COLUMNAR_V3;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java 
b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayInputStream.java
similarity index 59%
copy from 
core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
copy to 
core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayInputStream.java
index c154c5f..f03464a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayInputStream.java
@@ -15,31 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.core.statusmanager;
+package org.apache.carbondata.core.stream;
 
-/**
- * The data file format supported in carbondata project
- */
-public enum FileFormat {
+import java.io.ByteArrayInputStream;
 
-  // carbondata columnar file format, optimized for read
-  COLUMNAR_V3,
+public class ExtendedByteArrayInputStream extends ByteArrayInputStream {
+  public ExtendedByteArrayInputStream(byte[] buf) {
+    super(buf);
+  }
 
-  // carbondata row file format, optimized for write
-  ROW_V1;
+  public byte[] getBuffer() {
+    return buf;
+  }
 
-  public static FileFormat getByOrdinal(int ordinal) {
-    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return COLUMNAR_V3;
-    }
+  public void setPosition(int position) {
+    this.pos = position;
+  }
 
-    switch (ordinal) {
-      case 0:
-        return COLUMNAR_V3;
-      case 1:
-        return ROW_V1;
-    }
+  public int getPosition() {
+    return this.pos;
+  }
 
-    return COLUMNAR_V3;
+  public int getLength() {
+    return count - pos;
   }
 }
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
 
b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayOutputStream.java
similarity index 74%
rename from 
store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
rename to 
core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayOutputStream.java
index 393cd86..f941bd1 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayOutputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.sdk.file.arrow;
+package org.apache.carbondata.core.stream;
 
 import java.io.ByteArrayOutputStream;
 
@@ -23,17 +23,24 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 
 public class ExtendedByteArrayOutputStream extends ByteArrayOutputStream {
 
+  public ExtendedByteArrayOutputStream() {
+
+  }
+
   public ExtendedByteArrayOutputStream(int initialSize) {
     super(initialSize);
   }
 
+  public byte[] getBuffer() {
+    return buf;
+  }
+
   public long copyToAddress() {
-    final long address = CarbonUnsafe.getUnsafe()
-        .allocateMemory(CarbonCommonConstants.INT_SIZE_IN_BYTE + count);
+    final long address =
+        
CarbonUnsafe.getUnsafe().allocateMemory(CarbonCommonConstants.INT_SIZE_IN_BYTE 
+ count);
     CarbonUnsafe.getUnsafe().putInt(address, count);
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(buf, CarbonUnsafe.BYTE_ARRAY_OFFSET, null,
-            address + CarbonCommonConstants.INT_SIZE_IN_BYTE, count);
+    CarbonUnsafe.getUnsafe().copyMemory(buf, CarbonUnsafe.BYTE_ARRAY_OFFSET, 
null,
+        address + CarbonCommonConstants.INT_SIZE_IN_BYTE, count);
     return address;
   }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java 
b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedDataInputStream.java
similarity index 59%
copy from 
core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
copy to 
core/src/main/java/org/apache/carbondata/core/stream/ExtendedDataInputStream.java
index c154c5f..c0e236b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedDataInputStream.java
@@ -15,31 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.core.statusmanager;
+package org.apache.carbondata.core.stream;
 
-/**
- * The data file format supported in carbondata project
- */
-public enum FileFormat {
-
-  // carbondata columnar file format, optimized for read
-  COLUMNAR_V3,
+import java.io.DataInputStream;
 
-  // carbondata row file format, optimized for write
-  ROW_V1;
+public class ExtendedDataInputStream extends DataInputStream {
 
-  public static FileFormat getByOrdinal(int ordinal) {
-    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return COLUMNAR_V3;
-    }
+  private ExtendedByteArrayInputStream in;
 
-    switch (ordinal) {
-      case 0:
-        return COLUMNAR_V3;
-      case 1:
-        return ROW_V1;
-    }
+  /**
+   * Creates a DataInputStream that uses the specified
+   * underlying InputStream.
+   *
+   * @param in the specified input stream
+   */
+  public ExtendedDataInputStream(ExtendedByteArrayInputStream in) {
+    super(in);
+    this.in = in;
+  }
 
-    return COLUMNAR_V3;
+  public ExtendedByteArrayInputStream getUnderlineStream() {
+    return this.in;
   }
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 9074587..6cd60a2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -74,7 +74,8 @@ public class BlockletDataMapUtil {
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
       SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping) throws IOException 
{
+      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, 
List<DataFileFooter> indexInfos)
+      throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
@@ -107,6 +108,7 @@ public class BlockletDataMapUtil {
         identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + 
identifier
             .getIndexFileName(), 
indexFileStore.getFileData(identifier.getIndexFileName()),
         isTransactionalTable);
+    indexInfos.addAll(indexInfo);
     for (DataFileFooter footer : indexInfo) {
       if ((!isTransactionalTable) && (tableColumnList.size() != 0) &&
           !isSameColumnAndDifferentDatatypeInSchema(footer.getColumnInTable(), 
tableColumnList)) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a53c365..ec8a1c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -199,6 +199,9 @@ public final class CarbonProperties {
       case DETAIL_QUERY_BATCH_SIZE:
         validateDetailQueryBatchSize();
         break;
+      case CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD:
+        validateIndexServerSerializationThreshold();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for 
addProperty flow
       default:
         // none
@@ -264,6 +267,7 @@ public final class CarbonProperties {
     validateSortMemorySpillPercentage();
     validateStringCharacterLimit();
     validateDetailQueryBatchSize();
+    validateIndexServerSerializationThreshold();
   }
 
   /**
@@ -656,6 +660,38 @@ public final class CarbonProperties {
   }
 
   /**
+   * This method validates the index server serialization size
+   */
+  private void validateIndexServerSerializationThreshold() {
+    String serializationSizeString = carbonProperties
+        
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+            
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+    try {
+      int serializationSize = Integer.parseInt(serializationSizeString);
+
+      if (serializationSize < 
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MIN
+          || serializationSize
+          > 
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MAX) {
+        LOGGER.info(
+            "The " + 
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD + " value \""
+                + serializationSize + "\" is invalid. Using the default value 
\""
+                + 
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+        carbonProperties
+            
.setProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+                
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info(
+          "The " + 
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD + " value \""
+              + serializationSizeString + "\" is invalid. Using the default 
value \""
+              + 
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+      carbonProperties
+          
.setProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+              
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+    }
+  }
+
+  /**
    * This method validates the sort size
    */
   private void validateSortSize() {
@@ -1670,4 +1706,41 @@ public final class CarbonProperties {
     return CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT;
   }
 
+  public int getNumOfThreadsForExecutorPruning() {
+    String configuredValue = CarbonProperties.getInstance()
+        
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING);
+    if (configuredValue == null || configuredValue.equalsIgnoreCase("0")) {
+      configuredValue = 
CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT;
+    }
+    try {
+      int numOfThreads = Integer.parseInt(configuredValue);
+      LOGGER.info("Value for "
+          + 
CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING + " is "
+          + numOfThreads);
+      return numOfThreads;
+    } catch (NumberFormatException e) {
+      LOGGER.info(configuredValue + " is not a valid input for "
+          + 
CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING + ", taking 
"
+          + 
CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT
+          + " as default value");
+      return Integer
+          
.parseInt(CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+    }
+  }
+
+  public static int getNumOfThreadsForPruning() {
+    int numOfThreadsForPruning = 
Integer.parseInt(CarbonProperties.getInstance()
+        
.getProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING,
+            
CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT));
+    if (numOfThreadsForPruning > Integer
+        
.parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT)
+        || numOfThreadsForPruning < 1) {
+      LOGGER.info("Invalid value for 
carbon.max.driver.threads.for.block.pruning, value :"
+          + numOfThreadsForPruning + " .using the default threads : "
+          + 
CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+      numOfThreadsForPruning = Integer
+          
.parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+    }
+    return numOfThreadsForPruning;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 6fa24b7..376c757 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3311,4 +3311,31 @@ public final class CarbonUtil {
     }
     return null;
   }
+
+  public static String getIndexServerTempPath(String tablePath, String 
queryId) {
+    String tempFolderPath = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_TEMP_PATH);
+    if (null == tempFolderPath) {
+      tempFolderPath =
+          tablePath + "/" + 
CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/" + queryId;
+    } else {
+      tempFolderPath =
+          tempFolderPath + "/" + 
CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/"
+              + queryId;
+    }
+    return tempFolderPath;
+  }
+
+  public static CarbonFile createTempFolderForIndexServer(String tablePath, 
String queryId)
+      throws IOException {
+    final String path = getIndexServerTempPath(tablePath, queryId);
+    CarbonFile file = FileFactory.getCarbonFile(path);
+    if (!file.mkdirs(path)) {
+      LOGGER.info("Unable to create table directory for index server");
+      return null;
+    } else {
+      LOGGER.info("Created index server temp directory" + path);
+      return file;
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 931b41b..71c86c1 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -16,7 +16,9 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
@@ -39,9 +41,12 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.stream.ExtendedByteArrayInputStream;
+import org.apache.carbondata.core.stream.ExtendedDataInputStream;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
@@ -92,7 +97,7 @@ public class CarbonInputSplit extends FileSplit
 
   private transient int[] columnCardinality;
 
-  private transient boolean isLegacyStore;
+  private boolean isLegacyStore;
 
   private transient List<ColumnSchema> columnSchema;
 
@@ -114,6 +119,37 @@ public class CarbonInputSplit extends FileSplit
 
   private transient String blockPath;
 
+  /**
+   * Used in case of index server: all the fields which are required only in
+   * the executor need not be deserialized; they are kept as a byte array and
+   * written directly to the output stream during the write method.
+   */
+  private byte[] serializeData;
+
+  /**
+   * start position of fields
+   */
+  private int offset;
+
+  /**
+   * actual length of data
+   */
+  private int actualLen;
+
+  /**
+   * In case of index server block cache there is no need to write detail info, file path,
+   * blocklet id and bucket id; skipping them reduces the serialized data size. This flag is
+   * used to check whether it is an index server flow or not.
+   */
+  private boolean writeDetailInfo = true;
+
+  /**
+   * TODO remove this code after Index server count(*) optimization
+   * only used for index server, once index server handled count star push down
+   * below row count is not required
+   */
+  private int rowCount;
+
   public CarbonInputSplit() {
     segment = null;
     taskId = "0";
@@ -123,6 +159,58 @@ public class CarbonInputSplit extends FileSplit
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
+  /**
+   * Will be used in case of index server
+   * @param serializeLen
+   * @param in
+   * @param filePath
+   * @param allLocation
+   * @param blockletId
+   * @throws IOException
+   */
+  public CarbonInputSplit(int serializeLen, DataInput in, String filePath, 
String[] allLocation,
+      String blockletId) throws IOException {
+    this.filePath = filePath;
+    this.blockletId = blockletId;
+    // get the underlying stream to find the actual position of the fields which won't be
+    // deserialized here, as they are used only by the executor
+    ExtendedByteArrayInputStream underlineStream =
+        ((ExtendedDataInputStream) in).getUnderlineStream();
+    // current position
+    int currentPosition = underlineStream.getPosition();
+    // number of locations
+    short numberOfLocations = in.readShort();
+    if (numberOfLocations > 0) {
+      // used locations for this split
+      this.location = new String[numberOfLocations];
+      for (int i = 0; i < location.length; i++) {
+        location[i] = allLocation[in.readShort()];
+      }
+    }
+    // get start
+    this.start = in.readLong();
+    this.length = in.readLong();
+    this.version = ColumnarFormatVersion.valueOf(in.readShort());
+    // will be removed after count(*) optimization in case of index server
+    this.rowCount = in.readInt();
+    // after deserializing the required fields, get the start position of the fields which will
+    // be used only in the executor
+    int leftoverPosition = underlineStream.getPosition();
+    // position of next split
+    int newPosition = currentPosition + serializeLen;
+    // setting the position to next split
+    underlineStream.setPosition(newPosition);
+    this.serializeData = underlineStream.getBuffer();
+    this.offset = leftoverPosition;
+    this.actualLen = serializeLen - (leftoverPosition - currentPosition);
+    String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(this.filePath);
+    if (taskNo.contains("_")) {
+      taskNo = taskNo.split("_")[0];
+    }
+    this.taskId = taskNo;
+    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(this.filePath);
+  }
+
   private CarbonInputSplit(String segmentId, String blockletId, String 
filePath, long start,
       long length, ColumnarFormatVersion version, String[] deleteDeltaFiles,
       String dataMapWritePath) {
@@ -211,6 +299,7 @@ public class CarbonInputSplit extends FileSplit
                 blockletInfos, split.getVersion(), 
split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         blockInfo.setDataMapWriterPath(split.dataMapWritePath);
+        blockInfo.setLegacyStore(split.isLegacyStore);
         if (split.getDetailInfo() != null) {
           
blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
         }
@@ -232,7 +321,10 @@ public class CarbonInputSplit extends FileSplit
               inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
               inputSplit.getDeleteDeltaFiles());
       blockInfo.setDetailInfo(inputSplit.getDetailInfo());
-      
blockInfo.setBlockOffset(inputSplit.getDetailInfo().getBlockFooterOffset());
+      if (null != inputSplit.getDetailInfo()) {
+        
blockInfo.setBlockOffset(inputSplit.getDetailInfo().getBlockFooterOffset());
+      }
+      blockInfo.setLegacyStore(inputSplit.isLegacyStore);
       return blockInfo;
     } catch (IOException e) {
       throw new RuntimeException("fail to get location of split: " + 
inputSplit, e);
@@ -240,6 +332,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public String getSegmentId() {
+    derserializeField();
     if (segment != null) {
       return segment.getSegmentNo();
     } else {
@@ -248,18 +341,25 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public Segment getSegment() {
+    derserializeField();
     return segment;
   }
 
 
   @Override public void readFields(DataInput in) throws IOException {
-    this.filePath = in.readUTF();
-    this.start = in.readLong();
-    this.length = in.readLong();
-    this.segment = Segment.toSegment(in.readUTF());
-    this.version = ColumnarFormatVersion.valueOf(in.readShort());
-    this.bucketId = in.readUTF();
+    // if serializeData is not null, the fields inside the if block below were already
+    // deserialized by org.apache.carbondata.hadoop.CarbonInputSplit#CarbonInputSplit(
+    // int, java.io.DataInput, java.lang.String, java.lang.String[], java.lang.String)
+    if (null == serializeData) {
+      this.filePath = in.readUTF();
+      this.start = in.readLong();
+      this.length = in.readLong();
+      this.version = ColumnarFormatVersion.valueOf(in.readShort());
+      this.rowCount = in.readInt();
+      this.bucketId = in.readUTF();
+    }
     this.blockletId = in.readUTF();
+    this.segment = Segment.toSegment(in.readUTF());
     int numberOfDeleteDeltaFiles = in.readInt();
     deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
     for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
@@ -279,26 +379,55 @@ public class CarbonInputSplit extends FileSplit
     for (int i = 0; i < validBlockletIdCount; i++) {
       validBlockletIds.add((int) in.readShort());
     }
+    this.isLegacyStore = in.readBoolean();
   }
 
   @Override public void write(DataOutput out) throws IOException {
-    out.writeUTF(filePath);
+    // if serializeData is not null then it is an index server flow, so write the fields
+    // which are already deserialized and then write serializeData to the output stream
+    if (null != serializeData) {
+      out.writeUTF(filePath);
+      out.writeLong(start);
+      out.writeLong(length);
+      out.writeShort(version.number());
+      out.writeInt(rowCount);
+      out.writeUTF(bucketId);
+      out.writeUTF(blockletId);
+      out.write(serializeData, offset, actualLen);
+      return;
+    }
+    // please refer writeDetailInfo doc
+    if (null != filePath) {
+      out.writeUTF(filePath);
+    }
     out.writeLong(start);
     out.writeLong(length);
-    out.writeUTF(segment.toString());
     out.writeShort(version.number());
-    out.writeUTF(bucketId);
+    //TODO remove this code once count(*) optimization is added in case of index server
+    if (null != dataMapRow) {
+      
out.writeInt(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
+    } else if (null != detailInfo) {
+      out.writeInt(this.detailInfo.getRowCount());
+    } else {
+      out.writeInt(0);
+    }
+    if (null != bucketId) {
+      out.writeUTF(bucketId);
+    }
     out.writeUTF(blockletId);
+    out.writeUTF(segment.toString());
     out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
     if (null != deleteDeltaFiles) {
       for (int i = 0; i < deleteDeltaFiles.length; i++) {
         out.writeUTF(deleteDeltaFiles[i]);
       }
     }
-    out.writeBoolean(detailInfo != null || dataMapRow != null);
-    if (detailInfo != null) {
+    // please refer writeDetailInfo doc
+    out.writeBoolean(writeDetailInfo && (detailInfo != null || dataMapRow != 
null));
+    if (writeDetailInfo && detailInfo != null) {
       detailInfo.write(out);
-    } else if (dataMapRow != null) {
+      // please refer writeDetailInfo doc
+    } else if (writeDetailInfo && dataMapRow != null) {
       writeBlockletDetailsInfo(out);
     }
     out.writeBoolean(dataMapWritePath != null);
@@ -309,6 +438,7 @@ public class CarbonInputSplit extends FileSplit
     for (Integer blockletId : getValidBlockletIds()) {
       out.writeShort(blockletId);
     }
+    out.writeBoolean(isLegacyStore);
   }
 
   /**
@@ -339,6 +469,8 @@ public class CarbonInputSplit extends FileSplit
       return -1;
     }
     CarbonInputSplit other = (CarbonInputSplit) o;
+    derserializeField();
+    other.derserializeField();
     int compareResult = 0;
     // get the segment id
     // converr seg ID to double.
@@ -400,6 +532,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   @Override public int hashCode() {
+    derserializeField();
     int result = taskId.hashCode();
     result = 31 * result + segment.hashCode();
     result = 31 * result + bucketId.hashCode();
@@ -498,6 +631,10 @@ public class CarbonInputSplit extends FileSplit
     this.isBlockCache = isBlockCache;
   }
 
+  public boolean isBlockCache() {
+    return this.isBlockCache;
+  }
+
   private void writeBlockletDetailsInfo(DataOutput out) throws IOException {
     
out.writeInt(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
     if (this.isBlockCache) {
@@ -535,7 +672,6 @@ public class CarbonInputSplit extends FileSplit
       out.write(blockletInfoBinary);
     }
     out.writeLong(getLength());
-    out.writeBoolean(this.isLegacyStore);
     out.writeBoolean(this.useMinMaxForPruning);
   }
 
@@ -544,6 +680,7 @@ public class CarbonInputSplit extends FileSplit
       detailInfo = new BlockletDetailInfo();
       detailInfo
           
.setRowCount(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
+      rowCount = detailInfo.getRowCount();
       detailInfo
           
.setVersionNumber(this.dataMapRow.getShort(BlockletDataMapRowIndexes.VERSION_INDEX));
       detailInfo.setBlockletId(Short.parseShort(this.blockletId));
@@ -552,9 +689,10 @@ public class CarbonInputSplit extends FileSplit
           
this.dataMapRow.getLong(BlockletDataMapRowIndexes.SCHEMA_UPADATED_TIME_INDEX));
       detailInfo.setBlockFooterOffset(
           
this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET));
+      start = detailInfo.getBlockFooterOffset();
       detailInfo
           .setBlockSize(getLength());
-      detailInfo.setLegacyStore(isLegacyStore);
+      length = detailInfo.getBlockSize();
       detailInfo.setUseMinMaxForPruning(useMinMaxForPruning);
       if (!this.isBlockCache) {
         detailInfo.setColumnSchemas(this.columnSchema);
@@ -601,8 +739,22 @@ public class CarbonInputSplit extends FileSplit
   /** The position of the first byte in the file to process. */
   public long getStart() { return start; }
 
-  @Override
-  public long getLength() {
+  /**
+   * In case of index server detail info won't be present,
+   * so the footer offset needs to be written correctly; this updates the start offset
+   *
+   */
+  public void updateFooteroffset() {
+    if (isBlockCache && start == 0) {
+      if (null != dataMapRow) {
+        start = 
this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET);
+      } else if (null != detailInfo) {
+        start = detailInfo.getBlockFooterOffset();
+      }
+    }
+  }
+
+  public void updateBlockLength() {
     if (length == -1) {
       if (null != dataMapRow) {
         length = 
this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH);
@@ -610,6 +762,11 @@ public class CarbonInputSplit extends FileSplit
         length = detailInfo.getBlockSize();
       }
     }
+  }
+
+  @Override
+  public long getLength() {
+    updateBlockLength();
     return length;
   }
 
@@ -629,4 +786,78 @@ public class CarbonInputSplit extends FileSplit
   public void setLocation(String[] location) {
     this.location = location;
   }
+
+  /**
+   * In case of index server block cache there is no need to write detail info as it is
+   * a heavy object.
+   * @param writeDetailInfo
+   */
+  public void setWriteDetailInfo(boolean writeDetailInfo) {
+    this.writeDetailInfo = writeDetailInfo;
+  }
+
+  /**
+   * Below method will be used to serialize the input split in case of
+   * index server
+   * @param out
+   * @param uniqueLocationMap
+   * @throws IOException
+   */
+  public void serializeFields(DataOutput out, Map<String, Short> 
uniqueLocationMap)
+      throws IOException {
+    final String[] locations = getLocations();
+    if (null != locations) {
+      out.writeShort(locations.length);
+      // below code is to get the unique locations across all the block
+      for (String loc : locations) {
+        Short pos = uniqueLocationMap.get(loc);
+        if (null == pos) {
+          pos = (short) uniqueLocationMap.size();
+          uniqueLocationMap.put(loc, pos);
+        }
+        out.writeShort(pos);
+      }
+    } else {
+      out.writeShort(0);
+    }
+    write(out);
+  }
+
+  /**
+   * This method will be used to deserialize fields
+   * in case of index server
+   */
+  private void derserializeField() {
+    if (null != serializeData) {
+      DataInputStream in = null;
+      try {
+        ByteArrayInputStream bis = new ByteArrayInputStream(serializeData, 
offset, actualLen);
+        in = new DataInputStream(bis);
+        readFields(in);
+        serializeData = null;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (null != in) {
+          CarbonUtil.closeStreams(in);
+        }
+      }
+    }
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setStart(long start) {
+    this.start = start;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public void setBucketId(String bucketId) {
+    this.bucketId = bucketId;
+  }
 }
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 7ac5bc0..3703727 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -100,7 +100,7 @@ public class CarbonMultiBlockSplit extends InputSplit 
implements Serializable, W
 
   public void calculateLength() {
     long total = 0;
-    if (splitList.size() > 0) {
+    if (splitList.size() > 1) {
       Map<String, Long> blockSizes = new HashMap<>();
       for (CarbonInputSplit split : splitList) {
         blockSizes.put(split.getFilePath(), split.getLength());
@@ -108,6 +108,8 @@ public class CarbonMultiBlockSplit extends InputSplit 
implements Serializable, W
       for (Map.Entry<String, Long> entry : blockSizes.entrySet()) {
         total += entry.getValue();
       }
+    } else if (splitList.size() == 1) {
+      total += splitList.get(0).getLength();
     }
     length = total;
   }
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 1a529e3..635ab1a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -81,17 +81,25 @@ public class CarbonRecordReader<T> extends 
AbstractRecordReader<T> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       splitList = new ArrayList<>(1);
-      String splitPath = ((CarbonInputSplit) inputSplit).getFilePath();
+      CarbonInputSplit carbonInputSplit = ((CarbonInputSplit) inputSplit);
+      String splitPath = carbonInputSplit.getFilePath();
       // BlockFooterOffSet will be null in case of CarbonVectorizedReader as 
this has to be set
       // where multiple threads are able to read small set of files to 
calculate footer instead
       // of the main thread setting this for all the files.
-      if (((CarbonInputSplit) 
inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+      if ((null != carbonInputSplit.getDetailInfo()
+          && carbonInputSplit.getDetailInfo().getBlockFooterOffset() == 0L) || 
(
+          null == carbonInputSplit.getDetailInfo() && 
carbonInputSplit.getStart() == 0)) {
         FileReader reader = 
FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
             context.getConfiguration());
         ByteBuffer buffer = reader
             .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath), 
inputSplit.getLength() - 8,
                 8);
-        ((CarbonInputSplit) 
inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        if (carbonInputSplit.getDetailInfo() == null) {
+          carbonInputSplit.setStart(buffer.getLong());
+        } else {
+          
carbonInputSplit.getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        }
+        reader.finish();
       }
       splitList.add((CarbonInputSplit) inputSplit);
     } else if (inputSplit instanceof CarbonMultiBlockSplit) {
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 274c7ef..6c99142 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -566,7 +566,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
           for (InputSplit extendedBlocklet : extendedBlocklets) {
             CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
             blockletToRowCountMap.put(blocklet.getSegmentId() + "," + 
blocklet.getFilePath(),
-                (long) blocklet.getDetailInfo().getRowCount());
+                (long) blocklet.getRowCount());
           }
         } catch (Exception e) {
           // Check if fallback is disabled then directly throw exception 
otherwise try driver
@@ -618,7 +618,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
             getDistributedSplit(table, null, partitions, filteredSegment,
                 allSegments.getInvalidSegments(), new ArrayList<String>()));
         for (InputSplit extendedBlocklet : extendedBlocklets) {
-          totalRowCount += ((CarbonInputSplit) 
extendedBlocklet).getDetailInfo().getRowCount();
+          totalRowCount += ((CarbonInputSplit) extendedBlocklet).getRowCount();
         }
       } else {
         TableDataMap defaultDataMap = 
DataMapStoreManager.getInstance().getDefaultDataMap(table);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 99db9d3..507f6d9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -81,15 +81,22 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       // Read the footer offset and set.
-      String splitPath = ((CarbonInputSplit) inputSplit).getFilePath();
-      if (((CarbonInputSplit) 
inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+      CarbonInputSplit carbonInputSplit = ((CarbonInputSplit) inputSplit);
+      String splitPath = carbonInputSplit.getFilePath();
+      if ((null != carbonInputSplit.getDetailInfo()
+          && carbonInputSplit.getDetailInfo().getBlockFooterOffset() == 0L) || 
(
+          null == carbonInputSplit.getDetailInfo() && 
carbonInputSplit.getStart() == 0)) {
         FileReader reader = 
FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
             taskAttemptContext.getConfiguration());
         ByteBuffer buffer = reader
             .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath),
-                ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() 
- 8,
+                ((CarbonInputSplit) inputSplit).getLength() - 8,
                 8);
-        ((CarbonInputSplit) 
inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        if (carbonInputSplit.getDetailInfo() == null) {
+          carbonInputSplit.setStart(buffer.getLong());
+        } else {
+          
carbonInputSplit.getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        }
         reader.finish();
       }
       splitList = new ArrayList<>(1);
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 0be4970..a49b233 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -565,12 +565,12 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   override protected def afterAll(): Unit = {
-    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql("DROP TABLE IF EXISTS datamap_testFG")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
-        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+//    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+//    sql("DROP TABLE IF EXISTS normal_test")
+//    sql("DROP TABLE IF EXISTS datamap_test")
+//    sql("DROP TABLE IF EXISTS datamap_testFG")
+//    CarbonProperties.getInstance()
+//      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+//        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
   }
 }
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index 20a2d39..0f71f8c 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -52,8 +52,7 @@ public class Util {
    */
   public static boolean 
isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splitList) {
     for (CarbonInputSplit inputSplit : splitList) {
-      if (null == inputSplit.getDetailInfo() || null == 
inputSplit.getDetailInfo()
-          .getBlockletInfo()) {
+      if (inputSplit.isBlockCache()) {
         return true;
       }
     }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 656166d..8935b5b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -489,7 +489,7 @@ class CarbonMergerRDD[K, V](
         var dataFileFooter: DataFileFooter = null
         if (null == rangeColumn) {
           val taskNo = getTaskNo(split, partitionTaskMap, counter)
-          var sizeOfSplit = split.getDetailInfo.getBlockSize
+          var sizeOfSplit = split.getLength
           val splitList = taskIdMapping.get(taskNo)
           noOfBlocks += 1
           if (null == splitList) {
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 698dd58..4acfc33 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -17,6 +17,7 @@
 package org.apache.carbondata.indexserver
 
 import java.util
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
@@ -26,12 +27,15 @@ import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.{AbstractDataMapJob, 
DistributableDataMapFormat}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.BinaryExpression
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, 
LogicalFilterResolverImpl, RowLevelFilterResolverImpl}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime
 
 /**
@@ -47,24 +51,38 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       val messageSize = SizeEstimator.estimate(dataMapFormat)
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
+    val queryId = SparkSQLUtil.getSparkSession.sparkContext.getConf
+      .get("queryId", UUID.randomUUID().toString)
+    dataMapFormat.setQueryId(queryId)
+    val tmpFolder = CarbonUtil
+      
.createTempFolderForIndexServer(dataMapFormat.getCarbonTable.getTablePath, 
queryId)
     val (resonse, time) = logTime {
-      val spark = SparkSQLUtil.getSparkSession
-      val taskGroupId = 
spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-        case null => ""
-        case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+      try {
+        val spark = SparkSQLUtil.getSparkSession
+        val taskGroupId = 
spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+          case null => ""
+          case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+        }
+        val taskGroupDesc = 
spark.sparkContext.getLocalProperty("spark.job.description") match {
+          case null => ""
+          case _ => 
spark.sparkContext.getLocalProperty("spark.job.description")
+        }
+        dataMapFormat.setTaskGroupId(taskGroupId)
+        dataMapFormat.setTaskGroupDesc(taskGroupDesc)
+        var filterInf = dataMapFormat.getFilterResolverIntf
+        val filterProcessor = new FilterExpressionProcessor
+        filterInf = removeSparkUnknown(filterInf,
+          dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, 
filterProcessor)
+        dataMapFormat.setFilterResolverIntf(filterInf)
+        IndexServer.getClient.getSplits(dataMapFormat)
+          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, 
dataMapFormat.getQueryId)
+      } finally {
+        val tmpPath = CarbonUtil
+          .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath, 
queryId)
+        if (null != tmpFolder && !tmpFolder.deleteFile(tmpPath, 
FileFactory.getFileType(tmpPath))) {
+          LOGGER.error("Problem while deleting the temp directory:" + tmpPath)
+        }
       }
-      val taskGroupDesc = 
spark.sparkContext.getLocalProperty("spark.job.description") match {
-        case null => ""
-        case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-      }
-      dataMapFormat.setTaskGroupId(taskGroupId)
-      dataMapFormat.setTaskGroupDesc(taskGroupDesc)
-      var filterInf = dataMapFormat.getFilterResolverIntf
-      val filterProcessor = new FilterExpressionProcessor
-      filterInf = removeSparkUnknown(filterInf,
-        dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, 
filterProcessor)
-      dataMapFormat.setFilterResolverIntf(filterInf)
-      IndexServer.getClient.getSplits(dataMapFormat).toList.asJava
     }
     LOGGER.info(s"Time taken to get response from server: $time ms")
     resonse
@@ -108,17 +126,12 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat): 
util.List[ExtendedBlocklet] = {
     val spark = SparkSQLUtil.getSparkSession
-    val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") 
match {
-      case null => ""
-      case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
-    }
-    val taskGroupDesc = 
spark.sparkContext.getLocalProperty("spark.job.description") match {
-      case null => ""
-      case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-    }
-    dataMapFormat.setTaskGroupId(taskGroupId)
-    dataMapFormat.setTaskGroupDesc(taskGroupDesc)
-    IndexServer.getSplits(dataMapFormat).toList.asJava
+    val queryId = spark.sparkContext.getConf.get("queryId", 
UUID.randomUUID().toString)
+    dataMapFormat.setQueryId(queryId)
+    dataMapFormat.setIsWriteToFile(false)
+    dataMapFormat.setFallbackJob()
+    IndexServer.getSplits(dataMapFormat)
+      .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, 
dataMapFormat.getQueryId)
   }
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index 607f923..2598e43 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -19,98 +19,154 @@ package org.apache.carbondata.indexserver
 
 import java.text.SimpleDateFormat
 import java.util.Date
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, 
Future}
+import scala.concurrent.duration.Duration
 
-import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
 import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DistributableDataMapFormat
+import org.apache.carbondata.core.datamap.{DataMapDistributable, 
DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, 
ExtendedBlockletWrapper}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int,
+    idx: Int,
+    val inputSplit: Seq[InputSplit],
+    location: Array[String])
   extends Partition {
 
   override def index: Int = idx
 
   override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+  def getLocations: Array[String] = {
+    location
+  }
 }
 
 private[indexserver] class DistributedPruneRDD(@transient private val ss: 
SparkSession,
     dataMapFormat: DistributableDataMapFormat)
-  extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
+  extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) {
 
   @transient private val LOGGER = 
LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
     .getName)
-
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
+  var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] 
= _
 
-  override protected def getPreferredLocations(split: Partition): Seq[String] 
= {
-    if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != 
null) {
-      split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
-    } else {
-      Seq()
-    }
+  private def groupSplits(xs: Seq[InputSplit], n: Int) = {
+    val (quot, rem) = (xs.size / n, xs.size % n)
+    val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
+    (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
   }
 
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[(String, ExtendedBlocklet)] = {
+      context: TaskContext): Iterator[(String, ExtendedBlockletWrapper)] = {
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, 
split.index, 0)
     val attemptContext = new 
TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
-    val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
-    val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
-    reader.initialize(inputSplit, attemptContext)
-    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
-      CacheProvider.getInstance().getCarbonCache.getCurrentSize
+    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
+    if (dataMapFormat.isJobToClearDataMaps) {
+      // if job is to clear datamaps just clear datamaps from cache and pass 
empty iterator
+      
DataMapStoreManager.getInstance().clearInvalidDataMaps(dataMapFormat.getCarbonTable,
+        inputSplits.map(_
+          
.asInstanceOf[DataMapDistributableWrapper].getDistributable.getSegment.getSegmentNo)
+          .toList.asJava,
+        dataMapFormat
+          .getDataMapToClear)
+      val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+        CacheProvider.getInstance().getCarbonCache.getCurrentSize
+      } else {
+        0L
+      }
+      val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+        SparkEnv.get.blockManager.blockManagerId.executorId
+      }"
+      Iterator((executorIP + "_" + cacheSize, new ExtendedBlockletWrapper()))
     } else {
-      0L
-    }
-    context.addTaskCompletionListener(_ => {
-      if (reader != null) {
-        reader.close()
+      if (dataMapFormat.getInvalidSegments.size > 0) {
+        // clear the segmentMap and from cache in executor when there are 
invalid segments
+        
DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
+          dataMapFormat.getInvalidSegments)
       }
-    })
-    val iter: Iterator[(String, ExtendedBlocklet)] = new Iterator[(String, 
ExtendedBlocklet)] {
+      val startTime = System.currentTimeMillis()
+      val numOfThreads = 
CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
 
-      private var havePair = false
-      private var finished = false
+      val service = Executors
+        .newFixedThreadPool(numOfThreads, new 
CarbonThreadFactory("IndexPruningPool", true))
+      implicit val ec: ExecutionContextExecutor = ExecutionContext
+        .fromExecutor(service)
 
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
+      val futures = if (inputSplits.length <= numOfThreads) {
+        inputSplits.map {
+          split => generateFuture(Seq(split), attemptContext)
         }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          havePair = !finished
+      } else {
+        groupSplits(inputSplits, numOfThreads).map {
+          splits => generateFuture(splits, attemptContext)
         }
-        !finished
       }
+      // scalastyle:off
+      val f = Await.result(Future.sequence(futures), Duration.Inf).flatten
+      // scalastyle:on
+      service.shutdownNow()
+      val LOGGER = 
LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName)
+      LOGGER.info(s"Time taken to collect ${ inputSplits.size } blocklets : " +
+                  (System.currentTimeMillis() - startTime))
+      val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+        CacheProvider.getInstance().getCarbonCache.getCurrentSize
+      } else {
+        0L
+      }
+      val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+        SparkEnv.get.blockManager.blockManagerId.executorId
+      }"
+      val value = (executorIP + "_" + cacheSize.toString, new 
ExtendedBlockletWrapper(f.toList
+        .asJava, dataMapFormat.getCarbonTable.getTablePath, 
dataMapFormat.getQueryId,
+        dataMapFormat.isWriteToFile))
+      Iterator(value)
+    }
+  }
 
-      override def next(): (String, ExtendedBlocklet) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
+  private def generateFuture(split: Seq[InputSplit],
+      attemptContextImpl: TaskAttemptContextImpl)
+    (implicit executionContext: ExecutionContext) = {
+    Future {
+      split.flatMap { inputSplit =>
+        val blocklets = new java.util.ArrayList[ExtendedBlocklet]()
+        val reader = dataMapFormat.createRecordReader(inputSplit, 
attemptContextImpl)
+        reader.initialize(inputSplit, attemptContextImpl)
+        while (reader.nextKeyValue()) {
+          blocklets.add(reader.getCurrentValue)
         }
-        havePair = false
-        val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host 
}_${
-          SparkEnv.get.blockManager.blockManagerId.executorId}"
-        val value = (executorIP + "_" + cacheSize.toString, 
reader.getCurrentValue)
-        value
+        reader.close()
+        blocklets.asScala
       }
     }
-    iter
+  }
+
+  override protected def getPreferredLocations(split: Partition): Seq[String] 
= {
+    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+    } else {
+      Seq()
+    }
   }
 
   override protected def internalGetPartitions: Array[Partition] = {
@@ -121,7 +177,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
         dataMapFormat.getCarbonTable.getTableName)
     if (!isDistributedPruningEnabled || dataMapFormat.isFallbackJob || 
splits.isEmpty) {
       splits.zipWithIndex.map {
-        f => new DataMapRDDPartition(id, f._2, f._1)
+        f => new DataMapRDDPartition(id, f._2, List(f._1), f._1.getLocations)
       }.toArray
     } else {
       val executorsList: Map[String, Seq[String]] = DistributionUtil
@@ -130,7 +186,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
         DistributedRDDUtils.getExecutors(splits.toArray, executorsList, 
dataMapFormat
           .getCarbonTable.getTableUniqueName, id)
       }
-      LOGGER.debug(s"Time taken to assign executors to ${splits.length} is 
$time ms")
+      LOGGER.debug(s"Time taken to assign executors to ${ splits.length } is 
$time ms")
       response.toArray
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index c7632be..f31110b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -64,11 +64,18 @@ object DistributedRDDUtils {
     if (invalidExecutorIds.nonEmpty) {
       DistributedRDDUtils.invalidateExecutors(invalidExecutorIds.toSeq)
     }
-    (convertToPartition(legacySegments, tableUniqueName, executorsList) ++
-     convertToPartition(sortedPartitions, tableUniqueName, 
executorsList)).zipWithIndex.map {
-      case (dataMapDistributable, index) =>
-        new DataMapRDDPartition(rddId, index, dataMapDistributable)
-    }
+    val groupedPartitions = (convertToPartition(legacySegments, 
tableUniqueName, executorsList) ++
+                             convertToPartition(sortedPartitions, 
tableUniqueName, executorsList))
+      .groupBy {
+        partition =>
+          partition.getLocations.head
+      }
+    groupedPartitions.zipWithIndex.map {
+      case ((location, splitList), index) =>
+        new DataMapRDDPartition(rddId,
+          index, splitList,
+          Array(location))
+    }.toArray.sortBy(_.index)
   }
 
   private def convertToPartition(segments: Seq[InputSplit], tableUniqueName: 
String,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
index 0069d86..172ea47 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -37,7 +37,7 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName
         // create a dummy split for each executor to accumulate the cache size.
         val dummySplit = new CarbonInputSplit()
         dummySplit.setLocation(Array(executor))
-        new DataMapRDDPartition(id, idx, dummySplit)
+        new DataMapRDDPartition(id, idx, List(dummySplit), Array(executor))
     }
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 9eee6d7..295ebe1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DistributableDataMapFormat
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, 
ExtendedBlockletWrapperContainer}
 import org.apache.carbondata.core.util.CarbonProperties
 
 @ProtocolInfo(protocolName = "Server", protocolVersion = 1)
@@ -42,7 +42,7 @@ trait ServerInterface {
   /**
    * Used to prune and cache the datamaps for the table.
    */
-  def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
+  def getSplits(request: DistributableDataMapFormat): 
ExtendedBlockletWrapperContainer
 
   /**
    * Get the cache size for the specified table.
@@ -99,15 +99,17 @@ object IndexServer extends ServerInterface {
     })
   }
 
-  def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] 
= doAs {
+  def getSplits(request: DistributableDataMapFormat): 
ExtendedBlockletWrapperContainer = doAs {
     sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", 
request.getTaskGroupId)
     sparkSession.sparkContext.setLocalProperty("spark.job.description", 
request.getTaskGroupDesc)
     val splits = new DistributedPruneRDD(sparkSession, request).collect()
-    DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+    if (!request.isFallbackJob) {
+      DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+    }
     if (request.isJobToClearDataMaps) {
       
DistributedRDDUtils.invalidateCache(request.getCarbonTable.getTableUniqueName)
     }
-    splits.map(_._2)
+    new ExtendedBlockletWrapperContainer(splits.map(_._2), 
request.isFallbackJob)
   }
 
   override def invalidateSegmentCache(databaseName: String, tableName: String,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index bc83d2f..0c2f877 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -51,7 +51,7 @@ class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databas
           // create a dummy split for each executor to accumulate the cache 
size.
           val dummySplit = new CarbonInputSplit()
           dummySplit.setLocation(Array(executor))
-          new DataMapRDDPartition(id, idx, dummySplit)
+          new DataMapRDDPartition(id, idx, List(dummySplit), Array(executor))
       }
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 8cf477e..10c4bcb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -31,9 +31,9 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -147,7 +148,6 @@ public class CarbonCompactionUtil {
    */
   public static Map<String, List<DataFileFooter>> 
createDataFileFooterMappingForSegments(
       List<TableBlockInfo> tableBlockInfoList, boolean isSortedTable) throws 
IOException {
-
     Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new 
HashMap<>();
     for (TableBlockInfo blockInfo : tableBlockInfoList) {
       List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
@@ -159,11 +159,16 @@ public class CarbonCompactionUtil {
       // in getting the schema last updated time based on which compaction 
flow is decided that
       // whether it will go to restructure compaction flow or normal 
compaction flow.
       // This decision will impact the compaction performance so it needs to 
be decided carefully
-      final BlockletInfo blockletInfo = 
blockInfo.getDetailInfo().getBlockletInfo();
-      if (null != blockInfo.getDetailInfo() && (
-          blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp() == 0L || null 
== blockletInfo
-              || null == blockletInfo.isSorted() || !blockletInfo.isSorted())) 
{
+      BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
+      if (null == blockletDetailInfo || blockletDetailInfo.getBlockletInfo() 
== null ||
+          blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp() == 0L
+              || null == blockletDetailInfo.getBlockletInfo().isSorted() || 
!blockletDetailInfo
+              .getBlockletInfo().isSorted()) {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo, true);
+        if (blockletDetailInfo == null) {
+          blockletDetailInfo = 
QueryUtil.getBlockletDetailInfo(dataFileMatadata, blockInfo);
+          blockInfo.setDetailInfo(blockletDetailInfo);
+        }
         if (null == dataFileMatadata.isSorted()) {
           dataFileMatadata.setSorted(isSortedTable);
         }
@@ -181,7 +186,6 @@ public class CarbonCompactionUtil {
       }
     }
     return segmentBlockInfoMapping;
-
   }
 
   /**
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index c0e4e27..0c08eab 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.channels.Channels;
 import java.util.TimeZone;
 
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
 import org.apache.carbondata.sdk.file.Schema;
 
 import org.apache.arrow.memory.BufferAllocator;

Reply via email to