http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 78544d3..fe0bbcf 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.datamap.examples;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
@@ -29,17 +28,18 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
 
-public class MinMaxDataWriter implements DataMapWriter {
+public class MinMaxDataWriter extends AbstractDataMapWriter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(TableInfo.class.getName());
@@ -50,17 +50,23 @@ public class MinMaxDataWriter implements DataMapWriter {
 
   private Map<Integer, BlockletMinMax> blockMinMaxMap;
 
-  private String blockPath;
+  private String dataWritePath;
 
+  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String dataWritePath) {
+    super(identifier, segmentId, dataWritePath);
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.dataWritePath = dataWritePath;
+  }
 
-  @Override public void onBlockStart(String blockId, String blockPath) {
+  @Override public void onBlockStart(String blockId) {
     pageLevelMax = null;
     pageLevelMin = null;
     blockletLevelMax = null;
     blockletLevelMin = null;
     blockMinMaxMap = null;
     blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
-    this.blockPath = blockPath;
   }
 
   @Override public void onBlockEnd(String blockId) {
@@ -161,7 +167,7 @@ public class MinMaxDataWriter implements DataMapWriter {
     List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = null;
     tempMinMaxIndexBlockDetails = loadBlockDetails();
     try {
-      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId);
+      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockId);
     } catch (IOException ex) {
       LOGGER.info(" Unable to write the file");
     }
@@ -178,7 +184,6 @@ public class MinMaxDataWriter implements DataMapWriter {
       
tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin());
       
tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax());
       tmpminMaxIndexBlockDetails.setBlockletId(index);
-      tmpminMaxIndexBlockDetails.setFilePath(this.blockPath);
       minMaxIndexBlockDetails.add(tmpminMaxIndexBlockDetails);
     }
     return minMaxIndexBlockDetails;
@@ -187,22 +192,19 @@ public class MinMaxDataWriter implements DataMapWriter {
   /**
    * Write the data to a file. This is JSON format file.
    * @param minMaxIndexBlockDetails
-   * @param blockPath
    * @param blockId
    * @throws IOException
    */
   public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> 
minMaxIndexBlockDetails,
-      String blockPath, String blockId) throws IOException {
-    String filePath = blockPath.substring(0, 
blockPath.lastIndexOf(File.separator) + 1) + blockId
-        + ".minmaxindex";
+      String blockId) throws IOException {
+    String filePath = dataWritePath +"/" + blockId + ".minmaxindex";
     BufferedWriter brWriter = null;
     DataOutputStream dataOutStream = null;
     try {
       FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath));
       dataOutStream = FileFactory.getDataOutputStream(filePath, 
FileFactory.getFileType(filePath));
       Gson gsonObjectToWrite = new Gson();
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream, 
"UTF-8"));
       String minmaxIndexData = 
gsonObjectToWrite.toJson(minMaxIndexBlockDetails);
       brWriter.write(minmaxIndexData);
     } catch (IOException ioe) {
@@ -215,7 +217,11 @@ public class MinMaxDataWriter implements DataMapWriter {
         dataOutStream.flush();
       }
       CarbonUtil.closeStreams(brWriter, dataOutStream);
+      commitFile(filePath);
     }
   }
 
+  @Override public void finish() throws IOException {
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
index 0596db5..93a453e 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
@@ -33,11 +33,6 @@ public class MinMaxIndexBlockDetails implements Serializable 
{
   private byte[][] maxValues;
 
   /**
-   * filePath pointing to the block.
-   */
-  private String filePath;
-
-  /**
    * BlockletID of the block.
    */
   private Integer BlockletId;
@@ -59,14 +54,6 @@ public class MinMaxIndexBlockDetails implements Serializable 
{
     this.maxValues = maxValues;
   }
 
-  public String getFilePath() {
-    return filePath;
-  }
-
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
-
   public Integer getBlockletId() {
     return BlockletId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 5ab6605..2240cf6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -86,6 +86,8 @@ public class CarbonInputSplit extends FileSplit
 
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
+  private String dataMapWritePath;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -97,7 +99,8 @@ public class CarbonInputSplit extends FileSplit
   }
 
   private CarbonInputSplit(String segmentId, String blockletId, Path path, 
long start, long length,
-      String[] locations, ColumnarFormatVersion version, String[] 
deleteDeltaFiles) {
+      String[] locations, ColumnarFormatVersion version, String[] 
deleteDeltaFiles,
+      String dataMapWritePath) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -110,12 +113,13 @@ public class CarbonInputSplit extends FileSplit
     this.invalidSegments = new ArrayList<>();
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
+    this.dataMapWritePath = dataMapWritePath;
   }
 
   public CarbonInputSplit(String segmentId, String blockletId, Path path, long 
start, long length,
       String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
       String[] deleteDeltaFiles) {
-    this(segmentId, blockletId, path, start, length, locations, version, 
deleteDeltaFiles);
+    this(segmentId, blockletId, path, start, length, locations, version, 
deleteDeltaFiles, null);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
@@ -165,9 +169,9 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public static CarbonInputSplit from(String segmentId, String blockletId, 
FileSplit split,
-      ColumnarFormatVersion version) throws IOException {
+      ColumnarFormatVersion version, String dataMapWritePath) throws 
IOException {
     return new CarbonInputSplit(segmentId, blockletId, split.getPath(), 
split.getStart(),
-        split.getLength(), split.getLocations(), version, null);
+        split.getLength(), split.getLocations(), version, null, 
dataMapWritePath);
   }
 
   public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> 
splitList) {
@@ -181,6 +185,7 @@ public class CarbonInputSplit extends FileSplit
                 split.getSegmentId(), split.getLocations(), split.getLength(), 
blockletInfos,
                 split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
+        blockInfo.setDataMapWriterPath(split.dataMapWritePath);
         blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
@@ -232,6 +237,10 @@ public class CarbonInputSplit extends FileSplit
       detailInfo = new BlockletDetailInfo();
       detailInfo.readFields(in);
     }
+    boolean dataMapWriterPathExists = in.readBoolean();
+    if (dataMapWriterPathExists) {
+      dataMapWritePath = in.readUTF();
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -254,6 +263,10 @@ public class CarbonInputSplit extends FileSplit
     if (detailInfo != null) {
       detailInfo.write(out);
     }
+    out.writeBoolean(dataMapWritePath != null);
+    if (dataMapWritePath != null) {
+      out.writeUTF(dataMapWritePath);
+    }
   }
 
   public List<String> getInvalidSegments() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 24f5713..ad5a1c9 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -716,16 +717,17 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     // get tokens for all the required FileSystem for table path
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, 
job.getConfiguration());
-
-    TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
-            BlockletDataMapFactory.class.getName());
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    TableDataMap blockletMap =
+        
DataMapStoreManager.getInstance().chooseDataMap(absoluteTableIdentifier);
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<String> partitionsToPrune = 
getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (dataMapJob != null) {
+    if (distributedCG || blockletMap.getDataMapFactory().getDataMapType() == 
DataMapType.FG) {
       DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, 
BlockletDataMap.NAME,
+          new DistributableDataMapFormat(absoluteTableIdentifier, 
blockletMap.getDataMapName(),
               segmentIds, partitionsToPrune,
               BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
@@ -778,7 +780,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
         
org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
             blocklet.getBlockletId(), new FileSplit(new 
Path(blocklet.getPath()), 0,
                 blocklet.getLength(), blocklet.getLocations()),
-            ColumnarFormatVersion.valueOf((short) 
blocklet.getDetailInfo().getVersionNumber()));
+            ColumnarFormatVersion.valueOf((short) 
blocklet.getDetailInfo().getVersionNumber()),
+            blocklet.getDataMapWriterPath());
     split.setDetailInfo(blocklet.getDetailInfo());
     return split;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
new file mode 100644
index 0000000..4b6f231
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, 
ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, 
AbstractCoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, 
DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
DataMapStoreManager}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.Blocklet
+import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import 
org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapName: String = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+    this.dataMapName = dataMapName
+  }
+
+  /**
+   * Return a new write for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): 
AbstractDataMapWriter = {
+    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String): 
java.util.List[AbstractCoarseGrainDataMap] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
+    })
+    files.map {f =>
+      val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+
+  /**
+   * Get datamaps for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): 
java.util.List[AbstractCoarseGrainDataMap] = {
+    val mapDistributable = 
distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   *
+   * @param event
+   */
+  override def fireEvent(event: Event): Unit = {
+    ???
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): 
java.util.List[DataMapDistributable] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d:DataMapDistributable = new 
BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(Seq("name").toList.asJava, new 
ArrayBuffer[ExpressionType]().toList.asJava)
+  }
+}
+
+class CGDataMap extends AbstractCoarseGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size-4)
+    val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, 
(Array[Byte], Array[Byte]))]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[Blocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f=>
+      new Blocklet(f._1, f._2+"")
+    }.asJava
+  }
+
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+    }
+    tuples
+  }
+
+  private def getEqualToExpression(expression: Expression, buffer: 
ArrayBuffer[Expression]): Unit = {
+    if (expression.getChildren != null) {
+      expression.getChildren.asScala.map { f =>
+        if (f.isInstanceOf[EqualToExpression]) {
+          buffer += f
+        }
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear() = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String,
+    dataWritePath: String,
+    dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+
+  var currentBlockId: String = null
+  val cgwritepath = dataWritePath + "/" +
+                    dataMapName + System.nanoTime() + ".datamap"
+  lazy val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+  val blockletList = new ArrayBuffer[Array[Byte]]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 
0)
+    maxMin +=
+    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as 
`indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because 
`pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[Array[Byte]]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += newBytes
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 
0)
+    blockletList += sorted.head
+    blockletList += sorted.last
+  }
+
+
+  /**
+   * This is called during closing of writer.So after this call no more data 
will be sent to this
+   * class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(cgwritepath)
+  }
+
+
+}
+
+class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test 
OPTIONS('header'='false')")
+  }
+
+  test("test cg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+    sql(
+      """
+        | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age 
INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = 
CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      table.getAbsoluteTableIdentifier,
+      classOf[CGDataMapFactory].getName, "cgdatamap")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg 
OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 553e080..f694a6b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,25 +20,30 @@ package org.apache.carbondata.spark.testsuite.datamap
 import java.util
 
 import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, 
DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, 
AbstractCoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
 
-class C2DataMapFactory() extends DataMapFactory {
+class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
 
   override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {}
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+  }
 
   override def fireEvent(event: Event): Unit = ???
 
@@ -46,13 +51,14 @@ class C2DataMapFactory() extends DataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): 
java.util.List[DataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): 
java.util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: String): 
util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = 
DataMapWriterSuite.dataMapWriterC2Mock
+  override def createWriter(segmentId: String, dataWritePath: String): 
AbstractDataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, 
dataWritePath)
 
-  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, 
FilterType.EQUALTO)
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, 
List(ExpressionType.EQUALS).asJava)
 
   /**
    * Get all distributable objects of a segmentid
@@ -62,6 +68,7 @@ class C2DataMapFactory() extends DataMapFactory {
   override def toDistributable(segmentId: String): 
util.List[DataMapDistributable] = {
     ???
   }
+
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
@@ -164,9 +171,12 @@ class DataMapWriterSuite extends QueryTest with 
BeforeAndAfterAll {
 }
 
 object DataMapWriterSuite {
+
   var callbackSeq: Seq[String] = Seq[String]()
 
-  val dataMapWriterC2Mock = new DataMapWriter {
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: 
String,
+      dataWritePath: String) =
+    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
 
     override def onPageAdded(
         blockletId: Int,
@@ -191,9 +201,21 @@ object DataMapWriterSuite {
       callbackSeq :+= s"blocklet start $blockletId"
     }
 
-    override def onBlockStart(blockId: String, blockPath: String): Unit = {
+    /**
+     * Start of new block notification.
+     *
+     * @param blockId file name of the carbondata file
+     */
+    override def onBlockStart(blockId: String) = {
       callbackSeq :+= s"block start $blockId"
     }
 
+    /**
+     * This is called during closing of writer.So after this call no more data 
will be sent to this
+     * class.
+     */
+    override def finish() = {
+
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
new file mode 100644
index 0000000..d1bb65f
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, 
ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import 
org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, 
AbstractFineGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, 
DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
DataMapStoreManager}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import 
org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapName: String = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+    this.dataMapName = dataMapName
+  }
+
+  /**
+   * Return a new write for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): 
AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String): 
java.util.List[AbstractFineGrainDataMap] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val dataMap: AbstractFineGrainDataMap = new FGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+  /**
+   * Get datamap for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): 
java.util.List[AbstractFineGrainDataMap]= {
+    val mapDistributable = 
distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractFineGrainDataMap = new FGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): 
java.util.List[DataMapDistributable] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d: DataMapDistributable = new 
BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   *
+   * @param event
+   */
+  override def fireEvent(event: Event):Unit = {
+    ???
+  }
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(Seq("name").toList.asJava, new 
ArrayBuffer[ExpressionType]().toList.asJava)
+  }
+}
+
+class FGDataMap extends AbstractFineGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, 
Int)] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, 
footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), 
Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = 
{
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      readAndFindData(f, value(0).getBytes())
+    }.filter(_.isDefined).map(_.get).asJava
+  }
+
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), 
Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val outputStream = new 
ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(outputStream)
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), 
Seq(0)))(new Ordering[
+      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    })
+    if (searching.insertionPoint >= 0) {
+      val f = blockletsData(searching.insertionPoint)
+      val pages = f._3.zipWithIndex.map { p =>
+        val pg = new FineGrainBlocklet.Page
+        pg.setPageId(p._1)
+        pg.setRowId(f._2(p._2).toArray)
+        pg
+      }
+      pages
+      Some(new FineGrainBlocklet(meta._1, meta._2.toString, 
pages.toList.asJava))
+    } else {
+      None
+    }
+
+  }
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+    tuples
+  }
+
+  def getEqualToExpression(expression: Expression, buffer: 
ArrayBuffer[Expression]): Unit = {
+    if (expression.getChildren != null) {
+      expression.getChildren.asScala.map { f =>
+        if (f.isInstanceOf[EqualToExpression]) {
+          buffer += f
+        }
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear():Unit = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String, dataWriterPath: String, dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, 
Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, 
r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+    var addedLast: Boolean = false
+    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], 
Seq[Int])]()
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 
0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ 
f._3)
+          addedLast = false
+        } else {
+          blockletListUpdated += oldValue
+          oldValue = (f._1, Seq(f._2), f._3)
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), f._3)
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletListUpdated += oldValue
+    }
+
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(blockletListUpdated)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    maxMin +=
+    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, 
blockletListUpdated.last
+      ._1), position, bytes.length))
+    position += bytes.length
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as 
`indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because 
`pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[(Array[Byte], Int)]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += ((newBytes, i))
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, 
r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+    var addedLast: Boolean = false
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 
0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+          addedLast = false
+        } else {
+          blockletList += oldValue
+          oldValue = (f._1, Seq(f._2), Seq(pageId))
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), Seq(pageId))
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletList += oldValue
+    }
+  }
+
+
+  /**
+   * This is called during closing of writer.So after this call no more data 
will be sent to this
+   * class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(fgwritepath)
+  }
+
+
+}
+
+class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test 
OPTIONS('header'='false')")
+  }
+
+  test("test fg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = 
CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      table.getAbsoluteTableIdentifier,
+      classOf[FGDataMapFactory].getName, "fgdatamap")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 7067ef8..444d6cc 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -28,11 +28,12 @@ import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, 
DataMapWriter}
+import 
org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, 
AbstractCoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
 import org.apache.carbondata.spark.exception.ConcurrentOperationException
@@ -264,7 +265,7 @@ object Global {
   var overwriteRunning = false
 }
 
-class WaitingDataMap() extends DataMapFactory {
+class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
   override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): 
Unit = { }
 
@@ -274,12 +275,12 @@ class WaitingDataMap() extends DataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): 
java.util.List[DataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): 
java.util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: String): 
util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = {
-    new DataMapWriter {
+  override def createWriter(segmentId: String, writerPath: String): 
AbstractDataMapWriter = {
+    new AbstractDataMapWriter(null, segmentId, writerPath) {
       override def onPageAdded(blockletId: Int, pageId: Int, pages: 
Array[ColumnPage]): Unit = { }
 
       override def onBlockletEnd(blockletId: Int): Unit = { }
@@ -295,10 +296,14 @@ class WaitingDataMap() extends DataMapFactory {
         // wait for 1 second to let second SQL to finish
         Thread.sleep(1000)
       }
+
+      override def finish(): Unit = {
+
+      }
     }
   }
 
-  override def getMeta: DataMapMeta = new 
DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
+  override def getMeta: DataMapMeta = new 
DataMapMeta(List("o_country").asJava, Seq(ExpressionType.EQUALS).asJava)
 
   override def toDistributable(segmentId: String): 
util.List[DataMapDistributable] = ???
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 3dffbf6..08f73f0 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -446,9 +446,10 @@ class CarbonScanRDD(
     CarbonTableInputFormat.setQuerySegment(conf, identifier)
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
-    if (CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+    CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    if (CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index b183e83..16dcecf 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -778,8 +778,9 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
             .start()
           qry.awaitTermination()
         } catch {
-          case ex =>
-            throw new Exception(ex.getMessage)
+          case ex: Throwable =>
+            LOGGER.error(ex.getMessage)
+            throw new Exception(ex.getMessage, ex)
         } finally {
           if (null != qry) {
             qry.stop()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 8e350d9..31a6701 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.datamap;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,8 +29,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.processing.store.TablePage;
@@ -43,25 +44,26 @@ public class DataMapWriterListener {
       DataMapWriterListener.class.getCanonicalName());
 
   // list indexed column name -> list of data map writer
-  private Map<List<String>, List<DataMapWriter>> registry = new 
ConcurrentHashMap<>();
+  private Map<List<String>, List<AbstractDataMapWriter>> registry = new 
ConcurrentHashMap<>();
 
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String 
segmentId) {
+  public void registerAllWriter(AbsoluteTableIdentifier identifier, String 
segmentId,
+      String dataWritePath) {
     List<TableDataMap> tableDataMaps = 
DataMapStoreManager.getInstance().getAllDataMap(identifier);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();
-        register(factory, segmentId);
+        register(factory, segmentId, dataWritePath);
       }
     }
   }
 
   /**
-   * Register a DataMapWriter
+   * Register a AbstractDataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId) {
+  private void register(DataMapFactory factory, String segmentId, String 
dataWritePath) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();
@@ -70,8 +72,8 @@ public class DataMapWriterListener {
       return;
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
-    List<DataMapWriter> writers = registry.get(columns);
-    DataMapWriter writer = factory.createWriter(segmentId);
+    List<AbstractDataMapWriter> writers = registry.get(columns);
+    AbstractDataMapWriter writer = factory.createWriter(segmentId, 
dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {
@@ -79,36 +81,36 @@ public class DataMapWriterListener {
       writers.add(writer);
       registry.put(columns, writers);
     }
-    LOG.info("DataMapWriter " + writer + " added");
+    LOG.info("AbstractDataMapWriter " + writer + " added");
   }
 
   public void onBlockStart(String blockId, String blockPath) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId, blockPath);
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
+        writer.onBlockStart(blockId);
       }
     }
   }
 
   public void onBlockEnd(String blockId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockEnd(blockId);
       }
     }
   }
 
   public void onBlockletStart(int blockletId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockletStart(blockletId);
       }
     }
   }
 
   public void onBlockletEnd(int blockletId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockletEnd(blockletId);
       }
     }
@@ -121,18 +123,29 @@ public class DataMapWriterListener {
    * @param tablePage  page data
    */
   public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
-    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = 
registry.entrySet();
-    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+    Set<Map.Entry<List<String>, List<AbstractDataMapWriter>>> entries = 
registry.entrySet();
+    for (Map.Entry<List<String>, List<AbstractDataMapWriter>> entry : entries) 
{
       List<String> indexedColumns = entry.getKey();
       ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
       for (int i = 0; i < indexedColumns.size(); i++) {
         pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
       }
-      List<DataMapWriter> writers = entry.getValue();
-      for (DataMapWriter writer : writers) {
+      List<AbstractDataMapWriter> writers = entry.getValue();
+      for (AbstractDataMapWriter writer : writers) {
         writer.onPageAdded(blockletId, pageId, pages);
       }
     }
   }
 
+  /**
+   * Finish all datamap writers
+   */
+  public void finish() throws IOException {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
+        writer.finish();
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index bc87823..0e9cbc5 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -259,7 +260,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = 
CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), 
configuration.getSegmentId());
+    listener.registerAllWriter(configuration.getTableIdentifier(), 
configuration.getSegmentId(),
+        storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = 
configuration.getWritingCoresCount();
 
@@ -323,6 +325,12 @@ public class CarbonFactDataHandlerModel {
         segmentProperties.getDimensions(),
         segmentProperties.getMeasures());
 
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(
+        
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getSegmentId(),
+        tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
+    carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     return carbonFactDataHandlerModel;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 02391cf..8d26ad2 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.store.writer;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
@@ -41,14 +39,11 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -63,7 +58,6 @@ import 
org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.io.IOUtils;
 
 public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
@@ -71,12 +65,6 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
 
   /**
-   * dfs.bytes-per-checksum
-   * HDFS checksum length, block size for a file should be exactly divisible
-   * by this value
-   */
-  private static final int HDFS_CHECKSUM_LENGTH = 512;
-  /**
    * file channel
    */
   protected FileChannel fileChannel;
@@ -208,35 +196,6 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
   }
 
   /**
-   * This method will return max of block size and file size
-   *
-   * @param blockSize
-   * @param fileSize
-   * @return
-   */
-  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
-    long maxSize = blockSize;
-    if (fileSize > blockSize) {
-      maxSize = fileSize;
-    }
-    // block size should be exactly divisible by 512 which is  maintained by 
HDFS as bytes
-    // per checksum, dfs.bytes-per-checksum=512 must divide block size
-    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
-    if (remainder > 0) {
-      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
-    }
-    // convert to make block size more readable.
-    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
-    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
-    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
-    LOGGER.info(
-        "The configured block size is " + readableBlockSize + ", the actual 
carbon file size is "
-            + readableFileSize + ", choose the max value " + readableMaxSize
-            + " as the block size on HDFS");
-    return maxSize;
-  }
-
-  /**
    * This method will be used to update the file channel with new file if 
exceeding block size
    * threshold, new file will be created once existing file reached the file 
size limit This
    * method will first check whether existing file size is exceeded the file
@@ -282,7 +241,7 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
 
   private String constructFactFileFullPath() {
     String factFilePath =
-        this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + 
this.carbonDataFileName;
+        this.model.getCarbonDataDirectoryPath() + File.separator + 
this.carbonDataFileName;
     return factFilePath;
   }
   /**
@@ -293,7 +252,9 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     if (copyInCurrentThread) {
-      copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath);
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
+          carbonDataFileTempPath, model.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
     } else {
       executorServiceSubmitList.add(executorService.submit(new 
CopyThread(carbonDataFileTempPath)));
     }
@@ -446,7 +407,9 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
     }
     writer.close();
     // copy from temp to actual store location
-    copyCarbonDataFileToCarbonStorePath(fileName);
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
+            model.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
   }
 
   /**
@@ -456,80 +419,20 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   protected void closeExecutorService() throws CarbonDataWriterException {
-    executorService.shutdown();
     try {
+      listener.finish();
+      executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.HOURS);
-    } catch (InterruptedException e) {
-      throw new CarbonDataWriterException(e.getMessage());
-    }
-    for (int i = 0; i < executorServiceSubmitList.size(); i++) {
-      try {
+      for (int i = 0; i < executorServiceSubmitList.size(); i++) {
         executorServiceSubmitList.get(i).get();
-      } catch (InterruptedException e) {
-        throw new CarbonDataWriterException(e.getMessage());
-      } catch (ExecutionException e) {
-        throw new CarbonDataWriterException(e.getMessage());
       }
+    } catch (InterruptedException | ExecutionException | IOException e) {
+      LOGGER.error(e, "Error while finishing writer");
+      throw new CarbonDataWriterException(e.getMessage());
     }
   }
 
 
-  /**
-   * This method will copy the given file to carbon store location
-   *
-   * @param localFileName local file name with full path
-   * @throws CarbonDataWriterException
-   */
-  protected void copyCarbonDataFileToCarbonStorePath(String localFileName)
-      throws CarbonDataWriterException {
-    long copyStartTime = System.currentTimeMillis();
-    LOGGER.info("Copying " + localFileName + " --> " + 
model.getCarbonDataDirectoryPath());
-    try {
-      CarbonFile localCarbonFile =
-          FileFactory.getCarbonFile(localFileName, 
FileFactory.getFileType(localFileName));
-      String carbonFilePath = model.getCarbonDataDirectoryPath() + 
localFileName
-          .substring(localFileName.lastIndexOf(File.separator));
-      copyLocalFileToCarbonStore(carbonFilePath, localFileName,
-          CarbonCommonConstants.BYTEBUFFER_SIZE,
-          getMaxOfBlockAndFileSize(fileSizeInBytes, 
localCarbonFile.getSize()));
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while copying file from local store to carbon store", e);
-    }
-    LOGGER.info(
-        "Total copy time (ms) to copy file " + localFileName + " is " + 
(System.currentTimeMillis()
-            - copyStartTime));
-  }
-
-  /**
-   * This method will read the local carbon data file and write to carbon data 
file in HDFS
-   *
-   * @param carbonStoreFilePath
-   * @param localFilePath
-   * @param bufferSize
-   * @param blockSize
-   * @throws IOException
-   */
-  private void copyLocalFileToCarbonStore(String carbonStoreFilePath, String 
localFilePath,
-      int bufferSize, long blockSize) throws IOException {
-    DataOutputStream dataOutputStream = null;
-    DataInputStream dataInputStream = null;
-    try {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + 
" is " + blockSize
-            + " (bytes");
-      }
-      dataOutputStream = FileFactory
-          .getDataOutputStream(carbonStoreFilePath, 
FileFactory.getFileType(carbonStoreFilePath),
-              bufferSize, blockSize);
-      dataInputStream = FileFactory
-          .getDataInputStream(localFilePath, 
FileFactory.getFileType(localFilePath), bufferSize);
-      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
-    } finally {
-      CarbonUtil.closeStream(dataInputStream);
-      CarbonUtil.closeStream(dataOutputStream);
-    }
-  }
 
   /**
    * This method will copy the carbon data file from local store location to
@@ -554,7 +457,10 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
      * @throws Exception if unable to compute a result
      */
     @Override public Void call() throws Exception {
-      copyCarbonDataFileToCarbonStorePath(fileName);
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
+          fileName,
+          model.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
       return null;
     }
 

Reply via email to