[CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Implemented interfaces for FG datamap and integrated to filterscanner to use 
the pruned bitset from FG datamap.
FG Query flow as follows.
1.The user can add FG datamap to any table and implement there interfaces.
2. Any filter query which hits the table with datamap will call prune method of 
FGdatamap.
3. The prune method of FGDatamap return list FineGrainBlocklet , these 
blocklets contain the information of block, blocklet, page and rowids 
information as well.
4. The pruned blocklets are internally wriitten to file and returns only the 
block , blocklet and filepath information as part of Splits.
5. Based on the splits scanrdd schedule the tasks.
6. In filterscanner we check the datamapwriterpath from split and reNoteads the 
bitset if exists. And pass this bitset as input to it.

This closes #1471


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/40259b36
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/40259b36
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/40259b36

Branch: refs/heads/fgdatamap
Commit: 40259b3628803dc8bf801efb867120f0670f8e90
Parents: a21bdbb
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Wed Nov 15 19:48:40 2017 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sat Nov 18 23:16:23 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapMeta.java    |   8 +-
 .../core/datamap/DataMapStoreManager.java       |  30 +-
 .../carbondata/core/datamap/DataMapType.java    |  21 +
 .../carbondata/core/datamap/TableDataMap.java   |  31 +-
 .../core/datamap/dev/AbstractDataMapWriter.java | 110 +++++
 .../core/datamap/dev/BlockletSerializer.java    |  57 +++
 .../carbondata/core/datamap/dev/DataMap.java    |   4 +-
 .../core/datamap/dev/DataMapFactory.java        |  14 +-
 .../core/datamap/dev/DataMapWriter.java         |  57 ---
 .../cgdatamap/AbstractCoarseGrainDataMap.java   |  24 +
 .../AbstractCoarseGrainDataMapFactory.java      |  34 ++
 .../dev/fgdatamap/AbstractFineGrainDataMap.java |  24 +
 .../AbstractFineGrainDataMapFactory.java        |  38 ++
 .../carbondata/core/datastore/DataRefNode.java  |   7 +
 .../core/datastore/block/TableBlockInfo.java    |  10 +
 .../impl/btree/AbstractBTreeLeafNode.java       |   5 +
 .../datastore/impl/btree/BTreeNonLeafNode.java  |   5 +
 .../carbondata/core/indexstore/Blocklet.java    |  30 +-
 .../core/indexstore/BlockletDetailsFetcher.java |   8 +
 .../core/indexstore/ExtendedBlocklet.java       |  19 +-
 .../core/indexstore/FineGrainBlocklet.java      | 120 +++++
 .../blockletindex/BlockletDataMap.java          |  11 +-
 .../blockletindex/BlockletDataMapFactory.java   |  62 ++-
 .../BlockletDataRefNodeWrapper.java             |  27 +-
 .../indexstore/blockletindex/IndexWrapper.java  |  18 +
 .../core/indexstore/schema/FilterType.java      |  24 -
 .../executer/ExcludeFilterExecuterImpl.java     |   3 +
 .../executer/IncludeFilterExecuterImpl.java     |   3 +
 .../core/scan/processor/BlocksChunkHolder.java  |   5 -
 .../core/scan/scanner/impl/FilterScanner.java   |   2 +
 .../apache/carbondata/core/util/CarbonUtil.java |  98 +++++
 .../datamap/examples/MinMaxDataMap.java         |  20 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |  49 ++-
 .../datamap/examples/MinMaxDataWriter.java      |  36 +-
 .../examples/MinMaxIndexBlockDetails.java       |  13 -
 .../carbondata/hadoop/CarbonInputFormat.java    |   2 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  20 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  23 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   | 357 +++++++++++++++
 .../testsuite/datamap/DataMapWriterSuite.scala  |  49 ++-
 .../testsuite/datamap/FGDataMapTestCase.scala   | 436 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   6 +-
 .../datamap/DataMapWriterListener.java          |  57 ++-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../store/writer/AbstractFactDataWriter.java    | 126 +-----
 45 files changed, 1732 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index 7746acf..dd15ccb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -19,15 +19,15 @@ package org.apache.carbondata.core.datamap;
 
 import java.util.List;
 
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 
 public class DataMapMeta {
 
   private List<String> indexedColumns;
 
-  private FilterType optimizedOperation;
+  private List<ExpressionType> optimizedOperation;
 
-  public DataMapMeta(List<String> indexedColumns, FilterType 
optimizedOperation) {
+  public DataMapMeta(List<String> indexedColumns, List<ExpressionType> 
optimizedOperation) {
     this.indexedColumns = indexedColumns;
     this.optimizedOperation = optimizedOperation;
   }
@@ -36,7 +36,7 @@ public class DataMapMeta {
     return indexedColumns;
   }
 
-  public FilterType getOptimizedOperation() {
+  public List<ExpressionType> getOptimizedOperation() {
     return optimizedOperation;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 90e5fff..8d80b4d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -56,7 +56,22 @@ public final class DataMapStoreManager {
   }
 
   public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
-    return allDataMaps.get(identifier.uniqueName());
+    return 
allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+  }
+
+  // TODO its a temporary method till chooser is implemented
+  public TableDataMap chooseDataMap(AbsoluteTableIdentifier identifier) {
+    List<TableDataMap> tableDataMaps = getAllDataMap(identifier);
+    if (tableDataMaps != null && tableDataMaps.size() > 0) {
+      for (TableDataMap dataMap: tableDataMaps) {
+        if (!dataMap.getDataMapName().equalsIgnoreCase(BlockletDataMap.NAME)) {
+          return dataMap;
+        }
+      }
+      return tableDataMaps.get(0);
+    } else {
+      return getDataMap(identifier, BlockletDataMap.NAME, 
BlockletDataMapFactory.class.getName());
+    }
   }
 
   /**
@@ -68,7 +83,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
       String dataMapName, String factoryClass) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     TableDataMap dataMap;
     if (tableDataMaps == null) {
@@ -96,7 +111,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier 
identifier,
       String factoryClassName, String dataMapName) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
@@ -149,7 +164,9 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
-    List<TableDataMap> tableDataMaps = 
allDataMaps.get(identifier.uniqueName());
+    String tableUniqueName = 
identifier.getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(tableUniqueName);
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap: tableDataMaps) {
@@ -158,7 +175,7 @@ public final class DataMapStoreManager {
           break;
         }
       }
-      allDataMaps.remove(identifier.uniqueName());
+      allDataMaps.remove(tableUniqueName);
     }
   }
 
@@ -167,7 +184,8 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String 
dataMapName) {
-    List<TableDataMap> tableDataMaps = 
allDataMaps.get(identifier.uniqueName());
+    List<TableDataMap> tableDataMaps =
+        
allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
     if (tableDataMaps != null) {
       int i = 0;
       for (TableDataMap tableDataMap: tableDataMaps) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
new file mode 100644
index 0000000..bf812b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+public enum DataMapType {
+  CG,FG;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 07d8eb7..1704e34 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -20,12 +20,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -75,10 +79,15 @@ public final class TableDataMap implements 
OperationEventListener {
     SegmentProperties segmentProperties;
     for (String segmentId : segmentIds) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
-      segmentProperties = 
segmentPropertiesFetcher.getSegmentProperties(segmentId);
-      for (DataMap dataMap : dataMaps) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties));
+      // if filter is not passed then return all the blocklets
+      if (filterExp == null) {
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segmentId);
+      } else {
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+        segmentProperties = 
segmentPropertiesFetcher.getSegmentProperties(segmentId);
+        for (DataMap dataMap : dataMaps) {
+          pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties));
+        }
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
           .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
@@ -133,9 +142,21 @@ public final class TableDataMap implements 
OperationEventListener {
       blocklets.addAll(dataMap.prune(filterExp,
           
segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId())));
     }
-    for (Blocklet blocklet: blocklets) {
+    BlockletSerializer serializer = new BlockletSerializer();
+    String writePath =
+        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + 
dataMapName;
+    if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+      FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
+    }
+    for (Blocklet blocklet : blocklets) {
       ExtendedBlocklet detailedBlocklet =
           blockletDetailsFetcher.getExtendedBlocklet(blocklet, 
distributable.getSegmentId());
+      if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+        String blockletwritePath =
+            writePath + CarbonCommonConstants.FILE_SEPARATOR + 
System.nanoTime();
+        detailedBlocklet.setDataMapWriterPath(blockletwritePath);
+        serializer.serializeBlocklet((FineGrainBlocklet) blocklet, 
blockletwritePath);
+      }
       detailedBlocklet.setSegmentId(distributable.getSegmentId());
       detailedBlocklets.add(detailedBlocklet);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
new file mode 100644
index 0000000..bcc9bad
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Data Map writer
+ */
+public abstract class AbstractDataMapWriter {
+
+  protected AbsoluteTableIdentifier identifier;
+
+  protected String segmentId;
+
+  protected String writeDirectoryPath;
+
+  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String 
segmentId,
+      String writeDirectoryPath) {
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.writeDirectoryPath = writeDirectoryPath;
+  }
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  public abstract void onBlockStart(String blockId);
+
+  /**
+   * End of block notification
+   */
+  public abstract void onBlockEnd(String blockId);
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletStart(int blockletId);
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletEnd(int blockletId);
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as 
`indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   * Implementation should copy the content of `pages` as needed, because 
`pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] 
pages);
+
+  /**
+   * This is called during closing of writer.So after this call no more data 
will be sent to this
+   * class.
+   */
+  public abstract void finish() throws IOException;
+
+  /**
+   * It copies the file from temp folder to actual folder
+   *
+   * @param dataMapFile
+   * @throws IOException
+   */
+  protected void commitFile(String dataMapFile) throws IOException {
+    if (!dataMapFile.startsWith(writeDirectoryPath)) {
+      throw new UnsupportedOperationException(
+          "Datamap file " + dataMapFile + " is not written in provided 
directory path "
+              + writeDirectoryPath);
+    }
+    String dataMapFileName =
+        dataMapFile.substring(writeDirectoryPath.length(), 
dataMapFile.length());
+    String carbonFilePath = dataMapFileName.substring(0, 
dataMapFileName.lastIndexOf("/"));
+    String segmentPath = 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+    if (carbonFilePath.length() > 0) {
+      carbonFilePath = segmentPath + carbonFilePath;
+      FileFactory.mkdirs(carbonFilePath, 
FileFactory.getFileType(carbonFilePath));
+    } else {
+      carbonFilePath = segmentPath;
+    }
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(dataMapFile, 
carbonFilePath, 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
new file mode 100644
index 0000000..3d4c717
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public class BlockletSerializer {
+
+  /**
+   * Serialize and write blocklet to the file.
+   * @param grainBlocklet
+   * @param writePath
+   * @throws IOException
+   */
+  public void serializeBlocklet(FineGrainBlocklet grainBlocklet, String 
writePath)
+      throws IOException {
+    DataOutputStream dataOutputStream =
+        FileFactory.getDataOutputStream(writePath, 
FileFactory.getFileType(writePath));
+    grainBlocklet.write(dataOutputStream);
+    dataOutputStream.close();
+  }
+
+  /**
+   * Read data from filepath and deserialize blocklet.
+   * @param writePath
+   * @return
+   * @throws IOException
+   */
+  public FineGrainBlocklet deserializeBlocklet(String writePath) throws 
IOException {
+    DataInputStream inputStream =
+        FileFactory.getDataInputStream(writePath, 
FileFactory.getFileType(writePath));
+    FineGrainBlocklet blocklet = new FineGrainBlocklet();
+    blocklet.readFields(inputStream);
+    inputStream.close();
+    return blocklet;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 3731922..0e705bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -27,7 +27,7 @@ import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 /**
  * Datamap is an entity which can store and retrieve index data.
  */
-public interface DataMap {
+public interface DataMap<T extends Blocklet> {
 
   /**
    * It is called to load the data map to memory or to initialize it.
@@ -41,7 +41,7 @@ public interface DataMap {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties);
+  List<T> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties);
 
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f5a7404..e900f8a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,13 +21,14 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 
 /**
  * Interface for datamap factory, it is responsible for creating the datamap.
  */
-public interface DataMapFactory {
+public interface DataMapFactory<T extends DataMap> {
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
@@ -37,17 +38,17 @@ public interface DataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  DataMapWriter createWriter(String segmentId);
+  AbstractDataMapWriter createWriter(String segmentId, String 
writeDirectoryPath);
 
   /**
    * Get the datamap for segmentid
    */
-  List<DataMap> getDataMaps(String segmentId) throws IOException;
+  List<T> getDataMaps(String segmentId) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<DataMap> getDataMaps(DataMapDistributable distributable) throws 
IOException;
+  List<T> getDataMaps(DataMapDistributable distributable) throws IOException;
 
   /**
    * Get all distributable objects of a segmentid
@@ -75,4 +76,9 @@ public interface DataMapFactory {
    * Return metadata of this datamap
    */
   DataMapMeta getMeta();
+
+  /**
+   *  Type of datamap whether it is FG or CG
+   */
+  DataMapType getDataMapType();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
deleted file mode 100644
index 413eaa5..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datamap.dev;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter {
-
-  /**
-   *  Start of new block notification.
-   *  @param blockId file name of the carbondata file
-   */
-  void onBlockStart(String blockId, String blockPath);
-
-  /**
-   * End of block notification
-   */
-  void onBlockEnd(String blockId);
-
-  /**
-   * Start of new blocklet notification.
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletStart(int blockletId);
-
-  /**
-   * End of blocklet notification
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletEnd(int blockletId);
-  /**
-   * Add the column pages row to the datamap, order of pages is same as 
`indexColumns` in
-   * DataMapMeta returned in DataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because 
`pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
new file mode 100644
index 0000000..d79d0c6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+
+public abstract class AbstractCoarseGrainDataMap implements DataMap<Blocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..9789992
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits the table with datamap will call prune 
method of CGdatamap.
+ *  2. The prune method of CGDatamap return list Blocklet , these blocklets 
contain the
+ *     information of block and blocklet.
+ *  3. Based on the splits scanrdd schedule the tasks.
+ */
+public abstract class AbstractCoarseGrainDataMapFactory
+    implements DataMapFactory<AbstractCoarseGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.CG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
new file mode 100644
index 0000000..310fb3b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public abstract class AbstractFineGrainDataMap implements 
DataMap<FineGrainBlocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
new file mode 100644
index 0000000..1ca7fc3
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits the table with datamap will call prune 
method of FGdatamap.
+ *  2. The prune method of FGDatamap return list FineGrainBlocklet , these 
blocklets contain the
+ *     information of block, blocklet, page and rowids information as well.
+ *  3. The pruned blocklets are internally wriitten to file and returns only 
the block ,
+ *    blocklet and filepath information as part of Splits.
+ *  4. Based on the splits scanrdd schedule the tasks.
+ *  5. In filterscanner we check the datamapwriterpath from split and 
reNoteads the
+ *     bitset if exists. And pass this bitset as input to it.
+ */
+public abstract class AbstractFineGrainDataMapFactory
+    implements DataMapFactory<AbstractFineGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.FG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index 8914196..4ceb762 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import 
org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Interface data block reference
@@ -124,6 +125,12 @@ public interface DataRefNode {
   BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache();
 
   /**
+   * Return the indexed data if it has any from disk which was stored by FG 
datamap.
+   * @return
+   */
+  BitSetGroup getIndexedData();
+
+  /**
    * number of pages in blocklet
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 910c9bb..a1ef45f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -85,6 +85,8 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private BlockletDetailInfo detailInfo;
 
+  private String dataMapWriterPath;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, 
String[] locations,
       long blockLength, ColumnarFormatVersion version, String[] 
deletedDeltaFilePath) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -356,4 +358,12 @@ public class TableBlockInfo implements Distributable, 
Serializable {
   public void setDetailInfo(BlockletDetailInfo detailInfo) {
     this.detailInfo = detailInfo;
   }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
index dfd35bc..7a68423 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Non leaf node abstract class
@@ -239,4 +240,8 @@ public abstract class AbstractBTreeLeafNode implements 
BTreeNode {
   public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
     return deleteDeltaDataCache;
   }
+
+  @Override public BitSetGroup getIndexedData() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
index 01c0177..d78015b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * No leaf node of a b+tree class which will keep the matadata(start key) of 
the
@@ -245,6 +246,10 @@ public class BTreeNonLeafNode implements BTreeNode {
     return deleteDeltaDataCache;
   }
 
+  public BitSetGroup getIndexedData() {
+    return null;
+  }
+
   /**
    * number of pages in blocklet
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index d84f3f6..c731e07 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,28 +16,46 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
 /**
  * Blocklet
  */
-public class Blocklet implements Serializable {
+public class Blocklet implements Writable,Serializable {
 
-  private String path;
+  private String blockId;
 
   private String blockletId;
 
-  public Blocklet(String path, String blockletId) {
-    this.path = path;
+  public Blocklet(String blockId, String blockletId) {
+    this.blockId = blockId;
     this.blockletId = blockletId;
   }
 
-  public String getPath() {
-    return path;
+  // For serialization purpose
+  public Blocklet() {
   }
 
   public String getBlockletId() {
     return blockletId;
   }
 
+  public String getBlockId() {
+    return blockId;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeUTF(blockId);
+    out.writeUTF(blockletId);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    blockId = in.readUTF();
+    blockletId = in.readUTF();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 21ecba1..a493c06 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -44,4 +44,12 @@ public interface BlockletDetailsFetcher {
    * @throws IOException
    */
   ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) 
throws IOException;
+
+  /**
+   * Get all the blocklets in a segment
+   *
+   * @param segmentId
+   * @return
+   */
+  List<Blocklet> getAllBlocklets(String segmentId) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index e0cfefb..081da53 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -38,8 +38,13 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String[] location;
 
+  private String path;
+
+  private String dataMapWriterPath;
+
   public ExtendedBlocklet(String path, String blockletId) {
     super(path, blockletId);
+    this.path = path;
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -56,7 +61,7 @@ public class ExtendedBlocklet extends Blocklet {
    * @throws IOException
    */
   public void updateLocations() throws IOException {
-    Path path = new Path(getPath());
+    Path path = new Path(this.path);
     FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
     LocatedFileStatus fileStatus = iter.next();
@@ -79,4 +84,16 @@ public class ExtendedBlocklet extends Blocklet {
   public void setSegmentId(String segmentId) {
     this.segmentId = segmentId;
   }
+
+  public String getPath() {
+    return path;
+  }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
new file mode 100644
index 0000000..266120e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * FineGrainBlocklet
+ */
+public class FineGrainBlocklet extends Blocklet implements Serializable {
+
+  private List<Page> pages;
+
+  public FineGrainBlocklet(String blockId, String blockletId, List<Page> 
pages) {
+    super(blockId, blockletId);
+    this.pages = pages;
+  }
+
+  // For serialization purpose
+  public FineGrainBlocklet() {
+
+  }
+
+  public List<Page> getPages() {
+    return pages;
+  }
+
+  public static class Page implements Writable,Serializable {
+
+    private int pageId;
+
+    private int[] rowId;
+
+    public BitSet getBitSet() {
+      BitSet bitSet =
+          new 
BitSet(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+      for (int row : rowId) {
+        bitSet.set(row);
+      }
+      return bitSet;
+    }
+
+    @Override public void write(DataOutput out) throws IOException {
+      out.writeInt(pageId);
+      out.writeInt(rowId.length);
+      for (int i = 0; i < rowId.length; i++) {
+        out.writeInt(rowId[i]);
+      }
+    }
+
+    @Override public void readFields(DataInput in) throws IOException {
+      pageId = in.readInt();
+      int length = in.readInt();
+      rowId = new int[length];
+      for (int i = 0; i < length; i++) {
+        rowId[i] = in.readInt();
+      }
+    }
+
+    public void setPageId(int pageId) {
+      this.pageId = pageId;
+    }
+
+    public void setRowId(int[] rowId) {
+      this.rowId = rowId;
+    }
+  }
+
+  public BitSetGroup getBitSetGroup(int numberOfPages) {
+    BitSetGroup bitSetGroup = new BitSetGroup(numberOfPages);
+    for (int i = 0; i < pages.size(); i++) {
+      bitSetGroup.setBitSet(pages.get(i).getBitSet(), pages.get(i).pageId);
+    }
+    return bitSetGroup;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    super.write(out);
+    int size = pages.size();
+    out.writeInt(size);
+    for (Page page : pages) {
+      page.write(out);
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int size = in.readInt();
+    pages = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      Page page = new Page();
+      page.readFields(in);
+      pages.add(page);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 54016ad..a49e6f6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -65,7 +65,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap implements DataMap, Cacheable {
+public class BlockletDataMap extends AbstractCoarseGrainDataMap implements 
Cacheable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
@@ -315,7 +315,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
   }
 
   @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties) {
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segProps) {
 
     // getting the start and end index key based on filter for hitting the
     // selected block reference nodes based on filter resolver tree.
@@ -374,7 +374,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
         startIndex++;
       }
     }
-
     return blocklets;
   }
 
@@ -553,4 +552,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     }
   }
 
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 61e5ceb..48f8e05 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -27,9 +27,10 @@ import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -39,9 +40,6 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -53,7 +51,8 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory, 
BlockletDetailsFetcher,
+public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
+    implements BlockletDetailsFetcher,
     SegmentPropertiesFetcher {
 
   private AbsoluteTableIdentifier identifier;
@@ -61,10 +60,7 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
   // segmentId -> list of index file
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new 
HashMap<>();
 
-  // segmentId -> SegmentProperties.
-  private Map<String, SegmentProperties> segmentPropertiesMap = new 
HashMap<>();
-
-  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainDataMap> 
cache;
 
   @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -74,12 +70,12 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public DataMapWriter createWriter(String segmentId) {
+  public AbstractDataMapWriter createWriter(String segmentId, String 
dataWriterPath) {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public List<DataMap> getDataMaps(String segmentId) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId) throws 
IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         getTableBlockIndexUniqueIdentifiers(segmentId);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -140,17 +136,18 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
 
   private ExtendedBlocklet 
getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
-    String carbonIndexFileName = 
CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
+    String carbonIndexFileName = 
CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
       if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) 
dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }
     }
-    throw new IOException("Blocklet with blockid " + blocklet.getPath() + " 
not found ");
+    throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() 
+ " not found ");
   }
 
 
+
   @Override
   public List<DataMapDistributable> toDistributable(String segmentId) {
     CarbonFile[] carbonIndexFiles = 
SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
@@ -179,7 +176,6 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
 
   @Override
   public void clear(String segmentId) {
-    segmentPropertiesMap.remove(segmentId);
     List<TableBlockIndexUniqueIdentifier> blockIndexes = 
segmentMap.remove(segmentId);
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
@@ -200,7 +196,8 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public List<DataMap> getDataMaps(DataMapDistributable distributable) throws 
IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable 
distributable)
+      throws IOException {
     BlockletDataMapDistributable mapDistributable = 
(BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
     if 
(mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
@@ -217,7 +214,7 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
                 indexFile));
       }
     }
-    List<DataMap> dataMaps;
+    List<AbstractCoarseGrainDataMap> dataMaps;
     try {
       dataMaps = cache.getAll(identifiers);
     } catch (IOException e) {
@@ -233,23 +230,20 @@ public class BlockletDataMapFactory implements 
DataMapFactory, BlockletDetailsFe
   }
 
   @Override public SegmentProperties getSegmentProperties(String segmentId) 
throws IOException {
-    SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId);
-    if (segmentProperties == null) {
-      int[] columnCardinality;
-      List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-          getTableBlockIndexUniqueIdentifiers(segmentId);
-      DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter();
-      List<DataFileFooter> indexInfo =
-          
fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath());
-      for (DataFileFooter fileFooter : indexInfo) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        if (segmentProperties == null) {
-          columnCardinality = 
fileFooter.getSegmentInfo().getColumnCardinality();
-          segmentProperties = new SegmentProperties(columnInTable, 
columnCardinality);
-        }
-      }
-      segmentPropertiesMap.put(segmentId, segmentProperties);
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    assert (dataMaps.size() > 0);
+    AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
+    assert (coarseGrainDataMap instanceof BlockletDataMap);
+    BlockletDataMap dataMap = (BlockletDataMap) coarseGrainDataMap;
+    return dataMap.getSegmentProperties();
+  }
+
+  @Override public List<Blocklet> getAllBlocklets(String segmentId) throws 
IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
+      blocklets.addAll(dataMap.prune(null, null));
     }
-    return segmentProperties;
+    return blocklets;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
index 5e0f4cf..0c83f91 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import 
org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -29,7 +30,9 @@ import 
org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory
 import 
org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import 
org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * wrapper for blocklet data map data
@@ -44,6 +47,8 @@ public class BlockletDataRefNodeWrapper implements 
DataRefNode {
 
   private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
 
+  private BlockletSerializer blockletSerializer;
+
   public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
       int[] dimensionLens) {
     this.blockInfos = blockInfos;
@@ -55,6 +60,7 @@ public class BlockletDataRefNodeWrapper implements 
DataRefNode {
     }
     this.index = index;
     this.dimensionLens = dimensionLens;
+    this.blockletSerializer = new BlockletSerializer();
   }
 
   @Override public DataRefNode getNextDataRefNode() {
@@ -130,14 +136,31 @@ public class BlockletDataRefNodeWrapper implements 
DataRefNode {
     this.deleteDeltaDataCache = deleteDeltaDataCache;
   }
 
-  @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() 
{
+  @Override
+  public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
     return deleteDeltaDataCache;
   }
 
-  @Override public int numberOfPages() {
+  @Override
+  public int numberOfPages() {
     return blockInfos.get(index).getDetailInfo().getPagesCount();
   }
 
+  @Override
+  public BitSetGroup getIndexedData() {
+    String dataMapWriterPath = blockInfos.get(index).getDataMapWriterPath();
+    if (dataMapWriterPath != null) {
+      try {
+        FineGrainBlocklet blocklet = 
blockletSerializer.deserializeBlocklet(dataMapWriterPath);
+        return blocklet.getBitSetGroup(numberOfPages());
+      } catch (IOException e) {
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
   public int numberOfNodes() {
     return blockInfos.size();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index b8cffc6..2720700 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -22,6 +22,8 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -31,7 +33,10 @@ import org.apache.carbondata.core.util.CarbonUtil;
  */
 public class IndexWrapper extends AbstractIndex {
 
+  private List<TableBlockInfo> blockInfos;
+
   public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    this.blockInfos = blockInfos;
     DataFileFooter fileFooter = null;
     try {
       fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
@@ -46,4 +51,17 @@ public class IndexWrapper extends AbstractIndex {
 
   @Override public void buildIndex(List<DataFileFooter> footerList) {
   }
+
+  @Override public void clear() {
+    super.clear();
+    if (blockInfos != null) {
+      for (TableBlockInfo blockInfo : blockInfos) {
+        String dataMapWriterPath = blockInfo.getDataMapWriterPath();
+        if (dataMapWriterPath != null) {
+          CarbonFile file = FileFactory.getCarbonFile(dataMapWriterPath);
+          FileFactory.deleteAllCarbonFilesOfDir(file);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
deleted file mode 100644
index 9d77010..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.schema;
-
-/**
- * Types of filters of select query
- */
-public enum FilterType {
-  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index f680579..886b12b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -313,6 +313,9 @@ public class ExcludeFilterExecuterImpl implements 
FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       BitSetGroup prvBitSetGroup, int pageNumber, int numberOfRows) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet();
     bitSet.or(prvPageBitSet);
     byte[][] filterKeys = dimColumnExecuterInfo.getExcludeFilterKeys();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index fe1421c..0385c73 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -332,6 +332,9 @@ public class IncludeFilterExecuterImpl implements 
FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       BitSetGroup prvBitSetGroup, int pageNumber, int numberOfRows) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet(numberOfRows);
     byte[][] filterKeys = dimColumnExecuterInfo.getFilterKeys();
     int compareResult = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
index 60090d0..0ed9137 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
@@ -50,11 +50,6 @@ public class BlocksChunkHolder {
 
   private BitSetGroup bitSetGroup;
 
-  public BlocksChunkHolder(int numberOfDimensionBlock, int 
numberOfMeasureBlock) {
-    dimensionRawDataChunk = new 
DimensionRawColumnChunk[numberOfDimensionBlock];
-    measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
-  }
-
   public BlocksChunkHolder(int numberOfDimensionBlock, int 
numberOfMeasureBlock,
       FileHolder fileReader) {
     dimensionRawDataChunk = new 
DimensionRawColumnChunk[numberOfDimensionBlock];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index abfc5f4..6e8076e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -153,6 +153,8 @@ public class FilterScanner extends AbstractBlockletScanner {
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     
totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
         totalBlockletStatistic.getCount() + 1);
+    // set the indexed data if it has any during fgdatamap pruning.
+    
blocksChunkHolder.setBitSetGroup(blocksChunkHolder.getDataBlock().getIndexedData());
     // apply filter on actual data
     BitSetGroup bitSetGroup = 
this.filterExecuter.applyFilter(blocksChunkHolder, useBitSetPipeLine);
     // if indexes is empty then return with empty result

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e709df7..6c59f07 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -52,6 +53,7 @@ import 
org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -88,6 +90,7 @@ import com.google.gson.Gson;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -120,6 +123,13 @@ public final class CarbonUtil {
 
   private static final Configuration conf = new Configuration(true);
 
+  /**
+   * dfs.bytes-per-checksum
+   * HDFS checksum length, block size for a file should be exactly divisible
+   * by this value
+   */
+  private static final int HDFS_CHECKSUM_LENGTH = 512;
+
   private CarbonUtil() {
 
   }
@@ -2119,5 +2129,93 @@ public final class CarbonUtil {
     return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + 
carbonTableIdentifier
         .getTableName();
   }
+
+
+  /**
+   * This method will copy the given file to carbon store location
+   *
+   * @param localFilePath local file name with full path
+   * @throws CarbonDataWriterException
+   */
+  public static void copyCarbonDataFileToCarbonStorePath(String localFilePath,
+      String carbonDataDirectoryPath, long fileSizeInBytes)
+      throws CarbonDataWriterException {
+    long copyStartTime = System.currentTimeMillis();
+    LOGGER.info("Copying " + localFilePath + " --> " + 
carbonDataDirectoryPath);
+    try {
+      CarbonFile localCarbonFile =
+          FileFactory.getCarbonFile(localFilePath, 
FileFactory.getFileType(localFilePath));
+      String carbonFilePath = carbonDataDirectoryPath + localFilePath
+          .substring(localFilePath.lastIndexOf(File.separator));
+      copyLocalFileToCarbonStore(carbonFilePath, localFilePath,
+          CarbonCommonConstants.BYTEBUFFER_SIZE,
+          getMaxOfBlockAndFileSize(fileSizeInBytes, 
localCarbonFile.getSize()));
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while copying file from local store to carbon store", e);
+    }
+    LOGGER.info(
+        "Total copy time (ms) to copy file " + localFilePath + " is " + 
(System.currentTimeMillis()
+            - copyStartTime));
+  }
+
+  /**
+   * This method will read the local carbon data file and write to carbon data 
file in HDFS
+   *
+   * @param carbonStoreFilePath
+   * @param localFilePath
+   * @param bufferSize
+   * @param blockSize
+   * @throws IOException
+   */
+  private static void copyLocalFileToCarbonStore(String carbonStoreFilePath, 
String localFilePath,
+      int bufferSize, long blockSize) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + 
" is " + blockSize
+            + " (bytes");
+      }
+      dataOutputStream = FileFactory
+          .getDataOutputStream(carbonStoreFilePath, 
FileFactory.getFileType(carbonStoreFilePath),
+              bufferSize, blockSize);
+      dataInputStream = FileFactory
+          .getDataInputStream(localFilePath, 
FileFactory.getFileType(localFilePath), bufferSize);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
+    } finally {
+      CarbonUtil.closeStream(dataInputStream);
+      CarbonUtil.closeStream(dataOutputStream);
+    }
+  }
+
+  /**
+   * This method will return max of block size and file size
+   *
+   * @param blockSize
+   * @param fileSize
+   * @return
+   */
+  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
+    long maxSize = blockSize;
+    if (fileSize > blockSize) {
+      maxSize = fileSize;
+    }
+    // block size should be exactly divisible by 512 which is  maintained by 
HDFS as bytes
+    // per checksum, dfs.bytes-per-checksum=512 must divide block size
+    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
+    if (remainder > 0) {
+      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
+    }
+    // convert to make block size more readable.
+    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
+    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
+    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
+    LOGGER.info(
+        "The configured block size is " + readableBlockSize + ", the actual 
carbon file size is "
+            + readableFileSize + ", choose the max value " + readableMaxSize
+            + " as the block size on HDFS");
+    return maxSize;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
index 2ad6327..605796f 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
@@ -28,7 +28,8 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -47,7 +48,7 @@ import com.google.gson.Gson;
 /**
  * Datamap implementation for min max blocklet.
  */
-public class MinMaxDataMap implements DataMap {
+public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
@@ -58,8 +59,8 @@ public class MinMaxDataMap implements DataMap {
 
   private MinMaxIndexBlockDetails[] readMinMaxDataMap;
 
-  @Override public void init(String filePath) throws MemoryException, 
IOException {
-    this.filePath = filePath;
+  @Override public void init(DataMapModel model) throws MemoryException, 
IOException {
+    this.filePath = model.getFilePath();
     CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
     for (int i = 0; i < listFiles.length; i++) {
       readMinMaxDataMap = readJson(listFiles[i].getPath());
@@ -90,8 +91,7 @@ public class MinMaxDataMap implements DataMap {
         return null;
       }
       dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      inStream = new InputStreamReader(dataInputStream, "UTF-8");
       buffReader = new BufferedReader(inStream);
       readMinMax = gsonObjectToRead.fromJson(buffReader, 
MinMaxIndexBlockDetails[].class);
     } catch (IOException e) {
@@ -115,8 +115,7 @@ public class MinMaxDataMap implements DataMap {
 
     if (filterExp == null) {
       for (int i = 0; i < readMinMaxDataMap.length; i++) {
-        blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(),
-            String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+        blocklets.add(new Blocklet(filePath, 
String.valueOf(readMinMaxDataMap[i].getBlockletId())));
       }
     } else {
       FilterExecuter filterExecuter =
@@ -126,7 +125,7 @@ public class MinMaxDataMap implements DataMap {
         BitSet bitSet = 
filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
             readMinMaxDataMap[startIndex].getMinValues());
         if (!bitSet.isEmpty()) {
-          blocklets.add(new 
Blocklet(readMinMaxDataMap[startIndex].getFilePath(),
+          blocklets.add(new Blocklet(filePath,
               String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
         }
         startIndex++;
@@ -135,8 +134,7 @@ public class MinMaxDataMap implements DataMap {
     return blocklets;
   }
 
-  @Override
-  public void clear() {
+  @Override public void clear() {
     readMinMaxDataMap = null;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40259b36/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index b196d0d..5203cb3 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -25,49 +25,51 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.events.Event;
 
 /**
  * Min Max DataMap Factory
  */
-public class MinMaxDataMapFactory implements DataMapFactory {
+public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
-  @Override
-  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  @Override public void init(AbsoluteTableIdentifier identifier, String 
dataMapName) {
     this.identifier = identifier;
   }
 
   /**
    * createWriter will return the MinMaxDataWriter.
+   *
    * @param segmentId
    * @return
    */
-  @Override
-  public DataMapWriter createWriter(String segmentId) {
-    return new MinMaxDataWriter();
+  @Override public AbstractDataMapWriter createWriter(String segmentId, String 
dataWritePath) {
+    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
   }
 
   /**
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
+   *
    * @param segmentId
    * @return
    * @throws IOException
    */
-  @Override public List<DataMap> getDataMaps(String segmentId) throws 
IOException {
-    List<DataMap> dataMapList = new ArrayList<>();
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String 
segmentId)
+      throws IOException {
+    List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxDataMap.
     MinMaxDataMap dataMap = new MinMaxDataMap();
     try {
-      dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + 
segmentId + File.separator);
+      dataMap.init(new DataMapModel(
+          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + 
File.separator));
     } catch (MemoryException ex) {
 
     }
@@ -76,7 +78,6 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   }
 
   /**
-   *
    * @param segmentId
    * @return
    */
@@ -86,6 +87,7 @@ public class MinMaxDataMapFactory implements DataMapFactory {
 
   /**
    * Clear the DataMap.
+   *
    * @param segmentId
    */
   @Override public void clear(String segmentId) {
@@ -94,21 +96,20 @@ public class MinMaxDataMapFactory implements DataMapFactory 
{
   /**
    * Clearing the data map.
    */
-  @Override
-  public void clear() {
+  @Override public void clear() {
   }
 
-  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+  @Override public List<AbstractCoarseGrainDataMap> 
getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     return null;
   }
 
-  @Override
-  public void fireEvent(ChangeEvent event) {
+  @Override public void fireEvent(Event event) {
 
   }
 
-  @Override
-  public DataMapMeta getMeta() {
-    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), 
FilterType.EQUALTO);
+  @Override public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
+        new ArrayList<ExpressionType>());
   }
 }
\ No newline at end of file

Reply via email to