http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
new file mode 100644
index 0000000..ef169af
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.xerial.snappy.Snappy;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap implements Cacheable {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName());
+
+  private static int KEY_INDEX = 0;
+
+  private static int MIN_VALUES_INDEX = 1;
+
+  private static int MAX_VALUES_INDEX = 2;
+
+  private static int ROW_COUNT_INDEX = 3;
+
+  private static int FILE_PATH_INDEX = 4;
+
+  private static int PAGE_COUNT_INDEX = 5;
+
+  private static int VERSION_INDEX = 6;
+
+  private static int SCHEMA_UPDATED_TIME_INDEX = 7;
+
+  private static int BLOCK_INFO_INDEX = 8;
+
+  private static int BLOCK_FOOTER_OFFSET = 9;
+
+  private static int LOCATIONS = 10;
+
+  private static int BLOCKLET_ID_INDEX = 11;
+
+  private static int BLOCK_LENGTH = 12;
+
+  private static int TASK_MIN_VALUES_INDEX = 0;
+
+  private static int TASK_MAX_VALUES_INDEX = 1;
+
+  private static int SCHEMA = 2;
+
+  private static int PARTITION_INFO = 3;
+
+  private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+  private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+
+  private SegmentProperties segmentProperties;
+
+  private int[] columnCardinality;
+
+  private boolean isPartitionedSegment;
+
+  @Override
+  public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+    long startTime = System.currentTimeMillis();
+    assert (dataMapModel instanceof BlockletDataMapModel);
+    BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    List<DataFileFooter> indexInfo = fileFooterConverter
+        .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
+    isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment();
+    DataMapRowImpl summaryRow = null;
+    byte[] schemaBinary = null;
+    // below 2 variables will be used for fetching the relative blocklet id. The relative
+    // blocklet id is the id assigned to a blocklet within a part file
+    String tempFilePath = null;
+    int relativeBlockletId = 0;
+    for (DataFileFooter fileFooter : indexInfo) {
+      if (segmentProperties == null) {
+        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+        schemaBinary = convertSchemaToBinary(columnInTable);
+        columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+        segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+        createSchema(segmentProperties);
+        createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary);
+      }
+      TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+      BlockMetaInfo blockMetaInfo =
+          blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
+      // Here it loads info about all blocklets of the index.
+      // Only add if the file exists physically. There are scenarios in which the index file
+      // exists inside a merge index but the related carbondata files are deleted. In that case
+      // we first check whether the file exists physically or not
+      if (blockMetaInfo != null) {
+        if (fileFooter.getBlockletList() == null) {
+          // This is the old store scenario; here blocklet information is not available in the
+          // index file, so load only the block info
+          summaryRow =
+              loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
+                  blockMetaInfo);
+        } else {
+          // blocklet id will start from 0 again only when the part file path is changed
+          if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) {
+            tempFilePath = blockInfo.getFilePath();
+            relativeBlockletId = 0;
+          }
+          summaryRow =
+              loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
+                  blockMetaInfo, relativeBlockletId);
+          // this is done because the relative blocklet id needs to be incremented based on the
+          // total number of blocklets
+          relativeBlockletId += fileFooter.getBlockletList().size();
+        }
+      }
+    }
+    if (unsafeMemoryDMStore != null) {
+      unsafeMemoryDMStore.finishWriting();
+    }
+    if (null != unsafeMemorySummaryDMStore) {
+      addTaskSummaryRowToUnsafeMemoryStore(
+          summaryRow,
+          blockletDataMapInfo.getPartitions(),
+          schemaBinary);
+      unsafeMemorySummaryDMStore.finishWriting();
+    }
+    LOGGER.info(
+        "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+            + (System.currentTimeMillis() - startTime));
+  }
+
+  private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
+      SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
+      BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    // Add one row to maintain task level min max for segment pruning
+    if (!blockletList.isEmpty() && summaryRow == null) {
+      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+    }
+    for (int index = 0; index < blockletList.size(); index++) {
+      DataMapRow row = new DataMapRowImpl(schema);
+      int ordinal = 0;
+      int taskMinMaxOrdinal = 0;
+      BlockletInfo blockletInfo = blockletList.get(index);
+
+      // add start key as index key
+      row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
+
+      BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+      byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+      // compute and set task level min values
+      addTaskMinMaxValues(summaryRow, minMaxLen,
+          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+          TASK_MIN_VALUES_INDEX, true);
+      ordinal++;
+      taskMinMaxOrdinal++;
+      byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+      // compute and set task level max values
+      addTaskMinMaxValues(summaryRow, minMaxLen,
+          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+          TASK_MAX_VALUES_INDEX, false);
+      ordinal++;
+
+      row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+      // add file path
+      byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+      row.setByteArray(filePathBytes, ordinal++);
+
+      // add pages
+      row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+      // add version number
+      row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+      // add schema updated time
+      row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+      // add blocklet info
+      byte[] serializedData;
+      try {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutput dataOutput = new DataOutputStream(stream);
+        blockletInfo.write(dataOutput);
+        serializedData = stream.toByteArray();
+        row.setByteArray(serializedData, ordinal++);
+        // Add block footer offset, it is used if we need to read the footer of the block
+        row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
+        setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
+        ordinal++;
+        // for relative blocklet id, i.e. the blocklet id that belongs to a particular part file
+        row.setShort((short) relativeBlockletId++, ordinal++);
+        // Store block size
+        row.setLong(blockMetaInfo.getSize(), ordinal);
+        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return summaryRow;
+  }
+
+  private void setLocations(String[] locations, DataMapRow row, int ordinal)
+      throws UnsupportedEncodingException {
+    // Add location info
+    String locationStr = StringUtils.join(locations, ',');
+    row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal);
+  }
+
+  /**
+   * Load information for the block. This case can happen only for old stores
+   * where blocklet information is not available in the index file, so load only block
+   * information and read the blocklet information in the executor.
+   */
+  private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
+      SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
+      BlockMetaInfo blockMetaInfo) {
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
+    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    // Add one row to maintain task level min max for segment pruning
+    if (summaryRow == null) {
+      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+    }
+    DataMapRow row = new DataMapRowImpl(schema);
+    int ordinal = 0;
+    int taskMinMaxOrdinal = 0;
+    // add start key as index key
+    row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++);
+
+    BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
+    byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+    row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+    // compute and set task level min values
+    addTaskMinMaxValues(summaryRow, minMaxLen,
+        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+        TASK_MIN_VALUES_INDEX, true);
+    ordinal++;
+    taskMinMaxOrdinal++;
+    byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+    row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+    // compute and set task level max values
+    addTaskMinMaxValues(summaryRow, minMaxLen,
+        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+        TASK_MAX_VALUES_INDEX, false);
+    ordinal++;
+
+    row.setInt((int) fileFooter.getNumberOfRows(), ordinal++);
+
+    // add file path
+    byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+    row.setByteArray(filePathBytes, ordinal++);
+
+    // add pages
+    row.setShort((short) 0, ordinal++);
+
+    // add version number
+    row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+    // add schema updated time
+    row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+    // add blocklet info
+    row.setByteArray(new byte[0], ordinal++);
+
+    row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
+    try {
+      setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
+      ordinal++;
+      // for relative blocklet id. The value is -1 because in case of an old store the blocklet
+      // info will not be present in the index file and in that case we will not know the total
+      // number of blocklets
+      row.setShort((short) -1, ordinal++);
+
+      // store block size
+      row.setLong(blockMetaInfo.getSize(), ordinal);
+      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return summaryRow;
+  }
+
+  private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow,
+      List<String> partitions, byte[] schemaBinary) throws IOException {
+    // write the task summary info to unsafe memory store
+    if (null != summaryRow) {
+      // Add the column schema, it is useful to generate segment properties in the executor,
+      // so we do not need to read the footer again there.
+      if (schemaBinary != null) {
+        summaryRow.setByteArray(schemaBinary, SCHEMA);
+      }
+      if (partitions != null && partitions.size() > 0) {
+        CarbonRowSchema[] minSchemas =
+            ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore
+                .getSchema()[PARTITION_INFO]).getChildSchemas();
+        DataMapRow partitionRow = new DataMapRowImpl(minSchemas);
+        for (int i = 0; i < partitions.size(); i++) {
+          partitionRow.setByteArray(
+              partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS), i);
+        }
+        summaryRow.setRow(partitionRow, PARTITION_INFO);
+      }
+      try {
+        unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Fill the measure min values with the minimum possible values; this is needed for backward
+   * compatibility as older versions don't store min values for measures
+   */
+  private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) {
+    byte[][] updatedValues = minValues;
+    if (minValues.length < minMaxLen.length) {
+      updatedValues = new byte[minMaxLen.length][];
+      System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
+      List<CarbonMeasure> measures = segmentProperties.getMeasures();
+      ByteBuffer buffer = ByteBuffer.allocate(8);
+      for (int i = 0; i < measures.size(); i++) {
+        buffer.rewind();
+        DataType dataType = measures.get(i).getDataType();
+        if (dataType == DataTypes.BYTE) {
+          buffer.putLong(Byte.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.SHORT) {
+          buffer.putLong(Short.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.INT) {
+          buffer.putLong(Integer.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.LONG) {
+          buffer.putLong(Long.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (DataTypes.isDecimal(dataType)) {
+          updatedValues[minValues.length + i] =
+              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
+        } else {
+          buffer.putDouble(Double.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        }
+      }
+    }
+    return updatedValues;
+  }
+
+  /**
+   * Fill the measure max values with the maximum possible values; this is needed for backward
+   * compatibility as older versions don't store max values for measures
+   */
+  private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) {
+    byte[][] updatedValues = maxValues;
+    if (maxValues.length < minMaxLen.length) {
+      updatedValues = new byte[minMaxLen.length][];
+      System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
+      List<CarbonMeasure> measures = segmentProperties.getMeasures();
+      ByteBuffer buffer = ByteBuffer.allocate(8);
+      for (int i = 0; i < measures.size(); i++) {
+        buffer.rewind();
+        DataType dataType = measures.get(i).getDataType();
+        if (dataType == DataTypes.BYTE) {
+          buffer.putLong(Byte.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.SHORT) {
+          buffer.putLong(Short.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.INT) {
+          buffer.putLong(Integer.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.LONG) {
+          buffer.putLong(Long.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (DataTypes.isDecimal(dataType)) {
+          updatedValues[maxValues.length + i] =
+              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
+        } else {
+          buffer.putDouble(Double.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        }
+      }
+    }
+    return updatedValues;
+  }
+
+  private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
+      byte[][] minValues) {
+    CarbonRowSchema[] minSchemas =
+        ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
+    DataMapRow minRow = new DataMapRowImpl(minSchemas);
+    int minOrdinal = 0;
+    // min value adding
+    for (int i = 0; i < minMaxLen.length; i++) {
+      minRow.setByteArray(minValues[i], minOrdinal++);
+    }
+    return minRow;
+  }
+
+  /**
+   * This method will compute min/max values at task level
+   *
+   * @param taskMinMaxRow
+   * @param minMaxLen
+   * @param carbonRowSchema
+   * @param minMaxValue
+   * @param ordinal
+   * @param isMinValueComparison
+   */
+  private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
+      CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
+      boolean isMinValueComparison) {
+    DataMapRow row = taskMinMaxRow.getRow(ordinal);
+    byte[][] updatedMinMaxValues = minMaxValue;
+    if (null == row) {
+      CarbonRowSchema[] minSchemas =
+          ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
+      row = new DataMapRowImpl(minSchemas);
+    } else {
+      byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
+      // Compare and update min max values
+      for (int i = 0; i < minMaxLen.length; i++) {
+        int compare =
+            ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
+        if (isMinValueComparison) {
+          if (compare < 0) {
+            updatedMinMaxValues[i] = existingMinMaxValues[i];
+          }
+        } else if (compare > 0) {
+          updatedMinMaxValues[i] = existingMinMaxValues[i];
+        }
+      }
+    }
+    int minMaxOrdinal = 0;
+    // min/max value adding
+    for (int i = 0; i < minMaxLen.length; i++) {
+      row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
+    }
+    taskMinMaxRow.setRow(row, ordinal);
+  }
+
+  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+    List<CarbonRowSchema> indexSchemas = new ArrayList<>();
+
+    // Index key
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    getMinMaxSchema(segmentProperties, indexSchemas);
+
+    // for number of rows.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
+
+    // for table block path
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
+    // for number of pages.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+
+    // for version number.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+
+    // for schema updated time.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
+    // for blocklet info
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
+    // for block footer offset.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
+    // for locations
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
+    // for relative blocklet id i.e. blocklet id that belongs to a particular part file.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+
+    // for storing block length.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
+    unsafeMemoryDMStore =
+        new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+  }
+
+  /**
+   * Creates the schema to store summary information, i.e. the information which can be stored
+   * only once per datamap. It stores the datamap-level max/min of each column and the partition
+   * information of the datamap.
+   * @param segmentProperties
+   * @param partitions
+   * @throws MemoryException
+   */
+  private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions,
+      byte[] schemaBinary)
+      throws MemoryException {
+    List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
+    getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
+    // for storing column schema
+    taskMinMaxSchemas.add(
+        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
+    if (partitions != null && partitions.size() > 0) {
+      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()];
+      for (int i = 0; i < mapSchemas.length; i++) {
+        mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+      }
+      CarbonRowSchema mapSchema =
+          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+              mapSchemas);
+      taskMinMaxSchemas.add(mapSchema);
+    }
+    unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
+        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+  }
+
+  private void getMinMaxSchema(SegmentProperties segmentProperties,
+      List<CarbonRowSchema> minMaxSchemas) {
+    // Index key
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    // do it 2 times, one for min and one for max.
+    for (int k = 0; k < 2; k++) {
+      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
+      for (int i = 0; i < minMaxLen.length; i++) {
+        if (minMaxLen[i] <= 0) {
+          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+        } else {
+          mapSchemas[i] =
+              new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+        }
+      }
+      CarbonRowSchema mapSchema =
+          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+              mapSchemas);
+      minMaxSchemas.add(mapSchema);
+    }
+  }
+
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    FilterExecuter filterExecuter =
+        FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+    for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
+      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+      boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
+          filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+          getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+      if (isScanRequired) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
+    if (unsafeMemoryDMStore.getRowCount() == 0) {
+      return new ArrayList<>();
+    }
+    // getting the start and end index key based on filter for hitting the
+    // selected block reference nodes based on filter resolver tree.
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("preparing the start and end key for finding"
+          + "start and end block as per filter resolver");
+    }
+    List<Blocklet> blocklets = new ArrayList<>();
+    Comparator<DataMapRow> comparator =
+        new BlockletDMComparator(segmentProperties.getColumnsValueSize(),
+            segmentProperties.getNumberOfSortColumns(),
+            segmentProperties.getNumberOfNoDictSortColumns());
+    List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+    FilterUtil
+        .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+    // reading the first value from list which has start key
+    IndexKey searchStartKey = listOfStartEndKeys.get(0);
+    // reading the last value from list which has end key
+    IndexKey searchEndKey = listOfStartEndKeys.get(1);
+    if (null == searchStartKey && null == searchEndKey) {
+      try {
+        // TODO need to handle for no dictionary dimensions
+        searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+        // TODO need to handle for no dictionary dimensions
+        searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+      } catch (KeyGenException e) {
+        return null;
+      }
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Successfully retrieved the start and end key. Dictionary Start Key: " + Arrays
+              .toString(searchStartKey.getDictionaryKeys()) + " No Dictionary Start Key: " + Arrays
+              .toString(searchStartKey.getNoDictionaryKeys()) + " Dictionary End Key: " + Arrays
+              .toString(searchEndKey.getDictionaryKeys()) + " No Dictionary End Key: " + Arrays
+              .toString(searchEndKey.getNoDictionaryKeys()));
+    }
+    if (filterExp == null) {
+      int rowCount = unsafeMemoryDMStore.getRowCount();
+      for (int i = 0; i < rowCount; i++) {
+        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+        blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
+      }
+    } else {
+      int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+      int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      while (startIndex <= endIndex) {
+        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+        int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
+        String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
+            CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+        boolean isValid =
+            addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX),
+                getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId);
+        if (isValid) {
+          blocklets.add(createBlocklet(safeRow, blockletId));
+        }
+        startIndex++;
+      }
+    }
+    return blocklets;
+  }
+
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<String> partitions) {
+    if (unsafeMemoryDMStore.getRowCount() == 0) {
+      return new ArrayList<>();
+    }
+    // First get the partitions which are stored inside datamap.
+    List<String> storedPartitions = getPartitions();
+    // if this is a partitioned segment but no partition information is stored, it means the
+    // partitions were dropped, so return an empty list.
+    if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
+      return new ArrayList<>();
+    }
+    if (storedPartitions != null && storedPartitions.size() > 0) {
+      // Check the exact match of partition information inside the stored partitions.
+      boolean found = false;
+      if (partitions != null && partitions.size() > 0) {
+        found = partitions.containsAll(storedPartitions);
+      }
+      if (!found) {
+        return new ArrayList<>();
+      }
+    }
+    // Prune with the filter if the partitions exist in this datamap
+    return prune(filterExp, segmentProperties);
+  }
+
+  /**
+   * select the blocks based on column min and max value
+   *
+   * @param filterExecuter
+   * @param maxValue
+   * @param minValue
+   * @param filePath
+   * @param blockletId
+   * @return
+   */
+  private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue,
+      byte[][] minValue, String filePath, int blockletId) {
+    BitSet bitSet = null;
+    if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
+      String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
+      // this case arises for an old store where the index file does not contain the
+      // blocklet information
+      if (blockletId != -1) {
+        uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId;
+      }
+      bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
+          .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
+    } else {
+      bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+    }
+    return !bitSet.isEmpty();
+  }
+
+  public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
+    int index = Integer.parseInt(blockletId);
+    DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+    return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
+  }
+
+  private byte[][] getMinMaxValue(DataMapRow row, int index) {
+    DataMapRow minMaxRow = row.getRow(index);
+    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+    for (int i = 0; i < minMax.length; i++) {
+      minMax[i] = minMaxRow.getByteArray(i);
+    }
+    return minMax;
+  }
+
+  private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) {
+    ExtendedBlocklet blocklet = new ExtendedBlocklet(
+        new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
+        blockletId + "");
+    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+    detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+    detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+    detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+    detailInfo.setBlockletId((short) blockletId);
+    detailInfo.setDimLens(columnCardinality);
+    detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPDATED_TIME_INDEX));
+    byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+    BlockletInfo blockletInfo = null;
+    try {
+      if (byteArray.length > 0) {
+        blockletInfo = new BlockletInfo();
+        ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+        DataInputStream inputStream = new DataInputStream(stream);
+        blockletInfo.readFields(inputStream);
+        inputStream.close();
+      }
+      blocklet.setLocation(
+          new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
+              .split(","));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    detailInfo.setBlockletInfo(blockletInfo);
+    blocklet.setDetailInfo(detailInfo);
+    detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
+    detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
+    detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH));
+    return blocklet;
+  }
+
+  /**
+   * Binary search used to get the first tentative index row based on
+   * search key
+   *
+   * @param key search key
+   * @return first tentative block
+   */
+  private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int childNodeIndex;
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    //
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        // if key is matched then get the first entry
+        int currentPos = mid;
+        while (currentPos - 1 >= 0
+            && comparator.compare(key, 
unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+          currentPos--;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if the compare result is less than zero and mid is greater than 0, then we need to move
+    // to the previous block as duplicate records can be present
+    if (compareRes < 0 && mid > 0) {
+      mid--;
+    }
+    childNodeIndex = mid;
+    // get the leaf child
+    return childNodeIndex;
+  }
+
+  /**
+   * Binary search used to get the last tentative block based on
+   * search key
+   *
+   * @param key search key
+   * @return last tentative block
+   */
+  private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int childNodeIndex;
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    //
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        int currentPos = mid;
+        // if key is matched then get the first entry
+        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+            && comparator.compare(key, 
unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+          currentPos++;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if the compare result is less than zero and mid is greater than 0, then we need to move
+    // to the previous block as duplicate records can be present
+    if (compareRes < 0 && mid > 0) {
+      mid--;
+    }
+    childNodeIndex = mid;
+    return childNodeIndex;
+  }
+
+  private DataMapRow convertToRow(IndexKey key) {
+    ByteBuffer buffer =
+        ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+    buffer.putInt(key.getDictionaryKeys().length);
+    buffer.putInt(key.getNoDictionaryKeys().length);
+    buffer.put(key.getDictionaryKeys());
+    buffer.put(key.getNoDictionaryKeys());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    dataMapRow.setByteArray(buffer.array(), 0);
+    return dataMapRow;
+  }
+
+  private List<String> getPartitions() {
+    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    if (unsafeRow.getColumnCount() > PARTITION_INFO) {
+      List<String> partitions = new ArrayList<>();
+      DataMapRow row = unsafeRow.getRow(PARTITION_INFO);
+      for (int i = 0; i < row.getColumnCount(); i++) {
+        partitions.add(
+            new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
+      }
+      return partitions;
+    }
+    return null;
+  }
+
+  private byte[] getColumnSchemaBinary() {
+    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    return unsafeRow.getByteArray(SCHEMA);
+  }
+
+  /**
+   * Convert schema to binary
+   */
+  private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutput dataOutput = new DataOutputStream(stream);
+    dataOutput.writeShort(columnSchemas.size());
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (columnSchema.getColumnReferenceId() == null) {
+        columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
+      }
+      columnSchema.write(dataOutput);
+    }
+    byte[] byteArray = stream.toByteArray();
+    // Compress with snappy to reduce the size of schema
+    return Snappy.rawCompress(byteArray, byteArray.length);
+  }
+
+  @Override
+  public void clear() {
+    if (unsafeMemoryDMStore != null) {
+      unsafeMemoryDMStore.freeMemory();
+      unsafeMemoryDMStore = null;
+      segmentProperties = null;
+    }
+    // clear task min/max unsafe memory
+    if (null != unsafeMemorySummaryDMStore) {
+      unsafeMemorySummaryDMStore.freeMemory();
+      unsafeMemorySummaryDMStore = null;
+    }
+  }
+
+  @Override
+  public long getFileTimeStamp() {
+    return 0;
+  }
+
+  @Override
+  public int getAccessCount() {
+    return 0;
+  }
+
+  @Override
+  public long getMemorySize() {
+    long memoryUsed = 0L;
+    if (unsafeMemoryDMStore != null) {
+      memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+    }
+    if (null != unsafeMemorySummaryDMStore) {
+      memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+    }
+    return memoryUsed;
+  }
+
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
+}
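
The task-level min/max handling above (addTaskMinMaxValues together with updateMinValues/updateMaxValues) reduces each blocklet's per-column min/max byte arrays into one summary row. Below is a minimal standalone sketch of that reduction, assuming unsigned lexicographic byte ordering; compareUnsigned and TaskMinMaxSketch are hypothetical stand-ins for ByteUtil.UnsafeComparer.INSTANCE.compareTo and the datamap internals, not part of this patch.

import java.util.Arrays;

public class TaskMinMaxSketch {
  // stand-in for ByteUtil.UnsafeComparer.INSTANCE.compareTo: unsigned,
  // lexicographic comparison of two byte arrays
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  // merge one blocklet's per-column values into the running task-level values:
  // keep the smaller array per column for min, the larger one for max
  static byte[][] merge(byte[][] existing, byte[][] incoming, boolean isMinComparison) {
    byte[][] merged = new byte[existing.length][];
    for (int i = 0; i < existing.length; i++) {
      int compare = compareUnsigned(existing[i], incoming[i]);
      boolean keepExisting = isMinComparison ? compare < 0 : compare > 0;
      merged[i] = keepExisting ? existing[i] : incoming[i];
    }
    return merged;
  }

  public static void main(String[] args) {
    byte[][] task = { {1, 2}, {9} };
    byte[][] blocklet = { {1, 5}, {3} };
    System.out.println(Arrays.deepToString(merge(task, blocklet, true)));  // [[1, 2], [3]]
    System.out.println(Arrays.deepToString(merge(task, blocklet, false))); // [[1, 5], [9]]
  }
}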

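Likewise, findStartIndex/findEndIndex pair a plain binary search with a linear walk over equal keys, so duplicate start keys resolve to the first (or last) matching row, and a miss steps back one slot. A compact sketch of the start-index variant over a sorted int array (the class and array here are illustrative only, not part of the patch):

public class StartIndexSketch {
  static int findStartIndex(int[] sorted, int key) {
    int low = 0, high = sorted.length - 1, mid = 0, compareRes = -1;
    while (low <= high) {
      mid = (low + high) >>> 1;
      compareRes = Integer.compare(key, sorted[mid]);
      if (compareRes < 0) {
        high = mid - 1;
      } else if (compareRes > 0) {
        low = mid + 1;
      } else {
        // key matched: walk back to the first entry of a duplicate run
        while (mid - 1 >= 0 && sorted[mid - 1] == key) {
          mid--;
        }
        break;
      }
    }
    // on a miss, step back one slot: the previous entry may still cover the key
    if (compareRes < 0 && mid > 0) {
      mid--;
    }
    return mid;
  }

  public static void main(String[] args) {
    int[] startKeys = {1, 3, 3, 3, 7, 9};
    System.out.println(findStartIndex(startKeys, 3)); // 1: first of the duplicate 3s
    System.out.println(findStartIndex(startKeys, 4)); // 3: nearest entry not greater than 4
  }
}
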
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
new file mode 100644
index 0000000..a2c65ba
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.IndexDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * Table map for blocklet
+ */
+public class BlockletIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory
+    implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
+
+  private static final String NAME = "clustered.btree.blocklet";
+
+  public static final DataMapSchema DATA_MAP_SCHEMA =
+      new DataMapSchema(NAME, BlockletIndexDataMapFactory.class.getName());
+
+  private AbsoluteTableIdentifier identifier;
+
+  // segmentId -> list of index file
+  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+  private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainIndexDataMap> cache;
+
+  @Override
+  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
+    this.identifier = identifier;
+    cache = CacheProvider.getInstance()
+        .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+  }
+
+  @Override
+  public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId) throws IOException {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        getTableBlockIndexUniqueIdentifiers(segmentId);
+    return cache.getAll(tableBlockIndexUniqueIdentifiers);
+  }
+
+  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+      String segmentId) throws IOException {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        segmentMap.get(segmentId);
+    if (tableBlockIndexUniqueIdentifiers == null) {
+      tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+      String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+      List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+      for (int i = 0; i < indexFiles.size(); i++) {
+        tableBlockIndexUniqueIdentifiers.add(
+            new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i)));
+      }
+      segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  /**
+   * Get the blocklet detail information based on blockletid, blockid and segmentid. This method
+   * is exclusively for BlockletIndexDataMapFactory as detail information is only available in
+   * this default datamap.
+   */
+  @Override
+  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId)
+      throws IOException {
+    List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
+    // If it is already an extended blocklet then type cast and return the same
+    if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) {
+      for (Blocklet blocklet : blocklets) {
+        detailedBlocklets.add((ExtendedBlocklet) blocklet);
+      }
+      return detailedBlocklets;
+    }
+    List<TableBlockIndexUniqueIdentifier> identifiers =
+        getTableBlockIndexUniqueIdentifiers(segmentId);
+    // Retrieve each blocklets detail information from blocklet datamap
+    for (Blocklet blocklet : blocklets) {
+      detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
+    }
+    return detailedBlocklets;
+  }
+
+  @Override
+  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId)
+      throws IOException {
+    if (blocklet instanceof ExtendedBlocklet) {
+      return (ExtendedBlocklet) blocklet;
+    }
+    List<TableBlockIndexUniqueIdentifier> identifiers =
+        getTableBlockIndexUniqueIdentifiers(segmentId);
+    return getExtendedBlocklet(identifiers, blocklet);
+  }
+
+  private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+      Blocklet blocklet) throws IOException {
+    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
+    for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
+      if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
+        IndexDataMap indexDataMap = cache.get(identifier);
+        return ((BlockletIndexDataMap) indexDataMap).getDetailedBlocklet(blocklet.getBlockletId());
+      }
+    }
+    throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() 
+ " not found ");
+  }
+
+
+  @Override
+  public List<DataMapDistributable> toDistributable(String segmentId) {
+    CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
+    List<DataMapDistributable> distributables = new ArrayList<>();
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      Path path = new Path(carbonIndexFiles[i].getPath());
+      try {
+        FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+        LocatedFileStatus fileStatus = iter.next();
+        String[] location = fileStatus.getBlockLocations()[0].getHosts();
+        BlockletDataMapDistributable distributable =
+            new BlockletDataMapDistributable(path.getName());
+        distributable.setLocations(location);
+        distributables.add(distributable);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return distributables;
+  }
+
+  @Override public void fireEvent(Event event) {
+
+  }
+
+  @Override
+  public void clear(String segmentId) {
+    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+    if (blockIndexes != null) {
+      for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+        IndexDataMap indexDataMap = cache.getIfPresent(blockIndex);
+        if (indexDataMap != null) {
+          cache.invalidate(blockIndex);
+          indexDataMap.clear();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void clear() {
+    for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
+      clear(segmentId);
+    }
+  }
+
+  @Override
+  public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
+    BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
+    List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
+    if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+      identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
+          mapDistributable.getFilePath()));
+    } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+      SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+      List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(
+          CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId())
+              + "/" + mapDistributable.getFilePath());
+      for (String indexFile : indexFiles) {
+        identifiers.add(
+            new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
+                indexFile));
+      }
+    }
+    List<AbstractCoarseGrainIndexDataMap> dataMaps;
+    try {
+      dataMaps = cache.getAll(identifiers);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return dataMaps;
+  }
+
+  @Override
+  public DataMapMeta getMeta() {
+    // TODO: pass SORT_COLUMNS into this class
+    return null;
+  }
+
+  @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
+    List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId);
+    assert (dataMaps.size() > 0);
+    AbstractCoarseGrainIndexDataMap coarseGrainDataMap = dataMaps.get(0);
+    assert (coarseGrainDataMap instanceof BlockletIndexDataMap);
+    BlockletIndexDataMap dataMap = (BlockletIndexDataMap) coarseGrainDataMap;
+    return dataMap.getSegmentProperties();
+  }
+
+  @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions)
+      throws IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId);
+    for (AbstractCoarseGrainIndexDataMap dataMap : dataMaps) {
+      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions));
+    }
+    return blocklets;
+  }
+}
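
For context, a hedged sketch of how a driver-side caller might exercise this factory using only the methods defined in the patch; the identifier and segment id are placeholders, and the null partition list only makes sense for a non-partitioned segment (a partitioned datamap with no matching partitions prunes to an empty result).

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class BlockletPruneSketch {
  static List<ExtendedBlocklet> listSegmentBlocklets(AbsoluteTableIdentifier identifier,
      String segmentId) throws IOException {
    BlockletIndexDataMapFactory factory = new BlockletIndexDataMapFactory();
    factory.init(identifier, BlockletIndexDataMapFactory.DATA_MAP_SCHEMA);
    // null filter and null partition list: every blocklet of the segment
    List<Blocklet> blocklets = factory.getAllBlocklets(segmentId, null);
    // enrich with detail info (row count, page count, footer offset, locations)
    return factory.getExtendedBlocklets(blocklets, segmentId);
  }
}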

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 5b919d0..2393c54 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -768,7 +768,15 @@ public class CarbonTable implements Serializable {
   }
 
   /**
-   * whether this table has aggregation DataMap or not
+   * Return true if 'autoRefreshDataMap' is enabled, by default it is enabled
+   */
+  public boolean isAutoRefreshDataMap() {
+    String refresh = getTableInfo().getFactTable().getTableProperties().get("autoRefreshDataMap");
+    return refresh == null || refresh.equalsIgnoreCase("true");
+  }
+
+  /**
+   * whether this table has aggregation IndexDataMap or not
    */
   public boolean hasAggregationDataMap() {
     List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList();
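
The new isAutoRefreshDataMap() treats an absent 'autoRefreshDataMap' table property as true. A minimal standalone sketch of that default-true lookup; the plain HashMap here stands in for the fact table's property map:

import java.util.HashMap;
import java.util.Map;

public class AutoRefreshSketch {
  static boolean isAutoRefresh(Map<String, String> tableProperties) {
    String refresh = tableProperties.get("autoRefreshDataMap");
    return refresh == null || refresh.equalsIgnoreCase("true");
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    System.out.println(isAutoRefresh(props)); // true: absent means enabled
    props.put("autoRefreshDataMap", "FALSE");
    System.out.println(isAutoRefresh(props)); // false: case-insensitive match
  }
}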

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 5a9017b..ae49467 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -32,6 +32,7 @@ public class DataMapSchema implements Serializable, Writable {
 
   protected String dataMapName;
 
+  // this name can be the class name of the DataMapProvider implementation or its short name
   private String className;
 
   protected RelationIdentifier relationIdentifier;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
index 5729959..1c6ebad 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.core.metadata.schema.table;
 
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider;
 
 public class DataMapSchemaFactory {
   public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
@@ -24,15 +24,16 @@ public class DataMapSchemaFactory {
   /**
    * Below class will be used to get data map schema object
    * based on class name
-   * @param className
+   * @param providerName
    * @return data map schema
    */
-  public DataMapSchema getDataMapSchema(String dataMapName, String className) {
-    switch (className) {
-      case AGGREGATIONDATAMAPSCHEMA:
-        return new AggregationDataMapSchema(dataMapName, className);
-      default:
-        return new DataMapSchema(dataMapName, className);
+  public DataMapSchema getDataMapSchema(String dataMapName, String 
providerName) {
+    if 
(providerName.equalsIgnoreCase(DataMapProvider.PREAGGREGATE.toString())) {
+      return new AggregationDataMapSchema(dataMapName, providerName);
+    } else if 
(providerName.equalsIgnoreCase(DataMapProvider.TIMESERIES.toString())) {
+      return new AggregationDataMapSchema(dataMapName, providerName);
+    } else {
+      return new DataMapSchema(dataMapName, providerName);
     }
   }
 }

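A hedged usage sketch of the dispatch above (the datamap names and the factory class name are invented; the provider strings rely on the case-insensitive match against the DataMapProvider enum shown in the diff):

import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;

public class SchemaFactoryDemo {
  public static void main(String[] args) {
    // 'preaggregate' (and likewise 'timeseries') yields an AggregationDataMapSchema
    DataMapSchema agg = DataMapSchemaFactory.INSTANCE
        .getDataMapSchema("agg_dm", "preaggregate");
    // any other provider string, typically a factory class name,
    // yields a plain DataMapSchema
    DataMapSchema idx = DataMapSchemaFactory.INSTANCE
        .getDataMapSchema("idx_dm", "com.example.MyIndexDataMapFactory");
    System.out.println(agg.getClass().getSimpleName()
        + ", " + idx.getClass().getSimpleName());
  }
}
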
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index fff1a74..5d79abc 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -285,8 +285,7 @@ public class TableSchema implements Serializable, Writable {
                 // only = is allowed as special character , so replace with &
                 CarbonCommonConstants.DEFAULT_CHARSET)).replace("=","&"));
     properties.put("QUERYTYPE", queryType);
-    DataMapSchema dataMapSchema =
-        new DataMapSchema(dataMapName, className);
+    DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
     dataMapSchema.setProperties(properties);
 
     dataMapSchema.setChildSchema(this);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
deleted file mode 100644
index 88ac3ed..0000000
--- 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import 
org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
-import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
-import org.apache.carbondata.core.util.ByteUtil;
-
-import mockit.Mock;
-import mockit.MockUp;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestBlockletDataMap extends AbstractDictionaryCacheTest {
-
-  ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor;
-  @Before public void setUp() throws Exception {
-    CarbonImplicitDimension carbonImplicitDimension =
-        new CarbonImplicitDimension(0, 
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID);
-    DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new 
DimColumnResolvedFilterInfo();
-    dimColumnEvaluatorInfo.setColumnIndex(0);
-    dimColumnEvaluatorInfo.setRowIndex(0);
-    dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension);
-    dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
-    implicitIncludeFilterExecutor =
-        new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo);
-  }
-
-  @Test public void testaddBlockBasedOnMinMaxValue() throws Exception {
-
-    new MockUp<ImplicitIncludeFilterExecutorImpl>() {
-      @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, 
byte[][] minValue,
-          String uniqueBlockPath) {
-        BitSet bitSet = new BitSet(1);
-        bitSet.set(8);
-        return bitSet;
-      }
-    };
-
-    BlockletDataMap blockletDataMap = new BlockletDataMap();
-    Method method = BlockletDataMap.class
-        .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, 
byte[][].class,
-            byte[][].class, String.class, int.class);
-    method.setAccessible(true);
-
-    byte[][] minValue = { ByteUtil.toBytes("sfds") };
-    byte[][] maxValue = { ByteUtil.toBytes("resa") };
-    Object result = method
-        .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, 
maxValue,
-            
"/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata",
-            0);
-    assert ((boolean) result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
new file mode 100644
index 0000000..16048db
--- /dev/null
+++ 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
@@ -0,0 +1,59 @@
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.lang.reflect.Method;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import 
org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
+import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletIndexDataMap extends AbstractDictionaryCacheTest {
+
+  ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor;
+  @Before public void setUp() throws Exception {
+    CarbonImplicitDimension carbonImplicitDimension =
+        new CarbonImplicitDimension(0, 
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID);
+    DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new 
DimColumnResolvedFilterInfo();
+    dimColumnEvaluatorInfo.setColumnIndex(0);
+    dimColumnEvaluatorInfo.setRowIndex(0);
+    dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension);
+    dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
+    implicitIncludeFilterExecutor =
+        new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo);
+  }
+
+  @Test public void testaddBlockBasedOnMinMaxValue() throws Exception {
+
+    new MockUp<ImplicitIncludeFilterExecutorImpl>() {
+      @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, 
byte[][] minValue,
+          String uniqueBlockPath) {
+        BitSet bitSet = new BitSet(1);
+        bitSet.set(8);
+        return bitSet;
+      }
+    };
+
+    BlockletIndexDataMap blockletDataMap = new BlockletIndexDataMap();
+    Method method = BlockletIndexDataMap.class
+        .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, 
byte[][].class,
+            byte[][].class, String.class, int.class);
+    method.setAccessible(true);
+
+    byte[][] minValue = { ByteUtil.toBytes("sfds") };
+    byte[][] maxValue = { ByteUtil.toBytes("resa") };
+    Object result = method
+        .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, 
maxValue,
+            
"/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata",
+            0);
+    assert ((boolean) result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
deleted file mode 100644
index 8002e57..0000000
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.examples;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import com.google.gson.Gson;
-
-/**
- * Datamap implementation for min max blocklet.
- */
-public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
-
-  public static final String NAME = "clustered.minmax.btree.blocklet";
-
-  private String filePath;
-
-  private MinMaxIndexBlockDetails[] readMinMaxDataMap;
-
-  @Override
-  public void init(DataMapModel model) throws MemoryException, IOException {
-    this.filePath = model.getFilePath();
-    CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
-    for (int i = 0; i < listFiles.length; i++) {
-      readMinMaxDataMap = readJson(listFiles[i].getPath());
-    }
-  }
-
-  private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String 
segmentId) {
-    String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
-    CarbonFile carbonFile = FileFactory.getCarbonFile(path);
-    return carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return file.getName().endsWith(".minmaxindex");
-      }
-    });
-  }
-
-  private MinMaxIndexBlockDetails[] readJson(String filePath) {
-    Gson gsonObjectToRead = new Gson();
-    DataInputStream dataInputStream = null;
-    BufferedReader buffReader = null;
-    InputStreamReader inStream = null;
-    MinMaxIndexBlockDetails[] readMinMax = null;
-    AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(filePath, 
FileFactory.getFileType(filePath));
-
-    try {
-      if (!FileFactory.isFileExist(filePath, 
FileFactory.getFileType(filePath))) {
-        return null;
-      }
-      dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream, "UTF-8");
-      buffReader = new BufferedReader(inStream);
-      readMinMax = gsonObjectToRead.fromJson(buffReader, 
MinMaxIndexBlockDetails[].class);
-    } catch (IOException e) {
-      return null;
-    } finally {
-      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
-    }
-    return readMinMax;
-  }
-
-  /**
-   * Block Prunning logic for Min Max DataMap.
-   *
-   * @param filterExp
-   * @param segmentProperties
-   * @return
-   */
-  @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp,
-      SegmentProperties segmentProperties, List<String> partitions) {
-    List<Blocklet> blocklets = new ArrayList<>();
-
-    if (filterExp == null) {
-      for (int i = 0; i < readMinMaxDataMap.length; i++) {
-        blocklets.add(new Blocklet(filePath, 
String.valueOf(readMinMaxDataMap[i].getBlockletId())));
-      }
-    } else {
-      FilterExecuter filterExecuter =
-          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-      int startIndex = 0;
-      while (startIndex < readMinMaxDataMap.length) {
-        BitSet bitSet = 
filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
-            readMinMaxDataMap[startIndex].getMinValues());
-        if (!bitSet.isEmpty()) {
-          blocklets.add(new Blocklet(filePath,
-              String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
-        }
-        startIndex++;
-      }
-    }
-    return blocklets;
-  }
-
-  @Override
-  public boolean isScanRequired(FilterResolverIntf filterExp) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void clear() {
-    readMinMaxDataMap = null;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
deleted file mode 100644
index 925731a..0000000
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.examples;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
-import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
-import org.apache.carbondata.events.Event;
-
-/**
- * Min Max DataMap Factory
- */
-public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
-
-  private AbsoluteTableIdentifier identifier;
-
-  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema 
dataMapSchema) {
-    this.identifier = identifier;
-  }
-
-  /**
-   * createWriter will return the MinMaxDataWriter.
-   *
-   * @param segmentId
-   * @return
-   */
-  @Override public AbstractDataMapWriter createWriter(String segmentId, String 
dataWritePath) {
-    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
-  }
-
-  /**
-   * getDataMaps Factory method Initializes the Min Max Data Map and returns.
-   *
-   * @param segmentId
-   * @return
-   * @throws IOException
-   */
-  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String 
segmentId)
-      throws IOException {
-    List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
-    // Form a dataMap of Type MinMaxDataMap.
-    MinMaxDataMap dataMap = new MinMaxDataMap();
-    try {
-      dataMap.init(new DataMapModel(
-          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + 
File.separator));
-    } catch (MemoryException ex) {
-
-    }
-    dataMapList.add(dataMap);
-    return dataMapList;
-  }
-
-  /**
-   * @param segmentId
-   * @return
-   */
-  @Override public List<DataMapDistributable> toDistributable(String 
segmentId) {
-    return null;
-  }
-
-  /**
-   * Clear the DataMap.
-   *
-   * @param segmentId
-   */
-  @Override public void clear(String segmentId) {
-  }
-
-  /**
-   * Clearing the data map.
-   */
-  @Override public void clear() {
-  }
-
-  @Override public List<AbstractCoarseGrainDataMap> 
getDataMaps(DataMapDistributable distributable)
-      throws IOException {
-    return null;
-  }
-
-  @Override public void fireEvent(Event event) {
-
-  }
-
-  @Override public DataMapMeta getMeta() {
-    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
-        new ArrayList<ExpressionType>());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
new file mode 100644
index 0000000..216000b
--- /dev/null
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.gson.Gson;
+
+/**
+ * DataMap implementation for min/max blocklet pruning.
+ */
+public class MinMaxIndexDataMap extends AbstractCoarseGrainIndexDataMap {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName());
+
+  public static final String NAME = "clustered.minmax.btree.blocklet";
+
+  private String filePath;
+
+  private MinMaxIndexBlockDetails[] readMinMaxDataMap;
+
+  @Override
+  public void init(DataMapModel model) throws MemoryException, IOException {
+    this.filePath = model.getFilePath();
+    CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
+    for (int i = 0; i < listFiles.length; i++) {
+      readMinMaxDataMap = readJson(listFiles[i].getPath());
+    }
+  }
+
+  private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String 
segmentId) {
+    String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(path);
+    return carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(".minmaxindex");
+      }
+    });
+  }
+
+  private MinMaxIndexBlockDetails[] readJson(String filePath) {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    MinMaxIndexBlockDetails[] readMinMax = null;
+    AtomicFileOperations fileOperation =
+        new AtomicFileOperationsImpl(filePath, 
FileFactory.getFileType(filePath));
+
+    try {
+      if (!FileFactory.isFileExist(filePath, 
FileFactory.getFileType(filePath))) {
+        return null;
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream, "UTF-8");
+      buffReader = new BufferedReader(inStream);
+      readMinMax = gsonObjectToRead.fromJson(buffReader, 
MinMaxIndexBlockDetails[].class);
+    } catch (IOException e) {
+      return null;
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+    return readMinMax;
+  }
+
+  /**
+   * Block pruning logic for the Min Max DataMap.
+   *
+   * @param filterExp
+   * @param segmentProperties
+   * @return
+   */
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp,
+      SegmentProperties segmentProperties, List<String> partitions) {
+    List<Blocklet> blocklets = new ArrayList<>();
+
+    if (filterExp == null) {
+      for (int i = 0; i < readMinMaxDataMap.length; i++) {
+        blocklets.add(new Blocklet(filePath, 
String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+      }
+    } else {
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      int startIndex = 0;
+      while (startIndex < readMinMaxDataMap.length) {
+        BitSet bitSet = 
filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
+            readMinMaxDataMap[startIndex].getMinValues());
+        if (!bitSet.isEmpty()) {
+          blocklets.add(new Blocklet(filePath,
+              String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
+        }
+        startIndex++;
+      }
+    }
+    return blocklets;
+  }
+
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear() {
+    readMinMaxDataMap = null;
+  }
+
+}
\ No newline at end of file

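For readers unfamiliar with the readJson() pattern above, a minimal sketch of the same Gson call shape; the Details class and its field names are hypothetical stand-ins for MinMaxIndexBlockDetails, whose layout is not shown in this diff:

import java.io.BufferedReader;
import java.io.StringReader;
import com.google.gson.Gson;

public class GsonReadDemo {
  // hypothetical stand-in for MinMaxIndexBlockDetails
  static class Details {
    int blockletId;
    byte[][] minValues;
    byte[][] maxValues;
  }

  public static void main(String[] args) {
    // Gson maps byte[][] from nested numeric arrays
    String json = "[{\"blockletId\":0,\"minValues\":[[97]],\"maxValues\":[[122]]}]";
    BufferedReader reader = new BufferedReader(new StringReader(json));
    // same shape as readJson(): parse the whole stream into an array of details
    Details[] read = new Gson().fromJson(reader, Details[].class);
    System.out.println(read.length + " blocklet(s), first id = " + read[0].blockletId);
  }
}
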
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
new file mode 100644
index 0000000..5f714a1
--- /dev/null
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.events.Event;
+
+/**
+ * Min Max DataMap Factory
+ */
+public class MinMaxIndexDataMapFactory extends 
AbstractCoarseGrainIndexDataMapFactory {
+
+  private AbsoluteTableIdentifier identifier;
+
+  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema 
dataMapSchema) {
+    this.identifier = identifier;
+  }
+
+  /**
+   * createWriter will return the MinMaxDataWriter.
+   *
+   * @param segmentId
+   * @return
+   */
+  @Override public AbstractDataMapWriter createWriter(String segmentId, String 
dataWritePath) {
+    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
+  }
+
+  /**
+   * getDataMaps: factory method that initializes the Min Max Data Map and returns it.
+   *
+   * @param segmentId
+   * @return
+   * @throws IOException
+   */
+  @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String 
segmentId)
+      throws IOException {
+    List<AbstractCoarseGrainIndexDataMap> dataMapList = new ArrayList<>();
+    // Form a dataMap of Type MinMaxIndexDataMap.
+    MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
+    try {
+      dataMap.init(new DataMapModel(
+          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + 
File.separator));
+    } catch (MemoryException ex) {
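+      // note: the MemoryException is swallowed here, so the datamap may be
+      // returned without having been initialized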
+
+    }
+    dataMapList.add(dataMap);
+    return dataMapList;
+  }
+
+  /**
+   * @param segmentId
+   * @return
+   */
+  @Override public List<DataMapDistributable> toDistributable(String 
segmentId) {
+    return null;
+  }
+
+  /**
+   * Clear the DataMap.
+   *
+   * @param segmentId
+   */
+  @Override public void clear(String segmentId) {
+  }
+
+  /**
+   * Clearing the data map.
+   */
+  @Override public void clear() {
+  }
+
+  @Override public List<AbstractCoarseGrainIndexDataMap> 
getDataMaps(DataMapDistributable distributable)
+      throws IOException {
+    return null;
+  }
+
+  @Override public void fireEvent(Event event) {
+
+  }
+
+  @Override public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
+        new ArrayList<ExpressionType>());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
 
b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
index 0cfe410..59872aa 100644
--- 
a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
+++ 
b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
@@ -52,8 +52,8 @@ object MinMaxDataMapExample {
     // register datamap writer
     DataMapStoreManager.getInstance().createAndRegisterDataMap(
       AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
-      classOf[MinMaxDataMapFactory].getName,
-      MinMaxDataMap.NAME)
+      classOf[MinMaxIndexDataMapFactory].getName,
+      MinMaxIndexDataMap.NAME)
 
     spark.sql("DROP TABLE IF EXISTS carbonminmax")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/docs/datamap-developer-guide.md
----------------------------------------------------------------------
diff --git a/docs/datamap-developer-guide.md b/docs/datamap-developer-guide.md
new file mode 100644
index 0000000..31afd34
--- /dev/null
+++ b/docs/datamap-developer-guide.md
@@ -0,0 +1,16 @@
+# DataMap Developer Guide
+
+### Introduction
+DataMap is a data structure that can be used to accelerate certain queries on the table. Different DataMaps can be implemented by developers.
+Currently, two types of DataMap are supported:
+1. IndexDataMap: a DataMap that leverages an index to accelerate filter queries
+2. MVDataMap: a DataMap that leverages a Materialized View to accelerate OLAP-style queries, such as SPJG queries (select, predicate, join, group-by)
+
+### DataMap provider
+When the user issues `CREATE DATAMAP dm ON TABLE main USING 'provider'`, the corresponding DataMapProvider implementation will be created and initialized.
+Currently, the provider string can be:
+1. preaggregate: a type of MVDataMap that pre-aggregates a single table
+2. timeseries: a type of MVDataMap that pre-aggregates based on the time dimension of the table
+3. the class name of an IndexDataMapFactory implementation: developers can implement a new type of IndexDataMap by extending IndexDataMapFactory
+
+When the user issues `DROP DATAMAP dm ON TABLE main`, the corresponding DataMapProvider implementation will be called to drop it.
\ No newline at end of file

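Finally, to make the CREATE/DROP lifecycle described in the new developer guide concrete, a hedged sketch from a Spark session (the table, column, and datamap names are invented; the AS SELECT clause follows the preaggregate DDL but is illustrative only):

import org.apache.spark.sql.SparkSession;

public class DataMapLifecycleDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("datamap-lifecycle").getOrCreate();
    // creates and initializes the matching DataMapProvider ('preaggregate' here)
    spark.sql("CREATE DATAMAP agg_sales ON TABLE sales USING 'preaggregate' "
        + "AS SELECT country, sum(amount) FROM sales GROUP BY country");
    // calls back into the same provider to tear the datamap down
    spark.sql("DROP DATAMAP agg_sales ON TABLE sales");
    spark.stop();
  }
}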