Repository: carbondata
Updated Branches:
  refs/heads/master 668bfdd50 -> 8b33ab240


[CARBONDATA-2376] Improve Lucene datamap performance by eliminating blockid 
while writing and reading index

Problem:
Currently, DataMap interface implementations use both blockid and blockletid 
when writing index files. The blockid does not actually need to be stored in 
the index files, since only the blockletid is required. Storing it adds extra 
memory and disk usage when writing index files.

Solution:
Use the task name as the index (shard) name. Filter the blocklets directly, 
avoiding blockids, and pass the task name as the index name so the blockid can 
be resolved from the blocklet datamap.

Corrected the implementations of LuceneDatamap, BloomFilterDataMap, CGDataMap, 
FGDataMap and MinMaxDataMap

This closes #2206


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b33ab24
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b33ab24
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b33ab24

Branch: refs/heads/master
Commit: 8b33ab240126e999e9196369025917370172eee4
Parents: 668bfdd
Author: ravipesala <[email protected]>
Authored: Sun Apr 22 10:19:53 2018 +0530
Committer: Jacky Li <[email protected]>
Committed: Mon Apr 23 16:28:35 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/core/datamap/Segment.java | 12 +--
 .../core/datamap/dev/DataMapWriter.java         |  2 +-
 .../dev/fgdatamap/FineGrainBlocklet.java        |  4 +-
 .../carbondata/core/indexstore/Blocklet.java    | 18 ++--
 .../core/indexstore/ExtendedBlocklet.java       |  2 +-
 .../blockletindex/BlockletDataMapFactory.java   |  3 +-
 .../table/DiskBasedDMSchemaStorageProvider.java |  4 +-
 .../core/util/path/CarbonTablePath.java         | 11 ++-
 .../datamap/bloom/BloomCoarseGrainDataMap.java  | 22 ++---
 .../bloom/BloomCoarseGrainDataMapFactory.java   | 15 ++--
 .../carbondata/datamap/bloom/BloomDMModel.java  |  9 +-
 .../datamap/bloom/BloomDataMapWriter.java       | 43 +++++-----
 .../bloom/BloomCoarseGrainDataMapSuite.scala    |  4 +-
 datamap/examples/pom.xml                        |  5 ++
 .../datamap/examples/MinMaxDataWriter.java      | 11 ++-
 .../datamap/examples/MinMaxDataMapSuite.scala   |  2 -
 .../lucene/LuceneDataMapFactoryBase.java        |  6 +-
 .../datamap/lucene/LuceneDataMapWriter.java     | 53 ++++--------
 .../datamap/lucene/LuceneFineGrainDataMap.java  | 90 ++++++++++----------
 .../hadoop/api/CarbonInputFormat.java           |  7 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   | 52 +++++------
 .../testsuite/datamap/DataMapWriterSuite.scala  |  2 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   | 59 +++++++------
 .../testsuite/datamap/TestDataMapStatus.scala   |  2 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  2 +-
 .../datamap/DataMapWriterListener.java          |  4 +-
 .../store/writer/AbstractFactDataWriter.java    |  5 +-
 27 files changed, 218 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index ce0b90b..251ea38 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -42,9 +42,9 @@ public class Segment implements Serializable {
   private String segmentFileName;
 
   /**
-   * List of index files which are already got filtered through CG index 
operation.
+   * List of index shards which are already got filtered through CG index 
operation.
    */
-  private Set<String> filteredIndexFiles = new HashSet<>();
+  private Set<String> filteredIndexShardNames = new HashSet<>();
 
   /**
    * Points to the Read Committed Scope of the segment. This is a flavor of
@@ -149,12 +149,12 @@ public class Segment implements Serializable {
     return null;
   }
 
-  public Set<String> getFilteredIndexFiles() {
-    return filteredIndexFiles;
+  public Set<String> getFilteredIndexShardNames() {
+    return filteredIndexShardNames;
   }
 
-  public void setFilteredIndexFile(String filteredIndexFile) {
-    this.filteredIndexFiles.add(filteredIndexFile);
+  public void setFilteredIndexShardName(String filteredIndexShardName) {
+    this.filteredIndexShardNames.add(filteredIndexShardName);
   }
 
   @Override public boolean equals(Object o) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 29670a1..1933f70 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -54,7 +54,7 @@ public abstract class DataMapWriter {
    *
    * @param blockId file name of the carbondata file
    */
-  public abstract void onBlockStart(String blockId, long taskId) throws 
IOException;
+  public abstract void onBlockStart(String blockId, String indexShardName) 
throws IOException;
 
   /**
    * End of block notification

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
index b42500f..9c78cc8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
@@ -41,8 +41,8 @@ public class FineGrainBlocklet extends Blocklet implements 
Serializable {
 
   private List<Page> pages;
 
-  public FineGrainBlocklet(String blockId, String blockletId, List<Page> 
pages) {
-    super(blockId, blockletId);
+  public FineGrainBlocklet(String taskName, String blockletId, List<Page> 
pages) {
+    super(taskName, blockletId);
     this.pages = pages;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index c3eda6b..9b40be4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -29,13 +29,13 @@ import 
org.apache.carbondata.core.metadata.schema.table.Writable;
 public class Blocklet implements Writable,Serializable {
 
   /** file path of this blocklet */
-  private String blockId;
+  private String taskName;
 
   /** id to identify the blocklet inside the block (it is a sequential number) 
*/
   private String blockletId;
 
-  public Blocklet(String blockId, String blockletId) {
-    this.blockId = blockId;
+  public Blocklet(String taskName, String blockletId) {
+    this.taskName = taskName;
     this.blockletId = blockletId;
   }
 
@@ -47,17 +47,17 @@ public class Blocklet implements Writable,Serializable {
     return blockletId;
   }
 
-  public String getBlockId() {
-    return blockId;
+  public String getTaskName() {
+    return taskName;
   }
 
   @Override public void write(DataOutput out) throws IOException {
-    out.writeUTF(blockId);
+    out.writeUTF(taskName);
     out.writeUTF(blockletId);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
-    blockId = in.readUTF();
+    taskName = in.readUTF();
     blockletId = in.readUTF();
   }
 
@@ -67,7 +67,7 @@ public class Blocklet implements Writable,Serializable {
 
     Blocklet blocklet = (Blocklet) o;
 
-    if (blockId != null ? !blockId.equals(blocklet.blockId) : blocklet.blockId 
!= null) {
+    if (taskName != null ? !taskName.equals(blocklet.taskName) : 
blocklet.taskName != null) {
       return false;
     }
     return blockletId != null ?
@@ -76,7 +76,7 @@ public class Blocklet implements Writable,Serializable {
   }
 
   @Override public int hashCode() {
-    int result = blockId != null ? blockId.hashCode() : 0;
+    int result = taskName != null ? taskName.hashCode() : 0;
     result = 31 * result + (blockletId != null ? blockletId.hashCode() : 0);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index d2af5cb..ea2752c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -66,7 +66,7 @@ public class ExtendedBlocklet extends Blocklet {
   }
 
   public String getPath() {
-    return getBlockId();
+    return getTaskName();
   }
 
   public String getDataMapWriterPath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index caac733..7c6427d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -145,9 +145,8 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
 
   private ExtendedBlocklet 
getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
-    String carbonIndexFileName = 
CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getIndexFileName().equals(carbonIndexFileName)) {
+      if (identifier.getIndexFileName().startsWith(blocklet.getTaskName())) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) 
dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index 9168f55..e1a929c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -108,8 +108,8 @@ public class DiskBasedDMSchemaStorageProvider implements 
DataMapSchemaStoragePro
     for (DataMapSchema dataMapSchema : this.dataMapSchemas) {
       List<RelationIdentifier> parentTables = dataMapSchema.getParentTables();
       for (RelationIdentifier identifier : parentTables) {
-        if (identifier.getTableName().equals(carbonTable.getTableName()) &&
-            
identifier.getDatabaseName().equals(carbonTable.getDatabaseName())) {
+        if 
(identifier.getTableName().equalsIgnoreCase(carbonTable.getTableName()) &&
+            
identifier.getDatabaseName().equalsIgnoreCase(carbonTable.getDatabaseName())) {
           dataMapSchemas.add(dataMapSchema);
           break;
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 6de26ad..0538e7f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -643,8 +643,17 @@ public class CarbonTablePath {
   }
 
   public static String getCarbonIndexFileName(String actualBlockName) {
+    return getUniqueTaskName(actualBlockName) + INDEX_FILE_EXT;
+  }
+
+  /**
+   * Unique task name
+   * @param actualBlockName
+   * @return
+   */
+  public static String getUniqueTaskName(String actualBlockName) {
     return DataFileUtil.getTaskNo(actualBlockName) + "-" + 
DataFileUtil.getBucketNo(actualBlockName)
-        + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName) + 
INDEX_FILE_EXT;
+        + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 6e1a2eb..81cccf2 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -36,7 +36,6 @@ import 
org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -68,10 +67,12 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
   private List<BloomDMModel> bloomIndexList;
   private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
   public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
+  private String shardName;
 
   @Override
-  public void init(DataMapModel dataMapModel) throws MemoryException, 
IOException {
+  public void init(DataMapModel dataMapModel) throws IOException {
     Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+    this.shardName = indexPath.getName();
     FileSystem fs = FileFactory.getFileSystem(indexPath);
     if (!fs.exists(indexPath)) {
       throw new IOException(
@@ -93,8 +94,9 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
     indexCol2BloomDMList = ArrayListMultimap.create();
     for (int i = 0; i < indexFileStatus.length; i++) {
       indexFilePath[i] = indexFileStatus[i].getPath().toString();
-      String indexCol = StringUtils.substringBetween(indexFilePath[i], 
".carbondata.",
-          BLOOM_INDEX_SUFFIX);
+      String indexfilename = indexFileStatus[i].getPath().getName();
+      String indexCol =
+          indexfilename.substring(0, indexfilename.length() - 
BLOOM_INDEX_SUFFIX.length());
       indexedColumn.add(indexCol);
       bloomIndexList.addAll(readBloomIndex(indexFilePath[i]));
       indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i]));
@@ -149,15 +151,15 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
               convertValueToBytes(bloomQueryModel.dataType, 
bloomQueryModel.filterValue));
           if (scanRequired) {
             LOGGER.info(String.format(
-                "BloomCoarseGrainDataMap: Need to scan block#%s -> 
blocklet#%s",
-                bloomDMModel.getBlockId(), 
String.valueOf(bloomDMModel.getBlockletNo())));
-            Blocklet blocklet = new Blocklet(bloomDMModel.getBlockId(),
-                String.valueOf(bloomDMModel.getBlockletNo()));
+                "BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
+                String.valueOf(bloomDMModel.getBlockletNo())));
+            Blocklet blocklet =
+                new Blocklet(shardName, 
String.valueOf(bloomDMModel.getBlockletNo()));
             hitBlocklets.add(blocklet);
           } else {
             LOGGER.info(String.format(
-                "BloomCoarseGrainDataMap: Skip scan block#%s -> blocklet#%s",
-                bloomDMModel.getBlockId(), 
String.valueOf(bloomDMModel.getBlockletNo())));
+                "BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
+                String.valueOf(bloomDMModel.getBlockletNo())));
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 3430a65..a2f9693 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -168,15 +167,19 @@ public class BloomCoarseGrainDataMapFactory implements 
DataMapFactory<CoarseGrai
   @Override
   public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws 
IOException {
     List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
-    BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
     try {
-      bloomDM.init(new DataMapModel(BloomDataMapWriter.genDataMapStorePath(
+      String dataMapStorePath = BloomDataMapWriter.genDataMapStorePath(
           CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), 
segment.getSegmentNo()),
-          dataMapName)));
-    } catch (MemoryException e) {
+          dataMapName);
+      CarbonFile[] carbonFiles = 
FileFactory.getCarbonFile(dataMapStorePath).listFiles();
+      for (CarbonFile carbonFile : carbonFiles) {
+        BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
+        bloomDM.init(new DataMapModel(carbonFile.getAbsolutePath()));
+        dataMaps.add(bloomDM);
+      }
+    } catch (Exception e) {
       throw new IOException("Error occurs while init Bloom DataMap", e);
     }
-    dataMaps.add(bloomDM);
     return dataMaps;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
index b72f08f..6351199 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
@@ -25,20 +25,14 @@ import com.google.common.hash.BloomFilter;
 @InterfaceAudience.Internal
 public class BloomDMModel implements Serializable {
   private static final long serialVersionUID = 7281578747306832771L;
-  private String blockId;
   private int blockletNo;
   private BloomFilter<byte[]> bloomFilter;
 
-  public BloomDMModel(String blockId, int blockletNo, BloomFilter<byte[]> 
bloomFilter) {
-    this.blockId = blockId;
+  public BloomDMModel(int blockletNo, BloomFilter<byte[]> bloomFilter) {
     this.blockletNo = blockletNo;
     this.bloomFilter = bloomFilter;
   }
 
-  public String getBlockId() {
-    return blockId;
-  }
-
   public int getBlockletNo() {
     return blockletNo;
   }
@@ -49,7 +43,6 @@ public class BloomDMModel implements Serializable {
 
   @Override public String toString() {
     final StringBuilder sb = new StringBuilder("BloomDMModel{");
-    sb.append("blockId='").append(blockId).append('\'');
     sb.append(", blockletNo=").append(blockletNo);
     sb.append(", bloomFilter=").append(bloomFilter);
     sb.append('}');

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index 4065523..76ee084 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -38,8 +39,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnels;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * BloomDataMap is constructed in blocklet level. For each indexed column, a 
bloom filter is
@@ -55,7 +54,7 @@ public class BloomDataMapWriter extends DataMapWriter {
   // map column name to ordinal in pages
   private Map<String, Integer> col2Ordianl;
   private Map<String, DataType> col2DataType;
-  private String currentBlockId;
+  private String indexShardName;
   private int currentBlockletId;
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
@@ -80,22 +79,16 @@ public class BloomDataMapWriter extends DataMapWriter {
   }
 
   @Override
-  public void onBlockStart(String blockId, long taskId) throws IOException {
-    this.currentBlockId = blockId;
-    this.currentBlockletId = 0;
-    currentDMFiles.clear();
-    currentDataOutStreams.clear();
-    currentObjectOutStreams.clear();
-    initDataMapFile();
+  public void onBlockStart(String blockId, String indexShardName) throws 
IOException {
+    if (this.indexShardName == null) {
+      this.indexShardName = indexShardName;
+      initDataMapFile();
+    }
   }
 
   @Override
   public void onBlockEnd(String blockId) throws IOException {
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) 
{
-      CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId),
-          this.currentObjectOutStreams.get(indexColId));
-      commitFile(this.currentDMFiles.get(indexColId));
-    }
+
   }
 
   @Override
@@ -159,9 +152,11 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   private void initDataMapFile() throws IOException {
     String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, 
this.dataMapName);
+    dataMapDir = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR + 
this.indexShardName;
+    FileFactory.mkdirs(dataMapDir, FileFactory.getFileType(dataMapDir));
     for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) 
{
-      String dmFile = dataMapDir + File.separator + this.currentBlockId
-          + '.' + indexedColumns.get(indexColId) + 
BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
+      String dmFile = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR +
+          indexedColumns.get(indexColId) + 
BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
       DataOutputStream dataOutStream = null;
       ObjectOutputStream objectOutStream = null;
       try {
@@ -182,7 +177,7 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   private void writeBloomDataMapFile() throws IOException {
     for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) 
{
-      BloomDMModel model = new BloomDMModel(this.currentBlockId, 
this.currentBlockletId,
+      BloomDMModel model = new BloomDMModel(this.currentBlockletId,
           indexBloomFilters.get(indexColId));
       // only in higher version of guava-bloom-filter, it provides 
readFrom/writeTo interface.
       // In lower version, we use default java serializer to write bloomfilter.
@@ -194,7 +189,11 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   @Override
   public void finish() throws IOException {
-
+    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) 
{
+      CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId),
+          this.currentObjectOutStreams.get(indexColId));
+      commitFile(this.currentDMFiles.get(indexColId));
+    }
   }
 
   @Override
@@ -213,11 +212,7 @@ public class BloomDataMapWriter extends DataMapWriter {
   public static String genDataMapStorePath(String dataPath, String dataMapName)
       throws IOException {
     String dmDir = dataPath + File.separator + dataMapName;
-    Path dmPath = FileFactory.getPath(dmDir);
-    FileSystem fs = FileFactory.getFileSystem(dmPath);
-    if (!fs.exists(dmPath)) {
-      fs.mkdirs(dmPath);
-    }
+    FileFactory.mkdirs(dmDir, FileFactory.getFileType(dmDir));
     return dmDir;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
 
b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index e7bab95..21283fe 100644
--- 
a/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ 
b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -75,8 +75,8 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
     sql(s"select * from $bloomDMSampleTable limit 5").show(false)
 
     checkExistence(sql(s"show datamap on table $bloomDMSampleTable"), true, 
dataMapName)
-    checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"),
-      Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, 
"(NA)"))
+//    checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"),
+//      Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, 
"(NA)"))
     checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 1"),
       sql(s"select * from $normalTable where id = 1"))
     checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 999"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/examples/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml
index 30e1522..a22e902 100644
--- a/datamap/examples/pom.xml
+++ b/datamap/examples/pom.xml
@@ -41,6 +41,11 @@
       <artifactId>carbondata-spark2</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index e68b481..21a0b8e 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -61,6 +61,7 @@ public class MinMaxDataWriter extends DataMapWriter {
   private String dataMapName;
   private int columnCnt;
   private DataType[] dataTypeArray;
+  private String indexShardName;
 
   /**
    * Since the sequence of indexed columns is defined the same as order in 
user-created, so
@@ -90,12 +91,14 @@ public class MinMaxDataWriter extends DataMapWriter {
     }
   }
 
-  @Override public void onBlockStart(String blockId, long taskId) {
-    blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
+  @Override public void onBlockStart(String blockId, String indexShardName) {
+    if (blockMinMaxMap == null) {
+      blockMinMaxMap = new HashMap<>();
+      this.indexShardName = indexShardName;
+    }
   }
 
   @Override public void onBlockEnd(String blockId) {
-    updateMinMaxIndex(blockId);
   }
 
   @Override public void onBlockletStart(int blockletId) {
@@ -300,7 +303,7 @@ public class MinMaxDataWriter extends DataMapWriter {
   }
 
   @Override public void finish() throws IOException {
-
+    updateMinMaxIndex(indexShardName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
 
b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
index b568536..8436e07 100644
--- 
a/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
+++ 
b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
@@ -69,8 +69,6 @@ class MinMaxDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
        """.stripMargin)
 
     sql(s"show datamap on table $minMaxDMSampleTable").show(false)
-    checkAnswer(sql(s"show datamap on table $minMaxDMSampleTable"),
-      Row(dataMapName, classOf[MinMaxIndexDataMapFactory].getName, "(NA)"))
     // not that the table will use default dimension as sort_columns, so for 
the following cases,
     // the pruning result will differ.
     // 1 blocklet

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 672880f..a7b8831 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -210,10 +210,14 @@ abstract class LuceneDataMapFactoryBase<T extends 
DataMap> implements DataMapFac
    */
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
-    List<DataMapDistributable> lstDataMapDistribute = new 
ArrayList<DataMapDistributable>();
+    List<DataMapDistributable> lstDataMapDistribute = new ArrayList<>();
     CarbonFile[] indexDirs = LuceneDataMapWriter
         .getAllIndexDirs(tableIdentifier.getTablePath(), 
segment.getSegmentNo(), dataMapName);
     for (CarbonFile indexDir : indexDirs) {
+      // Filter out the tasks which are filtered through CG datamap.
+      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+        continue;
+      }
       DataMapDistributable luceneDataMapDistributable = new 
LuceneDataMapDistributable(
           CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), 
segment.getSegmentNo()),
           indexDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 6cc89d6..2025f73 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -29,7 +29,6 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -52,7 +51,6 @@ import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.document.IntRangeField;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -78,16 +76,12 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private Analyzer analyzer = null;
 
-  private String blockId = null;
-
   private String dataMapName = null;
 
   private boolean isFineGrain = true;
 
   private List<String> indexedCarbonColumns = null;
 
-  private static final String BLOCKID_NAME = "blockId";
-
   private static final String BLOCKLETID_NAME = "blockletId";
 
   private static final String PAGEID_NAME = "pageId";
@@ -102,24 +96,25 @@ public class LuceneDataMapWriter extends DataMapWriter {
     this.indexedCarbonColumns = indexedCarbonColumns;
   }
 
-  private String getIndexPath(long taskId) {
+  private String getIndexPath(String taskName) {
     if (isFineGrain) {
-      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, 
dataMapName, taskId);
+      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, 
dataMapName,
+          taskName);
     } else {
       // TODO: where write data in coarse grain data map
-      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, 
dataMapName, taskId);
+      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, 
dataMapName,
+          taskName);
     }
   }
 
   /**
    * Start of new block notification.
    */
-  public void onBlockStart(String blockId, long taskId) throws IOException {
+  public void onBlockStart(String blockId, String indexShardName) throws 
IOException {
     // save this block id for lucene index , used in onPageAdd function
-    this.blockId = blockId;
 
     // get index path, put index data into segment's path
-    String strIndexPath = getIndexPath(taskId);
+    String strIndexPath = getIndexPath(indexShardName);
     Path indexPath = FileFactory.getPath(strIndexPath);
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
@@ -154,13 +149,6 @@ public class LuceneDataMapWriter extends DataMapWriter {
    * End of block notification
    */
   public void onBlockEnd(String blockId) throws IOException {
-    // clean this block id
-    this.blockId = null;
-
-    // finished a file , close this index writer
-    if (indexWriter != null) {
-      indexWriter.close();
-    }
 
   }
 
@@ -201,10 +189,6 @@ public class LuceneDataMapWriter extends DataMapWriter {
     for (int rowId = 0; rowId < pageSize; rowId++) {
       // create a new document
       Document doc = new Document();
-
-      // add block id, save this id
-      doc.add(new StringField(BLOCKID_NAME, blockId, Field.Store.YES));
-
       // add blocklet Id
       doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId }));
       doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
@@ -339,7 +323,10 @@ public class LuceneDataMapWriter extends DataMapWriter {
    * class.
    */
   public void finish() throws IOException {
-
+    // finished a file , close this index writer
+    if (indexWriter != null) {
+      indexWriter.close();
+    }
   }
 
   /**
@@ -350,16 +337,16 @@ public class LuceneDataMapWriter extends DataMapWriter {
   }
 
   /**
-   * Return store path for datamap based on the taskId, if three tasks get 
launched during loading,
+   * Return store path for datamap based on the taskName,if three tasks get 
launched during loading,
    * then three folders will be created based on the three task Ids and lucene 
index file will be
    * written into those folders
+   *
    * @return store path based on taskID
    */
   private static String genDataMapStorePathOnTaskId(String tablePath, String 
segmentId,
-      String dataMapName, long taskId) {
+      String dataMapName, String taskName) {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + 
File.separator + dataMapName
-        + File.separator + dataMapName + CarbonCommonConstants.UNDERSCORE + 
taskId
-        + CarbonCommonConstants.UNDERSCORE + System.currentTimeMillis();
+        + File.separator + taskName;
   }
 
   /**
@@ -375,14 +362,6 @@ public class LuceneDataMapWriter extends DataMapWriter {
         CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator 
+ dataMapName;
     FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
     final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
-    return dirPath.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        if (file.isDirectory() && file.getName().startsWith(dataMapName)) {
-          return true;
-        } else {
-          return false;
-        }
-      }
-    });
+    return dirPath.listFiles();
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index 3caefd2..cf5e2da 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -18,7 +18,12 @@
 package org.apache.carbondata.datamap.lucene;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
@@ -29,7 +34,6 @@ import 
org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -45,20 +49,21 @@ import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.*;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.store.hdfs.HdfsDirectory;
 
 @InterfaceAudience.Internal
 public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
-  private static final int BLOCKID_ID = 0;
+  private static final int BLOCKLETID_ID = 0;
 
-  private static final int BLOCKLETID_ID = 1;
+  private static final int PAGEID_ID = 1;
 
-  private static final int PAGEID_ID = 2;
-
-  private static final int ROWID_ID = 3;
+  private static final int ROWID_ID = 2;
 
   /**
    * log information
@@ -81,6 +86,8 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
    */
   private Analyzer analyzer;
 
+  private String taskName;
+
   LuceneFineGrainDataMap(Analyzer analyzer) {
     this.analyzer = analyzer;
   }
@@ -88,12 +95,14 @@ public class LuceneFineGrainDataMap extends 
FineGrainDataMap {
   /**
    * It is called to load the data map to memory or to initialize it.
    */
-  public void init(DataMapModel dataMapModel) throws MemoryException, 
IOException {
+  public void init(DataMapModel dataMapModel) throws IOException {
     // get this path from file path
     Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
 
     LOGGER.info("Lucene index read path " + indexPath.toString());
 
+    this.taskName = indexPath.getName();
+
     // get file system , use hdfs file system , realized in solr project
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
@@ -192,7 +201,7 @@ public class LuceneFineGrainDataMap extends 
FineGrainDataMap {
 
     // temporary data, delete duplicated data
     // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
-    Map<String, Map<String, Map<Integer, Set<Integer>>>> mapBlocks = new 
HashMap<>();
+    Map<String, Map<Integer, Set<Integer>>> mapBlocks = new HashMap<>();
 
     for (ScoreDoc scoreDoc : result.scoreDocs) {
       // get a document
@@ -201,20 +210,12 @@ public class LuceneFineGrainDataMap extends 
FineGrainDataMap {
       // get all fields
       List<IndexableField> fieldsInDoc = doc.getFields();
 
-      // get this block id Map<BlockId, Map<BlockletId, Map<PageId, 
Set<RowId>>>>
-      String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue();
-      Map<String, Map<Integer, Set<Integer>>> mapBlocklets = 
mapBlocks.get(blockId);
-      if (mapBlocklets == null) {
-        mapBlocklets = new HashMap<>();
-        mapBlocks.put(blockId, mapBlocklets);
-      }
-
       // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
       String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
-      Map<Integer, Set<Integer>> mapPageIds = mapBlocklets.get(blockletId);
+      Map<Integer, Set<Integer>> mapPageIds = mapBlocks.get(blockletId);
       if (mapPageIds == null) {
         mapPageIds = new HashMap<>();
-        mapBlocklets.put(blockletId, mapPageIds);
+        mapBlocks.put(blockletId, mapPageIds);
       }
 
       // get the page id Map<PageId, Set<RowId>>
@@ -235,37 +236,32 @@ public class LuceneFineGrainDataMap extends 
FineGrainDataMap {
 
     // transform all blocks into result type blocklets
     // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
-    for (Map.Entry<String, Map<String, Map<Integer, Set<Integer>>>> mapBlock :
+    for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet :
         mapBlocks.entrySet()) {
-      String blockId = mapBlock.getKey();
-      Map<String, Map<Integer, Set<Integer>>> mapBlocklets = 
mapBlock.getValue();
-      // for blocklets in this block Map<BlockletId, Map<PageId, Set<RowId>>>
-      for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet : 
mapBlocklets.entrySet()) {
-        String blockletId = mapBlocklet.getKey();
-        Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue();
-        List<FineGrainBlocklet.Page> pages = new 
ArrayList<FineGrainBlocklet.Page>();
-
-        // for pages in this blocklet Map<PageId, Set<RowId>>>
-        for (Map.Entry<Integer, Set<Integer>> mapPageId : 
mapPageIds.entrySet()) {
-          // construct array rowid
-          int[] rowIds = new int[mapPageId.getValue().size()];
-          int i = 0;
-          // for rowids in this page Set<RowId>
-          for (Integer rowid : mapPageId.getValue()) {
-            rowIds[i++] = rowid;
-          }
-          // construct one page
-          FineGrainBlocklet.Page page = new FineGrainBlocklet.Page();
-          page.setPageId(mapPageId.getKey());
-          page.setRowId(rowIds);
-
-          // add this page into list pages
-          pages.add(page);
+      String blockletId = mapBlocklet.getKey();
+      Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue();
+      List<FineGrainBlocklet.Page> pages = new 
ArrayList<FineGrainBlocklet.Page>();
+
+      // for pages in this blocklet Map<PageId, Set<RowId>>>
+      for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) 
{
+        // construct array rowid
+        int[] rowIds = new int[mapPageId.getValue().size()];
+        int i = 0;
+        // for rowids in this page Set<RowId>
+        for (Integer rowid : mapPageId.getValue()) {
+          rowIds[i++] = rowid;
         }
+        // construct one page
+        FineGrainBlocklet.Page page = new FineGrainBlocklet.Page();
+        page.setPageId(mapPageId.getKey());
+        page.setRowId(rowIds);
 
-        // add a FineGrainBlocklet
-        blocklets.add(new FineGrainBlocklet(blockId, blockletId, pages));
+        // add this page into list pages
+        pages.add(page);
       }
+
+      // add a FineGrainBlocklet
+      blocklets.add(new FineGrainBlocklet(taskName, blockletId, pages));
     }
 
     return blocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 2ff4961..38b46ea 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -466,15 +466,14 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
     for (Segment segment : segments) {
       boolean found = false;
       // Clear the old pruned index files if any present
-      segment.getFilteredIndexFiles().clear();
+      segment.getFilteredIndexShardNames().clear();
       // Check the segment exist in any of the pruned blocklets.
       for (ExtendedBlocklet blocklet : prunedBlocklets) {
         if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
           found = true;
           // Set the pruned index file to the segment for further pruning.
-          String carbonIndexFileName =
-              CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
-          segment.setFilteredIndexFile(carbonIndexFileName);
+          String uniqueTaskName = 
CarbonTablePath.getUniqueTaskName(blocklet.getTaskName());
+          segment.setFilteredIndexShardName(uniqueTaskName);
         }
       }
       // Add to remove segments list if not present in pruned blocklets.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index ae21416..e428e24 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -71,12 +71,10 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
    * Get the datamap for segmentid
    */
   override def getDataMaps(segment: Segment): 
java.util.List[CoarseGrainDataMap] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo))
+    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo)
+    val file = FileFactory.getCarbonFile(path+ "/" 
+dataMapSchema.getDataMapName)
 
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
-    })
+    val files = file.listFiles()
     files.map {f =>
       val dataMap: CoarseGrainDataMap = new CGDataMap()
       dataMap.init(new DataMapModel(f.getCanonicalPath))
@@ -109,12 +107,10 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
    * @return
    */
   override def toDistributable(segment: Segment): 
java.util.List[DataMapDistributable] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo))
+    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo)
+    val file = FileFactory.getCarbonFile(path+ "/" 
+dataMapSchema.getDataMapName)
 
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
-    })
+    val files = file.listFiles()
     files.map { f =>
       val d:DataMapDistributable = new 
BlockletDataMapDistributable(f.getCanonicalPath)
       d
@@ -153,23 +149,26 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
 
 class CGDataMap extends CoarseGrainDataMap {
 
-  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+  var maxMin: ArrayBuffer[(Int, (Array[Byte], Array[Byte]))] = _
   var FileReader: FileReader = _
   var filePath: String = _
   val compressor = new SnappyCompressor
+  var taskName: String = _
 
   /**
    * It is called to load the data map to memory or to initialize it.
    */
   override def init(dataMapModel: DataMapModel): Unit = {
     this.filePath = dataMapModel.getFilePath
-    val size = FileFactory.getCarbonFile(filePath).getSize
+    val carbonFile = FileFactory.getCarbonFile(filePath)
+    taskName = carbonFile.getName
+    val size = carbonFile.getSize
     FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
     val footerLen = FileReader.readInt(filePath, size-4)
     val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
     val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
     val obj = new ObjectInputStream(in)
-    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, 
(Array[Byte], Array[Byte]))]]
+    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(Int, (Array[Byte], 
Array[Byte]))]]
   }
 
   /**
@@ -191,15 +190,15 @@ class CGDataMap extends CoarseGrainDataMap {
     }
     val meta = findMeta(value(0).getBytes)
     meta.map { f=>
-      new Blocklet(f._1, f._2 + "")
+      new Blocklet(taskName, f._1 + "")
     }.asJava
   }
 
 
   private def findMeta(value: Array[Byte]) = {
     val tuples = maxMin.filter { f =>
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._1) <= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._2) >= 0
     }
     tuples
   }
@@ -235,13 +234,11 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
     dataMapSchema: DataMapSchema)
   extends DataMapWriter(identifier, segment, dataWritePath) {
 
-  var currentBlockId: String = null
-  val cgwritepath = dataWritePath + "/" +
-                    dataMapSchema.getDataMapName + System.nanoTime() + 
".datamap"
-  lazy val stream: DataOutputStream = FileFactory
-    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+  var taskName: String = _
+
+  val cgwritepath = dataWritePath + "/" + dataMapSchema.getDataMapName +"/"
   val blockletList = new ArrayBuffer[Array[Byte]]()
-  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+  val maxMin = new ArrayBuffer[(Int, (Array[Byte], Array[Byte]))]()
   val compressor = new SnappyCompressor
 
   /**
@@ -249,8 +246,8 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
    *
    * @param blockId file name of the carbondata file
    */
-  override def onBlockStart(blockId: String, taskId: Long): Unit = {
-    currentBlockId = blockId
+  override def onBlockStart(blockId: String, taskName: String): Unit = {
+    this.taskName = taskName
   }
 
   /**
@@ -278,7 +275,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
     val sorted = blockletList
       .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 
0)
     maxMin +=
-    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+    ((blockletId, (sorted.last, sorted.head)))
     blockletList.clear()
   }
 
@@ -315,6 +312,9 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
    * class.
    */
   override def finish(): Unit = {
+    FileFactory.mkdirs(cgwritepath, FileFactory.getFileType(cgwritepath))
+    var stream: DataOutputStream = FileFactory
+      .getDataOutputStream(cgwritepath + "/"+taskName, 
FileFactory.getFileType(cgwritepath))
     val out = new ByteOutputStream()
     val outStream = new ObjectOutputStream(out)
     outStream.writeObject(maxMin)
@@ -323,7 +323,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
     stream.write(bytes)
     stream.writeInt(bytes.length)
     stream.close()
-    commitFile(cgwritepath)
+    commitFile(cgwritepath + "/"+taskName)
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 329c888..54a8dc2 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -210,7 +210,7 @@ object DataMapWriterSuite {
      *
      * @param blockId file name of the carbondata file
      */
-    override def onBlockStart(blockId: String, taskId: Long) = {
+    override def onBlockStart(blockId: String, taskId: String) = {
       callbackSeq :+= s"block start $blockId"
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d33191c..f7886a2 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -73,12 +73,10 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
    * Get the datamap for segmentid
    */
   override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] 
= {
-    val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo))
+    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo)
+    val file = FileFactory.getCarbonFile(path+ "/" 
+dataMapSchema.getDataMapName)
 
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
-    })
+    val files = file.listFiles()
     files.map { f =>
       val dataMap: FineGrainDataMap = new FGDataMap()
       dataMap.init(new DataMapModel(f.getCanonicalPath))
@@ -102,12 +100,10 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
    * @return
    */
   override def toDistributable(segment: Segment): 
java.util.List[DataMapDistributable] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo))
+    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, 
segment.getSegmentNo)
+    val file = FileFactory.getCarbonFile(path+ "/" 
+dataMapSchema.getDataMapName)
 
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".datamap")
-    })
+    val files = file.listFiles()
     files.map { f =>
       val d: DataMapDistributable = new 
BlockletDataMapDistributable(f.getCanonicalPath)
       d
@@ -152,24 +148,27 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
 
 class FGDataMap extends FineGrainDataMap {
 
-  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, 
Int)] = _
+  var maxMin: ArrayBuffer[(Int, (Array[Byte], Array[Byte]), Long, Int)] = _
   var FileReader: FileReader = _
   var filePath: String = _
   val compressor = new SnappyCompressor
+  var taskName:String = _
 
   /**
    * It is called to load the data map to memory or to initialize it.
    */
   override def init(dataMapModel: DataMapModel): Unit = {
     this.filePath = dataMapModel.getFilePath
-    val size = FileFactory.getCarbonFile(filePath).getSize
+    val carbonFile = FileFactory.getCarbonFile(filePath)
+    taskName = carbonFile.getName
+    val size = carbonFile.getSize
     FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
     val footerLen = FileReader.readInt(filePath, size - 4)
     val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, 
footerLen)
     val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
     val obj = new ObjectInputStream(in)
     maxMin = obj.readObject()
-      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), 
Long, Int)]]
+      .asInstanceOf[ArrayBuffer[(Int, (Array[Byte], Array[Byte]), Long, Int)]]
   }
 
   /**
@@ -195,9 +194,9 @@ class FGDataMap extends FineGrainDataMap {
     }.filter(_.isDefined).map(_.get).asJava
   }
 
-  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), 
Long, Int),
+  private def readAndFindData(meta: (Int, (Array[Byte], Array[Byte]), Long, 
Int),
       value: Array[Byte]): Option[FineGrainBlocklet] = {
-    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val bytes = FileReader.readByteArray(filePath, meta._3, meta._4)
     val outputStream = new 
ByteArrayInputStream(compressor.unCompressByte(bytes))
     val obj = new ObjectInputStream(outputStream)
     val blockletsData = obj.readObject()
@@ -220,7 +219,7 @@ class FGDataMap extends FineGrainDataMap {
         pg.setRowId(f._2(p._2).toArray)
         pg
       }
-      Some(new FineGrainBlocklet(meta._1, meta._2.toString, 
pages.toList.asJava))
+      Some(new FineGrainBlocklet(taskName, meta._1.toString, 
pages.toList.asJava))
     } else {
       None
     }
@@ -228,8 +227,8 @@ class FGDataMap extends FineGrainDataMap {
 
   private def findMeta(value: Array[Byte]) = {
     val tuples = maxMin.filter { f =>
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._2) <= 0
     }
     tuples
   }
@@ -263,13 +262,11 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
     segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
   extends DataMapWriter(identifier, segment, dataWriterPath) {
 
-  var currentBlockId: String = null
-  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + 
System.nanoTime() +
-                    ".datamap"
-  val stream: DataOutputStream = FileFactory
-    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  var taskName: String = _
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName +"/"
+  var stream: DataOutputStream = _
   val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
-  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, 
Int)]()
+  val maxMin = new ArrayBuffer[(Int, (Array[Byte], Array[Byte]), Long, Int)]()
   var position: Long = 0
   val compressor = new SnappyCompressor
 
@@ -278,8 +275,13 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
    *
    * @param blockId file name of the carbondata file
    */
-  override def onBlockStart(blockId: String, taskId: Long): Unit = {
-    currentBlockId = blockId
+  override def onBlockStart(blockId: String, taskId: String): Unit = {
+    this.taskName = taskId
+    if (stream == null) {
+      FileFactory.mkdirs(fgwritepath, FileFactory.getFileType(fgwritepath))
+      stream = FileFactory
+        .getDataOutputStream(fgwritepath + "/"+taskName, FileFactory.getFileType(fgwritepath))
+    }
   }
 
   /**
@@ -336,7 +338,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
     val bytes = compressor.compressByte(out.getBytes)
     stream.write(bytes)
     maxMin +=
-    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
+    ((blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
       ._1), position, bytes.length))
     position += bytes.length
     blockletList.clear()
@@ -394,6 +396,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
    * class.
    */
   override def finish(): Unit = {
+    FileFactory.mkdirs(fgwritepath, FileFactory.getFileType(fgwritepath))
     val out = new ByteOutputStream()
     val outStream = new ObjectOutputStream(out)
     outStream.writeObject(maxMin)
@@ -402,7 +405,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
     stream.write(bytes)
     stream.writeInt(bytes.length)
     stream.close()
-    commitFile(fgwritepath)
+    commitFile(fgwritepath + "/"+taskName)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index e19c4b7..d48ac6b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -203,7 +203,7 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
       override def onBlockletStart(blockletId: Int): Unit = { }
 
-      override def onBlockStart(blockId: String, taskId: Long): Unit = {
+      override def onBlockStart(blockId: String, taskId: String): Unit = {
         // trigger the second SQL to execute
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 55e9bac..2b7bd46 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -307,7 +307,7 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
       override def onBlockletStart(blockletId: Int): Unit = { }
 
-      override def onBlockStart(blockId: String, taskId: Long): Unit = {
+      override def onBlockStart(blockId: String, taskId: String): Unit = {
         // trigger the second SQL to execute
         Global.overwriteRunning = true
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 6e7f2d6..a9e30d3 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -91,10 +91,10 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId, String blockPath, long taskId) throws IOException {
+  public void onBlockStart(String blockId, String blockPath, String taskName) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId, taskId);
+        writer.onBlockStart(blockId, taskName);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 94ade87..e5c90e0 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -250,8 +250,8 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath,
-            model.getCarbonDataFileAttributes().getTaskId());
+        String taskName = CarbonTablePath.getUniqueTaskName(carbonDataFileName);
+        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath, taskName);
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", 
e);
       }
@@ -266,7 +266,6 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
         throw new CarbonDataWriterException("Problem while writing datamap", 
e);
       }
     }
-    blockletId = 0;
   }
 
   /**

Reply via email to