http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 67effda..cccfb3f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.spark.testsuite.datamap
 
+import java.io.File
 import java.util
 
 import scala.collection.JavaConverters._
@@ -26,14 +27,12 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.status.{DataMapStatus, DataMapStatusManager}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
@@ -44,10 +43,11 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
   val testData = s"$resourcesPath/sample.csv"
 
   override def beforeAll: Unit = {
+    new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
     drop
   }
 
-  test("datamap status disable for new datamap") {
+  test("datamap status enable for new datamap") {
     sql("DROP TABLE IF EXISTS datamapstatustest")
     sql(
       """
@@ -64,11 +64,33 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
 
     assert(details.length == 1)
 
+    assert(details.exists(p => p.getDataMapName.equals("statusdatamap") && p.getStatus == DataMapStatus.ENABLED))
+    sql("DROP TABLE IF EXISTS datamapstatustest")
+  }
+
+  test("datamap status disable for new datamap with deferred rebuild") {
+    sql("DROP TABLE IF EXISTS datamapstatustest")
+    sql(
+      """
+        | CREATE TABLE datamapstatustest(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      s"""create datamap statusdatamap on table datamapstatustest
+         |using '${classOf[TestDataMapFactory].getName}'
+         |with deferred rebuild
+         |dmproperties('index_columns'='name')
+         | """.stripMargin)
+
+    val details = DataMapStatusManager.readDataMapStatusDetails()
+
+    assert(details.length == 1)
+
     assert(details.exists(p => p.getDataMapName.equals("statusdatamap") && 
p.getStatus == DataMapStatus.DISABLED))
     sql("DROP TABLE IF EXISTS datamapstatustest")
   }
 
-  test("datamap status disable after new load") {
+  test("datamap status disable after new load  with deferred rebuild") {
     sql("DROP TABLE IF EXISTS datamapstatustest1")
     sql(
       """
@@ -78,6 +100,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""create datamap statusdatamap1 on table datamapstatustest1
          |using '${classOf[TestDataMapFactory].getName}'
+         |with deferred rebuild
          |dmproperties('index_columns'='name')
          | """.stripMargin)
 
@@ -94,8 +117,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamapstatustest1")
   }
 
-  // enable it in PR2255
-  ignore("datamap status with refresh datamap") {
+  test("datamap status with REBUILD DATAMAP") {
     sql("DROP TABLE IF EXISTS datamapstatustest2")
     sql(
       """
@@ -105,6 +127,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""create datamap statusdatamap2 on table datamapstatustest2
          |using '${classOf[TestDataMapFactory].getName}'
+         |with deferred rebuild
          |dmproperties('index_columns'='name')
          | """.stripMargin)
 
@@ -119,7 +142,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
     assert(details.length == 1)
     assert(details.exists(p => p.getDataMapName.equals("statusdatamap2") && 
p.getStatus == DataMapStatus.DISABLED))
 
-    sql(s"refresh datamap statusdatamap2 on table datamapstatustest2")
+    sql(s"REBUILD DATAMAP statusdatamap2 on table datamapstatustest2")
 
     details = DataMapStatusManager.readDataMapStatusDetails()
     assert(details.length == 1)
@@ -128,8 +151,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamapstatustest2")
   }
 
-  // enable it in PR2255
-  ignore("datamap create without on table test") {
+  test("datamap create without on table test") {
     sql("DROP TABLE IF EXISTS datamapstatustest3")
     sql(
       """
@@ -144,10 +166,20 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
            | """.stripMargin)
 
     }
+    sql("DROP TABLE IF EXISTS datamapstatustest3")
+  }
 
+  test("rebuild datamap status") {
+    sql("DROP TABLE IF EXISTS datamapstatustest3")
+    sql(
+      """
+        | CREATE TABLE datamapstatustest3(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
     sql(
       s"""create datamap statusdatamap3 on table datamapstatustest3
          |using '${classOf[TestDataMapFactory].getName}'
+         |with deferred rebuild
          |dmproperties('index_columns'='name')
          | """.stripMargin)
 
@@ -162,7 +194,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
     assert(details.length == 1)
     assert(details.exists(p => p.getDataMapName.equals("statusdatamap3") && 
p.getStatus == DataMapStatus.DISABLED))
 
-    sql(s"refresh datamap statusdatamap3")
+    sql(s"REBUILD DATAMAP statusdatamap3")
 
     details = DataMapStatusManager.readDataMapStatusDetails()
     assert(details.length == 1)
@@ -245,8 +277,19 @@ class TestDataMapFactory(
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
-    ???
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
+    return new DataMapBuilder {
+      override def initialize(): Unit = { }
+
+      override def addRow(blockletId: Int,
+          pageId: Int,
+          rowId: Int,
+          values: Array[AnyRef]): Unit = { }
+
+      override def finish(): Unit = { }
+
+      override def close(): Unit = { }
+    }
   }
 }
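
The tests above exercise the status lifecycle this patch introduces for index datamaps. As a summary sketch only (not part of the patch; table and datamap names are illustrative, and sql(...) is assumed to run inside a QueryTest-style suite), the DDL flow the suite drives is:

  // created without DEFERRED REBUILD: built during creation and left ENABLED
  sql(s"create datamap dm1 on table t using '${classOf[TestDataMapFactory].getName}' " +
    s"dmproperties('index_columns'='name')")

  // created WITH DEFERRED REBUILD: stays DISABLED through creation and subsequent loads
  sql(s"create datamap dm2 on table t using '${classOf[TestDataMapFactory].getName}' " +
    s"with deferred rebuild dmproperties('index_columns'='name')")

  // explicit rebuild syncs the deferred datamap with the table and flips it to ENABLED
  sql("REBUILD DATAMAP dm2 ON TABLE t")
  assert(DataMapStatusManager.readDataMapStatusDetails()
    .exists(d => d.getDataMapName.equals("dm2") && d.getStatus == DataMapStatus.ENABLED))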

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index c193fcf..2f3488e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -28,8 +28,6 @@ import org.apache.carbondata.spark.util.DataGenerator
  * Test Suite for search mode
  */
 
-// TODO: Need to Fix
-@Ignore
 class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
 
   val numRows = 500 * 1000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 9c5297d..3cabc7b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -24,11 +24,12 @@ import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
 import org.apache.carbondata.core.datastore.page.ColumnPage
@@ -43,18 +44,30 @@ import org.apache.carbondata.events.Event
 // This testsuite test insert and insert overwrite with other commands concurrently
 class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
-  var df: DataFrame = _
+  var testData: DataFrame = _
 
   override def beforeAll {
     dropTable()
     buildTestData()
 
+    createTable("orders", testData.schema)
+    createTable("orders_overwrite", testData.schema)
     sql(
       s"""
          | create datamap test on table orders
          | using '${classOf[WaitingDataMapFactory].getName}'
          | dmproperties('index_columns'='o_name')
        """.stripMargin)
+
+    testData.write
+      .format("carbondata")
+      .option("tableName", "temp_table")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    sql(s"insert into orders select * from temp_table")
+    sql(s"insert into orders_overwrite select * from temp_table")
   }
 
   private def buildTestData(): Unit = {
@@ -66,23 +79,17 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
     import sqlContext.implicits._
 
     val sdf = new SimpleDateFormat("yyyy-MM-dd")
-    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
+    testData = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
       .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 
10 + 10)).getTime),
         "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + 
value), 14999 + value,
         "ordersTable" + value))
       .toDF("o_id", "o_date", "o_country", "o_name",
         "o_phonetype", "o_serialname", "o_salary", "o_comment")
-    createTable("orders")
-    createTable("orders_overwrite")
   }
 
-  private def createTable(tableName: String): Unit = {
-    df.write
-      .format("carbondata")
-      .option("tableName", tableName)
-      .option("tempCSV", "false")
-      .mode(SaveMode.Overwrite)
-      .save()
+  private def createTable(tableName: String, schema: StructType): Unit = {
+    val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ")
+    sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata")
   }
 
   override def afterAll {
@@ -91,7 +98,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
   }
 
   override def beforeEach(): Unit = {
-    Global.overwriteRunning = false
+    Global.loading = false
   }
 
   private def dropTable() = {
@@ -101,12 +108,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
 
   // run the input SQL and block until it is running
   private def runSqlAsync(sql: String): Future[String] = {
-    assert(!Global.overwriteRunning)
+    assert(!Global.loading)
     var count = 0
     val future = executorService.submit(
       new QueryTask(sql)
     )
-    while (!Global.overwriteRunning && count < 1000) {
+    while (!Global.loading && count < 1000) {
       Thread.sleep(10)
       // to avoid dead loop in case WaitingDataMapFactory is not invoked
       count += 1
@@ -202,9 +209,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
     sql("drop table if exists t1")
 
     // number of segment is 1 after createTable
-    createTable("t1")
-    // number of segment is 2 after insert
-    sql("insert into table t1 select * from orders_overwrite")
+    createTable("t1", testData.schema)
 
     sql(
       s"""
@@ -212,6 +217,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
          | using '${classOf[WaitingDataMapFactory].getName}'
          | dmproperties('index_columns'='o_name')
        """.stripMargin)
+
+    sql("insert into table t1 select * from orders_overwrite")
+    Thread.sleep(1100)
+    sql("insert into table t1 select * from orders_overwrite")
+    Thread.sleep(1100)
+
     val future = runSqlAsync("insert into table t1 select * from 
orders_overwrite")
     sql("alter table t1 compact 'MAJOR'")
     assert(future.get.contains("PASS"))
@@ -279,15 +290,13 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
 }
 
 object Global {
-  var overwriteRunning = false
+  var loading = false
 }
 
 class WaitingDataMapFactory(
     carbonTable: CarbonTable,
     dataMapSchema: DataMapSchema) extends CoarseGrainDataMapFactory(carbonTable, dataMapSchema) {
 
-  private var identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
   override def fireEvent(event: Event): Unit = ???
 
   override def clear(segmentId: Segment): Unit = {}
@@ -311,14 +320,14 @@ class WaitingDataMapFactory(
 
       override def onBlockStart(blockId: String): Unit = {
         // trigger the second SQL to execute
-        Global.overwriteRunning = true
+        Global.loading = true
 
         // wait for 1 second to let second SQL to finish
         Thread.sleep(1000)
       }
 
       override def finish(): Unit = {
-
+        Global.loading = false
       }
     }
   }
@@ -341,8 +350,8 @@ class WaitingDataMapFactory(
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }
\ No newline at end of file
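
The concurrency hook in this suite (renamed from overwriteRunning to loading) boils down to a flag raised by the datamap writer and polled by the test thread before the second command is issued. A standalone sketch of that pattern, with illustrative names only (Signal plays the role of Global here):

  import java.util.concurrent.{Callable, Executors}

  object Signal { @volatile var loading = false }

  val pool = Executors.newFixedThreadPool(2)
  val future = pool.submit(new Callable[String] {
    override def call(): String = {
      Signal.loading = true   // in the suite this happens in WaitingDataMapFactory's onBlockStart
      Thread.sleep(1000)      // stand-in for the slow insert
      Signal.loading = false  // in the suite this happens in the writer's finish()
      "PASS"
    }
  })

  // block until the first statement is really running, then issue the concurrent command
  var count = 0
  while (!Signal.loading && count < 1000) { Thread.sleep(10); count += 1 }
  // run the second SQL here, then check future.get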

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index fea3482..890f8fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -181,7 +181,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val ON = carbonKeyWord("ON")
   protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES")
   protected val SELECT = carbonKeyWord("SELECT")
-  protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val REBUILD = carbonKeyWord("REBUILD")
+  protected val DEFERRED = carbonKeyWord("DEFERRED")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
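
These keyword tokens feed the new REBUILD DATAMAP and WITH DEFERRED REBUILD grammar; the real rules live in CarbonSpark2SqlParser, whose diff is truncated at the end of this message. Purely as an illustration of how such keywords combine with Scala parser combinators (a toy grammar, not the Carbon parser; assumes scala-parser-combinators on the classpath):

  import scala.util.parsing.combinator.RegexParsers

  object ToyRebuildParser extends RegexParsers {
    private val ident = "[a-zA-Z_][a-zA-Z0-9_]*".r
    // REBUILD DATAMAP <dm> [ON TABLE <table>]
    def rebuild: Parser[(String, Option[String])] =
      "(?i)REBUILD".r ~> "(?i)DATAMAP".r ~> ident ~
        opt("(?i)ON".r ~> "(?i)TABLE".r ~> ident) ^^ { case dm ~ tbl => (dm, tbl) }
  }

  // ToyRebuildParser.parseAll(ToyRebuildParser.rebuild, "REBUILD DATAMAP dm1 ON TABLE t1")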

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
deleted file mode 100644
index 043acb1..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Property that can be specified when creating DataMap
- */
-@InterfaceAudience.Internal
-public class DataMapProperty {
-
-  /**
-   * Used to specify the store location of the datamap
-   */
-  public static final String PARTITIONING = "partitioning";
-  public static final String PATH = "path";
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index a5124a0..0642e01 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -105,7 +105,7 @@ public class IndexDataMapProvider extends DataMapProvider {
 
   @Override
   public void rebuild() {
-    IndexDataMapRefreshRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
+    IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 746a361..37d49e5 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -17,11 +17,15 @@
 
 package org.apache.carbondata.datamap;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapCatalog;
 import org.apache.carbondata.core.datamap.DataMapProvider;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
@@ -55,9 +59,11 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
   private void validateDmProperty(DataMapSchema dataMapSchema)
       throws MalformedDataMapCommandException {
     if (!dataMapSchema.getProperties().isEmpty()) {
-      if (dataMapSchema.getProperties().size() > 2 || (
-              !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH) &&
-                      !dataMapSchema.getProperties().containsKey(DataMapProperty.PARTITIONING))) {
+      Map<String, String> properties = new HashMap<>(dataMapSchema.getProperties());
+      properties.remove(DataMapProperty.DEFERRED_REBUILD);
+      properties.remove(DataMapProperty.PATH);
+      properties.remove(DataMapProperty.PARTITIONING);
+      if (properties.size() > 0) {
         throw new MalformedDataMapCommandException(
                 "Only 'path' and 'partitioning' dmproperties are allowed for 
this datamap");
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
new file mode 100644
index 0000000..5902783
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap
+
+import java.io.{File, IOException}
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.TaskMetricsMap
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
+import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+
+/**
+ * Helper object to rebuild the index DataMap
+ */
+object IndexDataMapRebuildRDD {
+
+  /**
+   * Rebuild the datamap for all existing data in the table
+   */
+  def rebuildDataMap(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      schema: DataMapSchema
+  ): Unit = {
+    val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
+    val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
+    val validSegments = validAndInvalidSegments.getValidSegments
+    val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
+
+    // loop all segments to rebuild DataMap
+    validSegments.asScala.foreach { segment =>
+      // if lucene datamap folder is exists, not require to build lucene datamap again
+      refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
+        indexedCarbonColumns, segment.getSegmentNo);
+    }
+  }
+
+  private def refreshOneSegment(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      dataMapName: String,
+      indexColumns: java.util.List[CarbonColumn],
+      segmentId: String): Unit = {
+
+    val dataMapStorePath =
+      CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
+      File.separator +
+      dataMapName
+
+    if (!FileFactory.isFileExist(dataMapStorePath)) {
+      if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
+        try {
+          val status = new IndexDataMapRebuildRDD[String, Boolean](
+            sparkSession,
+            new RefreshResultImpl(),
+            carbonTable.getTableInfo,
+            dataMapName,
+            indexColumns.asScala.toArray,
+            segmentId
+          ).collect()
+
+          status.find(_._2 == false).foreach { task =>
+            throw new Exception(
+              s"Task Failed to rebuild datamap $dataMapName on 
segment_$segmentId")
+          }
+        } catch {
+          case ex: Throwable =>
+            // process failure
+            FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
+            throw new Exception(
+              s"Failed to refresh datamap $dataMapName on segment_$segmentId", 
ex)
+        }
+      } else {
+        throw new IOException(s"Failed to create directory $dataMapStorePath")
+      }
+    }
+  }
+
+}
+
+class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
+  override def initialize(carbonColumns: Array[CarbonColumn],
+      carbonTable: CarbonTable): Unit = {
+  }
+
+  override def readRow(data: Array[Object]): Array[Object] = {
+    dataTypes.zipWithIndex.foreach { case (dataType, i) =>
+      if (dataType == DataTypes.STRING) {
+        data(i) = data(i).toString
+      }
+    }
+    data
+  }
+
+  override def close(): Unit = {
+  }
+}
+
+class IndexDataMapRebuildRDD[K, V](
+    session: SparkSession,
+    result: RefreshResult[K, V],
+    @transient tableInfo: TableInfo,
+    dataMapName: String,
+    indexColumns: Array[CarbonColumn],
+    segmentId: String
+) extends CarbonRDDWithTableInfo[(K, V)](
+  session.sparkContext, Nil, tableInfo.serialize()) {
+
+  private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() 
+ "")
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new util.Date())
+  }
+
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val dataMapFactory =
+      DataMapManager.get().getDataMapProvider(
+        CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory
+    var status = false
+    val inputMetrics = new CarbonInputMetrics
+    TaskMetricsMap.getInstance().registerThreadCallback()
+    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    inputMetrics.initBytesReadCallback(context, inputSplit)
+
+    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+    val format = createInputFormat(attemptContext)
+
+    val model = format.createQueryModel(inputSplit, attemptContext)
+    // one query id per table
+    model.setQueryId(queryId)
+    model.setVectorReader(false)
+    model.setForcedDetailRawQuery(false)
+    model.setRequiredRowId(true)
+
+    var reader: CarbonRecordReader[Array[Object]] = null
+    var refresher: DataMapBuilder = null
+    try {
+      reader = new CarbonRecordReader(
+        model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics)
+      reader.initialize(inputSplit, attemptContext)
+
+      // we use task name as shard name to create the folder for this datamap
+      val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
+      refresher = dataMapFactory.createBuilder(new Segment(segmentId), shardName)
+      refresher.initialize()
+
+      var blockletId = 0
+      var firstRow = true
+      while (reader.nextKeyValue()) {
+        val rowWithPosition = reader.getCurrentValue
+        val size = rowWithPosition.length
+        val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
+        val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
+
+        if (!firstRow && pageId == 0 && rowId == 0) {
+          // new blocklet started, increase blockletId
+          blockletId = blockletId + 1
+        } else {
+          firstRow = false
+        }
+
+        refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
+      }
+
+      refresher.finish()
+
+      status = true
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close()
+        } catch {
+          case ex: Throwable =>
+            LOGGER.error(ex, "Failed to close reader")
+        }
+      }
+
+      if (refresher != null) {
+        try {
+          refresher.close()
+        } catch {
+          case ex: Throwable =>
+            LOGGER.error(ex, "Failed to close index writer")
+        }
+      }
+    }
+
+    new Iterator[(K, V)] {
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(split.index.toString, status)
+      }
+    }
+  }
+
+
+  private def createInputFormat(
+      attemptContext: TaskAttemptContextImpl) = {
+    val format = new CarbonTableInputFormat[Object]
+    val tableInfo1 = getTableInfo
+    val conf = attemptContext.getConfiguration
+    CarbonInputFormat.setTableInfo(conf, tableInfo1)
+    CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
+    CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl])
+
+    val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
+    CarbonInputFormat.setTablePath(
+      conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+
+    CarbonInputFormat.setSegmentsToAccess(
+      conf,
+      Segment.toSegmentList(Array(segmentId), null))
+
+    CarbonInputFormat.setColumnProjection(
+      conf,
+      new CarbonProjection(indexColumns.map(_.getColName)))
+    format
+  }
+
+  override protected def getPartitions = {
+    if (!dataMapSchema.isIndexDataMap) {
+      throw new UnsupportedOperationException
+    }
+    val conf = new Configuration()
+    val jobConf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
+    job.getConfiguration.set("query.id", queryId)
+
+    val format = new CarbonTableInputFormat[Object]
+
+    CarbonInputFormat.setSegmentsToAccess(
+      job.getConfiguration,
+      Segment.toSegmentList(Array(segmentId), null))
+
+    CarbonInputFormat.setTableInfo(
+      job.getConfiguration,
+      tableInfo)
+    CarbonInputFormat.setTablePath(
+      job.getConfiguration,
+      tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
+    CarbonInputFormat.setDatabaseName(
+      job.getConfiguration,
+      tableInfo.getDatabaseName)
+    CarbonInputFormat.setTableName(
+      job.getConfiguration,
+      tableInfo.getFactTable.getTableName)
+
+    format
+      .getSplits(job)
+      .asScala
+      .map(_.asInstanceOf[CarbonInputSplit])
+      .groupBy(_.taskId)
+      .map { group =>
+        new CarbonMultiBlockSplit(
+          group._2.asJava,
+          group._2.flatMap(_.getLocations).toArray)
+      }
+      .zipWithIndex
+      .map { split =>
+        new CarbonSparkPartition(id, split._2, split._1)
+      }
+      .toArray
+  }
+}
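
The new RDD drives one DataMapBuilder per shard: createBuilder, then initialize, then addRow for every row in (blockletId, pageId, rowId) order, then finish and close. A minimal sketch of a builder, on the assumption that the interface has exactly the four callbacks shown in this patch (the class name is illustrative):

  import org.apache.carbondata.core.datamap.dev.DataMapBuilder

  class CountingDataMapBuilder extends DataMapBuilder {
    private var rows = 0L
    override def initialize(): Unit = { rows = 0L }
    override def addRow(blockletId: Int, pageId: Int, rowId: Int, values: Array[AnyRef]): Unit = {
      // a real builder would feed the indexed column values into its index structure
      rows += 1
    }
    override def finish(): Unit = println(s"indexed $rows rows for this shard")
    override def close(): Unit = { }
  }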

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala
deleted file mode 100644
index c341c36..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap
-
-import java.io.{File, IOException}
-import java.text.SimpleDateFormat
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.TaskMetricsMap
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
-import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
-import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
-
-/**
- * Helper object to rebuild the index DataMap
- */
-object IndexDataMapRefreshRDD {
-
-  /**
-   * Rebuild the datamap for all existing data in the table
-   */
-  def rebuildDataMap(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      schema: DataMapSchema
-  ): Unit = {
-    val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
-    val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
-    val validSegments = validAndInvalidSegments.getValidSegments
-    val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
-
-    // loop all segments to rebuild DataMap
-    validSegments.asScala.foreach { segment =>
-      // if lucene datamap folder is exists, not require to build lucene datamap again
-      refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
-        indexedCarbonColumns, segment.getSegmentNo);
-    }
-  }
-
-  private def refreshOneSegment(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      dataMapName: String,
-      indexColumns: java.util.List[CarbonColumn],
-      segmentId: String): Unit = {
-
-    val dataMapStorePath =
-      CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
-      File.separator +
-      dataMapName
-
-    if (!FileFactory.isFileExist(dataMapStorePath)) {
-      if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
-        try {
-          val status = new IndexDataMapRefreshRDD[String, Boolean](
-            sparkSession,
-            new RefreshResultImpl(),
-            carbonTable.getTableInfo,
-            dataMapName,
-            indexColumns.asScala.toArray,
-            segmentId
-          ).collect()
-
-          status.find(_._2 == false).foreach { task =>
-            throw new Exception(
-              s"Task Failed to refresh datamap $dataMapName on 
segment_$segmentId")
-          }
-        } catch {
-          case ex: Throwable =>
-            // process failure
-            FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
-            throw new Exception(
-              s"Failed to refresh datamap $dataMapName on segment_$segmentId", 
ex)
-        }
-      } else {
-        throw new IOException(s"Failed to create directory $dataMapStorePath")
-      }
-    }
-  }
-
-}
-
-class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
-  override def initialize(carbonColumns: Array[CarbonColumn],
-      carbonTable: CarbonTable): Unit = {
-  }
-
-  override def readRow(data: Array[Object]): Array[Object] = {
-    dataTypes.zipWithIndex.foreach { case (dataType, i) =>
-      if (dataType == DataTypes.STRING) {
-        data(i) = data(i).toString
-      }
-    }
-    data
-  }
-
-  override def close(): Unit = {
-  }
-}
-
-class IndexDataMapRefreshRDD[K, V](
-    session: SparkSession,
-    result: RefreshResult[K, V],
-    @transient tableInfo: TableInfo,
-    dataMapName: String,
-    indexColumns: Array[CarbonColumn],
-    segmentId: String
-) extends CarbonRDDWithTableInfo[(K, V)](
-  session.sparkContext, Nil, tableInfo.serialize()) {
-
-  private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
-  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() 
+ "")
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new util.Date())
-  }
-
-  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val dataMapFactory =
-      DataMapManager.get().getDataMapProvider(
-        CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory
-    var status = false
-    val inputMetrics = new CarbonInputMetrics
-    TaskMetricsMap.getInstance().registerThreadCallback()
-    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    inputMetrics.initBytesReadCallback(context, inputSplit)
-
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(attemptContext)
-
-    val model = format.createQueryModel(inputSplit, attemptContext)
-    // one query id per table
-    model.setQueryId(queryId)
-    model.setVectorReader(false)
-    model.setForcedDetailRawQuery(false)
-    model.setRequiredRowId(true)
-
-    var reader: CarbonRecordReader[Array[Object]] = null
-    var refresher: DataMapRefresher = null
-    try {
-      reader = new CarbonRecordReader(
-        model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics)
-      reader.initialize(inputSplit, attemptContext)
-
-      // we use task name as shard name to create the folder for this datamap
-      val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
-      refresher = dataMapFactory.createRefresher(new Segment(segmentId), shardName)
-      refresher.initialize()
-
-      var blockletId = 0
-      var firstRow = true
-      while (reader.nextKeyValue()) {
-        val rowWithPosition = reader.getCurrentValue
-        val size = rowWithPosition.length
-        val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
-        val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
-
-        if (!firstRow && pageId == 0 && rowId == 0) {
-          // new blocklet started, increase blockletId
-          blockletId = blockletId + 1
-        } else {
-          firstRow = false
-        }
-
-        refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
-      }
-
-      refresher.finish()
-
-      status = true
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close()
-        } catch {
-          case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close reader")
-        }
-      }
-
-      if (refresher != null) {
-        try {
-          refresher.close()
-        } catch {
-          case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close index writer")
-        }
-      }
-    }
-
-    new Iterator[(K, V)] {
-
-      var finished = false
-
-      override def hasNext: Boolean = {
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey(split.index.toString, status)
-      }
-    }
-  }
-
-
-  private def createInputFormat(
-      attemptContext: TaskAttemptContextImpl) = {
-    val format = new CarbonTableInputFormat[Object]
-    val tableInfo1 = getTableInfo
-    val conf = attemptContext.getConfiguration
-    CarbonInputFormat.setTableInfo(conf, tableInfo1)
-    CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
-    CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
-    CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl])
-
-    val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
-    CarbonInputFormat.setTablePath(
-      conf,
-      identifier.appendWithLocalPrefix(identifier.getTablePath))
-
-    CarbonInputFormat.setSegmentsToAccess(
-      conf,
-      Segment.toSegmentList(Array(segmentId), null))
-
-    CarbonInputFormat.setColumnProjection(
-      conf,
-      new CarbonProjection(indexColumns.map(_.getColName)))
-    format
-  }
-
-  override protected def getPartitions = {
-    if (!dataMapSchema.isIndexDataMap) {
-      throw new UnsupportedOperationException
-    }
-    val conf = new Configuration()
-    val jobConf = new JobConf(conf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val job = Job.getInstance(jobConf)
-    job.getConfiguration.set("query.id", queryId)
-
-    val format = new CarbonTableInputFormat[Object]
-
-    CarbonInputFormat.setSegmentsToAccess(
-      job.getConfiguration,
-      Segment.toSegmentList(Array(segmentId), null))
-
-    CarbonInputFormat.setTableInfo(
-      job.getConfiguration,
-      tableInfo)
-    CarbonInputFormat.setTablePath(
-      job.getConfiguration,
-      tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
-    CarbonInputFormat.setDatabaseName(
-      job.getConfiguration,
-      tableInfo.getDatabaseName)
-    CarbonInputFormat.setTableName(
-      job.getConfiguration,
-      tableInfo.getFactTable.getTableName)
-
-    format
-      .getSplits(job)
-      .asScala
-      .map(_.asInstanceOf[CarbonInputSplit])
-      .groupBy(_.taskId)
-      .map { group =>
-        new CarbonMultiBlockSplit(
-          group._2.asJava,
-          group._2.flatMap(_.getLocations).toArray)
-      }
-      .zipWithIndex
-      .map { split =>
-        new CarbonSparkPartition(id, split._2, split._1)
-      }
-      .toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 42c9c25..bdbaef5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -848,11 +848,11 @@ object CarbonDataRDDFactory {
       val errorMessage = s"Dataload failed due to failure in table status 
updation for" +
                          s" ${carbonLoadModel.getTableName}"
       LOGGER.audit("Data load is failed for " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+                   
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
       LOGGER.error("Dataload failed due to failure in table status updation.")
       throw new Exception(errorMessage)
     } else {
-      DataMapStatusManager.disableDataMapsOfTable(carbonTable)
+      DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
     }
     done
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 5fac5a8..497f95a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.profiler.{Profiler, SQLStart}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
+import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.scan.expression.LiteralExpression
@@ -110,6 +111,16 @@ class CarbonSession(@transient val sc: SparkContext,
     )
   }
 
+  /**
+   * Return true if the specified sql statement will hit the datamap
+   * This API is for test purpose only
+   */
+  @InterfaceAudience.Developer(Array("DataMap"))
+  def isDataMapHit(sqlStatement: String, dataMapName: String): Boolean = {
+    val message = sql(s"EXPLAIN $sqlStatement").collect()
+    message(0).getString(0).contains(dataMapName)
+  }
+
   def isSearchModeEnabled: Boolean = carbonStore != null
 
   /**
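
A possible use of the new helper from a test, assuming spark refers to a CarbonSession and the query/datamap names are illustrative:

  import org.apache.spark.sql.CarbonSession

  val carbon = spark.asInstanceOf[CarbonSession]
  // true when the EXPLAIN output of the statement mentions the datamap name
  assert(carbon.isDataMapHit("select name from datamapstatustest where name = 'a'", "statusdatamap"))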

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 34a4013..25589d4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.command.datamap
 
+import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -27,7 +29,7 @@ import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandExcept
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
 
@@ -41,7 +43,8 @@ case class CarbonCreateDataMapCommand(
     dmProviderName: String,
     dmProperties: Map[String, String],
     queryString: Option[String],
-    ifNotExistsSet: Boolean = false)
+    ifNotExistsSet: Boolean = false,
+    deferredRebuild: Boolean = false)
   extends AtomicRunnableCommand {
 
   private var dataMapProvider: DataMapProvider = _
@@ -78,9 +81,16 @@ case class CarbonCreateDataMapCommand(
     }
 
     dataMapSchema = new DataMapSchema(dataMapName, dmProviderName)
-    dataMapSchema.setProperties(new java.util.HashMap[String, String](
-      dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
 
+    val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava
+    val javaMap = new java.util.HashMap[String, String](property)
+    javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString)
+    dataMapSchema.setProperties(javaMap)
+
+    if (dataMapSchema.isIndexDataMap && mainTable == null) {
+      throw new MalformedDataMapCommandException(
+        "For this datamap, main table is required. Use `CREATE DATAMAP ... ON 
TABLE ...` ")
+    }
     dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
 
     // If it is index datamap, check whether the column has datamap created already
@@ -101,6 +111,10 @@ case class CarbonCreateDataMapCommand(
         dataMapProvider.initMeta(queryString.orNull)
         DataMapStatusManager.disableDataMap(dataMapName)
       case _ =>
+        if (deferredRebuild) {
+          throw new MalformedDataMapCommandException(
+            "DEFERRED REBUILD is not supported on this DataMap")
+        }
         dataMapProvider.initMeta(queryString.orNull)
     }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -111,10 +125,11 @@ case class CarbonCreateDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dataMapProvider != null) {
       dataMapProvider.initData()
-      if (mainTable != null &&
-          mainTable.isAutoRefreshDataMap &&
-          !dataMapSchema.isIndexDataMap) {
+      if (mainTable != null && !deferredRebuild) {
         dataMapProvider.rebuild()
+        if (dataMapSchema.isIndexDataMap) {
+          DataMapStatusManager.enableDataMap(dataMapName)
+        }
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
new file mode 100644
index 0000000..6493c83
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.datamap
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+
+import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD}
+
+/**
+ * Rebuild the datamaps through sync with main table data. After sync with parent table's it enables
+ * the datamap.
+ */
+case class CarbonDataMapRebuildCommand(
+    dataMapName: String,
+    tableIdentifier: Option[TableIdentifier]) extends DataCommand {
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
+
+    val table = tableIdentifier match {
+      case Some(identifier) =>
+        CarbonEnv.getCarbonTable(identifier)(sparkSession)
+      case _ =>
+        CarbonEnv.getCarbonTable(
+          Option(schema.getRelationIdentifier.getDatabaseName),
+          schema.getRelationIdentifier.getTableName
+        )(sparkSession)
+    }
+    val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
+    provider.rebuild()
+
+    // After rebuild successfully enable the datamap.
+    DataMapStatusManager.enableDataMap(dataMapName)
+    Seq.empty
+  }
+
+}
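
The command backs both forms of the new statement; the names below come from the tests earlier in this patch:

  // parent table given explicitly
  sql("REBUILD DATAMAP statusdatamap2 ON TABLE datamapstatustest2")
  // parent table omitted: it is resolved from the datamap schema's RelationIdentifier
  sql("REBUILD DATAMAP statusdatamap3")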

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
deleted file mode 100644
index 4f3b7bc..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.datamap
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.DataCommand
-
-import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
-import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRefreshRDD}
-
-/**
- * Refresh the datamaps through sync with main table data. After sync with parent table's it enables
- * the datamap.
- */
-case class CarbonDataMapRefreshCommand(
-    dataMapName: String,
-    tableIdentifier: Option[TableIdentifier]) extends DataCommand {
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
-
-    val table = tableIdentifier match {
-      case Some(identifier) =>
-        CarbonEnv.getCarbonTable(identifier)(sparkSession)
-      case _ =>
-        CarbonEnv.getCarbonTable(
-          Option(schema.getRelationIdentifier.getDatabaseName),
-          schema.getRelationIdentifier.getTableName
-        )(sparkSession)
-    }
-    val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
-    provider.rebuild()
-
-    // After sync success enable the datamap.
-    DataMapStatusManager.enableDataMap(dataMapName)
-    Seq.empty
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 1bb7d7c..4f60297 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -231,8 +231,9 @@ class SparkCarbonFileFormat extends FileFormat
 
         val tab = model.getTable
         DataMapStoreManager.getInstance().clearDataMaps(identifier)
-        val dataMapExprWrapper = DataMapChooser.get
-          .choose(tab, model.getFilterExpressionResolverTree)
+
+        val dataMapExprWrapper = new DataMapChooser(tab).choose(
+          model.getFilterExpressionResolverTree)
 
         // TODO : handle the partition for CarbonFileLevelFormat
         val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
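
The hunk above tracks the DataMapChooser API change from a singleton (`DataMapChooser.get`) to a per-table instance. A condensed sketch of the new call pattern, assuming `table: CarbonTable`, `model: QueryModel` and `segments: java.util.List[Segment]` are already in scope:

    val chooser = new DataMapChooser(table)                        // chooser is now constructed per table
    val dataMapExprWrapper = chooser.choose(model.getFilterExpressionResolverTree)
    val prunedBlocklets = dataMapExprWrapper.prune(segments, null) // pruning itself is unchanged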

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 62da7ed..9dd8105 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRefreshCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
+import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
 import org.apache.spark.sql.execution.command.management._
 import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropPartitionCommand, CarbonAlterTableSplitPartitionCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
@@ -147,19 +147,23 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   /**
    * The syntax of datamap creation is as follows.
-   * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapProviderName'
+   * CREATE DATAMAP IF NOT EXISTS datamapName [ON TABLE tableName]
+   * USING 'DataMapProviderName'
+   * [WITH DEFERRED REBUILD]
    * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
    */
   protected lazy val createDataMap: Parser[LogicalPlan] =
     CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
     opt(ontable) ~
-    (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+    (USING ~> stringLit) ~
+    opt(WITH ~> DEFERRED ~> REBUILD) ~
+    (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
     (AS ~> restInput).? <~ opt(";") ^^ {
-      case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ dmprops ~ query =>
+      case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ deferred ~ dmprops ~ query =>
 
        val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
        CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query,
-          ifnotexists.isDefined)
+          ifnotexists.isDefined, deferred.isDefined)
     }
 
   protected lazy val ontable: Parser[TableIdentifier] =
@@ -190,12 +194,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   /**
   * The syntax below is used to rebuild an existing datamap on the table
-   * REFRESH DATAMAP datamapname [ON TABLE] tableName
+   * REBUILD DATAMAP datamapname [ON TABLE] tableName
    */
   protected lazy val refreshDataMap: Parser[LogicalPlan] =
-    REFRESH ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ {
+    REBUILD ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ {
       case datamap ~ tableIdent =>
-        CarbonDataMapRefreshCommand(datamap, tableIdent)
+        CarbonDataMapRebuildCommand(datamap, tableIdent)
     }
 
   protected lazy val deleteRecords: Parser[LogicalPlan] =
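
To make the grammar changes above concrete, these are the statement shapes the parser now accepts (written as calls on a Carbon-enabled SparkSession; identifiers are placeholders):

    spark.sql(
      """CREATE DATAMAP IF NOT EXISTS dm ON TABLE t
        |USING 'bloomfilter'
        |WITH DEFERRED REBUILD
        |DMPROPERTIES('INDEX_COLUMNS'='city,id')""".stripMargin)
    // REBUILD DATAMAP replaces the old REFRESH DATAMAP syntax
    spark.sql("REBUILD DATAMAP dm ON TABLE t")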

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index a8e7b6c..7df3901 100644
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -22,44 +22,56 @@ import java.util.UUID
 
 import scala.util.Random
 
+import org.apache.spark.sql.{CarbonSession, DataFrame}
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 
-class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
-  val inputFile = s"$resourcesPath/bloom_datamap_input.csv"
+class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  val bigFile = s"$resourcesPath/bloom_datamap_input_big.csv"
+  val smallFile = s"$resourcesPath/bloom_datamap_input_small.csv"
   val normalTable = "carbon_normal"
   val bloomDMSampleTable = "carbon_bloom"
   val dataMapName = "bloom_dm"
-  val lineNum = 500000
 
   override protected def beforeAll(): Unit = {
     new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
-    createFile(inputFile, line = lineNum, start = 0)
+    createFile(bigFile, line = 500000)
+    createFile(smallFile)
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  private def checkQuery = {
+  override def afterEach(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  private def checkSqlHitDataMap(sqlText: String, dataMapName: String, shouldHit: Boolean): DataFrame = {
+    if (shouldHit) {
+      assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName))
+    } else {
+      assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName))
+    }
+    sql(sqlText)
+  }
+
+  private def checkQuery(dataMapName: String, shouldHit: Boolean = true) = {
     checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where id = 1"),
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 1", 
dataMapName, shouldHit),
       sql(s"select * from $normalTable where id = 1"))
     checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where id = 999"),
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 999", 
dataMapName, shouldHit),
       sql(s"select * from $normalTable where id = 999"))
     checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where city = 'city_1'"),
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 
'city_1'", dataMapName, shouldHit),
       sql(s"select * from $normalTable where city = 'city_1'"))
     checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where city = 'city_999'"),
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 
'city_999'", dataMapName, shouldHit),
       sql(s"select * from $normalTable where city = 'city_999'"))
-    checkAnswer(
-      sql(s"select count(distinct id), count(distinct name), count(distinct 
city)," +
-          s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"),
-      sql(s"select count(distinct id), count(distinct name), count(distinct 
city)," +
-          s" count(distinct s1), count(distinct s2) from $normalTable"))
-    checkAnswer(
+     checkAnswer(
       sql(s"select min(id), max(id), min(name), max(name), min(city), 
max(city)" +
           s" from $bloomDMSampleTable"),
       sql(s"select min(id), max(id), min(name), max(name), min(city), 
max(city)" +
@@ -86,28 +98,34 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
       """.stripMargin)
 
+    var map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
     // load two segments
     (1 to 2).foreach { i =>
       sql(
         s"""
-           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+           | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
            | OPTIONS('header'='false')
          """.stripMargin)
       sql(
         s"""
-           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+           | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
            | OPTIONS('header'='false')
          """.stripMargin)
     }
 
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
     sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
     checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, 
dataMapName)
-    checkQuery
+    checkQuery(dataMapName)
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  test("test create bloom datamap and refresh datamap") {
+  test("test create bloom datamap and REBUILD DATAMAP") {
     sql(
       s"""
          | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -125,12 +143,12 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     (1 to 2).foreach { i =>
       sql(
         s"""
-           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+           | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
            | OPTIONS('header'='false')
          """.stripMargin)
       sql(
         s"""
-           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+           | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
            | OPTIONS('header'='false')
          """.stripMargin)
     }
@@ -142,18 +160,135 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
       """.stripMargin)
 
-    sql(s"REFRESH DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
     sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
     checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, 
dataMapName)
-    checkQuery
+    checkQuery(dataMapName)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") {
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    var map = DataMapStatusManager.readDataMapStatusMap()
+    assert(!map.get(dataMapName).isEnabled)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(!map.get(dataMapName).isEnabled)
+
+    // once we rebuild, it should be enabled
+    sql(s"REBUILD DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, 
dataMapName)
+    checkQuery(dataMapName)
+
+    // after another load, the datamap should be disabled again, since it is lazy
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(!map.get(dataMapName).isEnabled)
+    checkQuery(dataMapName, shouldHit = false)
+
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler
+  test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") {
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, 
dataMapName)
+
+    // the datamap has not been rebuilt yet, so the query should not hit it
+    checkQuery(dataMapName, shouldHit = false)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
 
   override protected def afterAll(): Unit = {
-    deleteFile(inputFile)
+    deleteFile(bigFile)
+    deleteFile(smallFile)
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
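
The deferred-rebuild lifecycle exercised by the tests above condenses to the following sketch (same identifiers as the suite; not additional test code):

    sql(s"CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' " +
        s"WITH DEFERRED REBUILD DMProperties('INDEX_COLUMNS'='city,id')")
    assert(!DataMapStatusManager.readDataMapStatusMap().get(dataMapName).isEnabled) // DISABLED after create and after every load
    sql(s"REBUILD DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
    assert(DataMapStatusManager.readDataMapStatusMap().get(dataMapName).isEnabled)  // ENABLED only after an explicit rebuild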

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 9c3d5d6..3dc34d3 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -61,8 +61,12 @@ public class DataMapWriterListener {
     }
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
-        DataMapFactory factory = tableDataMap.getDataMapFactory();
-        register(factory, segmentId, taskNo);
+        // register the writer only if the datamap is not lazy; for a lazy datamap,
+        // the user will rebuild it manually
+        if (!tableDataMap.getDataMapSchema().isLazy()) {
+          DataMapFactory factory = tableDataMap.getDataMapFactory();
+          register(factory, segmentId, taskNo);
+        }
       }
     }
   }
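
A Scala restatement of the registration rule introduced above (the Java loop in the hunk is authoritative; this sketch assumes the same accessors):

    import scala.collection.JavaConverters._
    // index writers are registered only for non-lazy datamaps;
    // lazy ones stay untouched until the user runs REBUILD DATAMAP
    tableIndices.asScala
      .filterNot(_.getDataMapSchema.isLazy)
      .foreach(dataMap => register(dataMap.getDataMapFactory, segmentId, taskNo))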

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
 
b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 957e9f8..35acb17 100644
--- 
a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ 
b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -21,12 +21,11 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -104,8 +103,10 @@ public class SearchRequestHandler {
     LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
        request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
 
-    // If there is FGDataMap, prune the split by applying FGDataMap
-    queryModel = tryPruneByFGDataMap(request.searchId(), table, queryModel, mbSplit);
+    // If a DataMap was selected on the Master, prune the splits with it
+    if (request.dataMap() != null) {
+      queryModel = prune(request.searchId(), table, queryModel, mbSplit, request.dataMap().get());
+    }
 
     // In search mode, reader will read multiple blocks by using a thread pool
     CarbonRecordReader<CarbonRow> reader =
@@ -135,35 +136,32 @@ public class SearchRequestHandler {
   * If there is FGDataMap defined for this table and filter condition in the query,
   * prune the splits by the DataMap and set the pruned split into the QueryModel and return
    */
-  private QueryModel tryPruneByFGDataMap(int queryId,
-      CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException {
-    DataMapExprWrapper wrapper =
-        DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree());
-
-    if (wrapper.getDataMapLevel() == DataMapLevel.FG) {
-      List<Segment> segments = new LinkedList<>();
-      for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-        segments.add(Segment.toSegment(
-            split.getSegmentId(), new LatestFilesReadCommittedScope(table.getTablePath())));
-      }
-      List<ExtendedBlocklet> prunnedBlocklets = wrapper.prune(segments, null);
+  private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
+      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
+    Objects.requireNonNull(datamap);
+    List<Segment> segments = new LinkedList<>();
+    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
+      segments.add(
+          Segment.toSegment(split.getSegmentId(),
+              new LatestFilesReadCommittedScope(table.getTablePath())));
+    }
+    List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null);
 
-      List<String> pathToRead = new LinkedList<>();
-      for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
-        pathToRead.add(prunnedBlocklet.getPath());
-      }
+    List<String> pathToRead = new LinkedList<>();
+    for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
+      pathToRead.add(prunnedBlocklet.getPath());
+    }
 
-      List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
-      List<TableBlockInfo> blockToRead = new LinkedList<>();
-      for (TableBlockInfo block : blocks) {
-        if (pathToRead.contains(block.getFilePath())) {
-          blockToRead.add(block);
-        }
+    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
+    List<TableBlockInfo> blockToRead = new LinkedList<>();
+    for (TableBlockInfo block : blocks) {
+      if (pathToRead.contains(block.getFilePath())) {
+        blockToRead.add(block);
       }
-      LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned 
blocks: %d",
-          queryId, blockToRead.size()));
-      queryModel.setTableBlockInfos(blockToRead);
     }
+    LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned 
blocks: %d", queryId,
+        blockToRead.size()));
+    queryModel.setTableBlockInfos(blockToRead);
     return queryModel;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala 
b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index 5b31a49..2e9a532 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -38,12 +38,15 @@ import org.apache.spark.util.ThreadUtils
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapChooser
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.store.worker.Status
@@ -212,11 +215,13 @@ class Master(sparkConf: SparkConf) {
     // prune data and get a mapping of worker hostname to list of blocks,
     // then add these blocks to the SearchRequest and fire the RPC call
     val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
+    val fgDataMap = chooseFGDataMap(table, filter)
     val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
       // Build a SearchRequest
       val split = new SerializableWritable[CarbonMultiBlockSplit](
         new CarbonMultiBlockSplit(blocks, splitAddress))
-      val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
+      val request =
+        SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit, fgDataMap)
 
      // Find an Endpoint and send the request to it
      // This RPC is non-blocking so that we do not need to wait before sending to the next worker
@@ -249,6 +254,14 @@ class Master(sparkConf: SparkConf) {
     output.toArray
   }
 
+  private def chooseFGDataMap(
+      table: CarbonTable,
+      filter: Expression): Option[DataMapExprWrapper] = {
+    val chooser = new DataMapChooser(table)
+    val filterInterface = table.resolveFilter(filter)
+    Option(chooser.chooseFGDataMap(filterInterface))
+  }
+
   /**
    * Prune data by using CarbonInputFormat.getSplit
    * Return a mapping of host address to list of block
@@ -261,6 +274,9 @@ class Master(sparkConf: SparkConf) {
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
       job, table, columns, filter, null, null)
+
+    // We will do FG pruning on the reader side, so don't do it here
+    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
     val splits = format.getSplits(job)
     val distributables = splits.asScala.map { split =>
       split.asInstanceOf[Distributable]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala 
b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index e467fd3..1532284 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
@@ -37,11 +38,11 @@ class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req@SearchRequest(_, _, _, _, _, _) =>
+    case req: SearchRequest =>
       val response = new SearchRequestHandler().handleSearch(req)
       context.reply(response)
 
-    case req@ShutdownRequest(_) =>
+    case req: ShutdownRequest =>
       val response = new SearchRequestHandler().handleShutdown(req)
       context.reply(response)
 
@@ -59,7 +60,8 @@ case class SearchRequest(
     tableInfo: TableInfo,
     projectColumns: Array[String],
     filterExpression: Expression,
-    limit: Long)
+    limit: Long,
+    dataMap: Option[DataMapExprWrapper])
 
 // Search result sent from worker to master
 case class SearchResult(
