Repository: carbondata
Updated Branches:
  refs/heads/master b08ef0012 -> 21c5fb1db


[CARBONDATA-2390] Refresh Lucene data map for an existing table with data

If a table already contains data loaded before the Lucene data map is created,
the REFRESH DATAMAP command should be used to build the data map manually.
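
A minimal sketch of the intended workflow, mirroring the test added below in
LuceneFineGrainDataMapSuite (sql(...) is the Spark SQL entry point used by the
test; the table, data map and $file2 path are illustrative):

    // table with data loaded before the Lucene data map exists
    sql("CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT) " +
      "STORED BY 'carbondata'")
    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4")

    // creating the Lucene data map does not index the already loaded segments
    sql("CREATE DATAMAP dm4 ON TABLE datamap_test4 USING 'lucene' " +
      "DMProperties('TEXT_COLUMNS'='name, city')")

    // build the index for the existing segments manually
    sql("REFRESH DATAMAP dm4 ON TABLE datamap_test4")

    // the data map can now serve TEXT_MATCH queries
    sql("SELECT * FROM datamap_test4 WHERE TEXT_MATCH('name:n10')")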

This closes #2218


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/21c5fb1d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/21c5fb1d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/21c5fb1d

Branch: refs/heads/master
Commit: 21c5fb1dbe648252cc0163ade594adcfce75795d
Parents: b08ef00
Author: QiangCai <[email protected]>
Authored: Tue Apr 24 00:10:01 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Thu Apr 26 19:29:48 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |   2 +-
 .../scan/collector/ResultCollectorFactory.java  |   4 +
 .../impl/DictionaryBasedResultCollector.java    |   2 +-
 .../impl/RowIdBasedResultCollector.java         |  77 +++++
 .../executor/impl/AbstractQueryExecutor.java    |   1 +
 .../scan/executor/infos/BlockExecutionInfo.java |  15 +-
 .../carbondata/core/scan/model/QueryModel.java  |  13 +
 .../core/scan/result/BlockletScannedResult.java |   4 +
 datamap/lucene/pom.xml                          |   2 +-
 .../lucene/LuceneDataMapFactoryBase.java        |   4 +-
 .../datamap/lucene/LuceneDataMapWriter.java     |  13 +-
 .../lucene/LuceneIndexRefreshBuilder.java       | 220 ++++++++++++++
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  35 +++
 .../org/apache/carbondata/spark/KeyVal.scala    |   8 +
 integration/spark2/pom.xml                      |   5 +
 .../lucene/LuceneDataMapRefreshRDD.scala        | 299 +++++++++++++++++++
 .../datamap/CarbonCreateDataMapCommand.scala    |   4 +-
 .../datamap/CarbonDataMapRefreshCommand.scala   |  16 +-
 18 files changed, 708 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f0ab857..7f187b9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -839,7 +839,7 @@ public class CarbonTable implements Serializable {
    * Return true if 'autoRefreshDataMap' is enabled, by default it is enabled
    */
   public boolean isAutoRefreshDataMap() {
-    String refresh = 
getTableInfo().getFactTable().getTableProperties().get("autoRefreshDataMap");
+    String refresh = 
getTableInfo().getFactTable().getTableProperties().get("autorefreshdatamap");
     return refresh == null || refresh.equalsIgnoreCase("true");
   }
 

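The lookup key above is lower-cased, presumably because TBLPROPERTIES keys are
stored in lower case; the test added below sets the property as in this sketch
(table name illustrative):

    // 'autorefreshdatamap' = 'false' disables the automatic rebuild that
    // normally runs when a data map is created on a table that has data
    sql("CREATE TABLE t(id INT, name STRING) STORED BY 'carbondata' " +
      "TBLPROPERTIES('autorefreshdatamap' = 'false')")
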
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
index b625c58..0b1dde5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
 import 
org.apache.carbondata.core.scan.collector.impl.RestructureBasedDictionaryResultCollector;
 import 
org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector;
 import 
org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector;
+import 
org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 
 /**
@@ -67,6 +68,9 @@ public class ResultCollectorFactory {
       if (blockExecutionInfo.isRestructuredBlock()) {
         LOGGER.info("Restructure based dictionary collector is used to scan 
and collect the data");
         scannerResultAggregator = new 
RestructureBasedDictionaryResultCollector(blockExecutionInfo);
+      } else if (blockExecutionInfo.isRequiredRowId()) {
+        LOGGER.info("RowId based dictionary collector is used to scan and 
collect the data");
+        scannerResultAggregator = new 
RowIdBasedResultCollector(blockExecutionInfo);
       } else {
         LOGGER.info("Row based dictionary collector is used to scan and 
collect the data");
         scannerResultAggregator = new 
DictionaryBasedResultCollector(blockExecutionInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index 4322034..bb048aa 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -69,7 +69,7 @@ public class DictionaryBasedResultCollector extends 
AbstractScannedResultCollect
 
   boolean isDimensionExists;
 
-  private Map<Integer, GenericQueryType> comlexDimensionInfoMap;
+  protected Map<Integer, GenericQueryType> comlexDimensionInfoMap;
 
   public DictionaryBasedResultCollector(BlockExecutionInfo 
blockExecutionInfos) {
     super(blockExecutionInfos);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
new file mode 100644
index 0000000..ec458d6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+
+/**
+ * This class append blocklet id, page id and row id to each row.
+ */
[email protected]
+public class RowIdBasedResultCollector extends DictionaryBasedResultCollector {
+
+  public RowIdBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  @Override
+  public List<Object[]> collectResultInRow(BlockletScannedResult 
scannedResult, int batchSize) {
+
+    // scan the record and add to list
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
+    int rowCounter = 0;
+    int[] surrogateResult;
+    byte[][] noDictionaryKeys;
+    byte[][] complexTypeKeyArray;
+    int columnCount = queryDimensions.length + queryMeasures.length;
+    while (scannedResult.hasNext() && rowCounter < batchSize) {
+      Object[] row = new Object[columnCount + 3];
+      row[columnCount] = Integer.parseInt(scannedResult.getBlockletNumber());
+      row[columnCount + 1] = scannedResult.getCurrentPageCounter();
+      if (isDimensionExists) {
+        surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
+        noDictionaryKeys = scannedResult.getNoDictionaryKeyArray();
+        complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+        dictionaryColumnIndex = 0;
+        noDictionaryColumnIndex = 0;
+        complexTypeColumnIndex = 0;
+        for (int i = 0; i < queryDimensions.length; i++) {
+          fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, 
complexTypeKeyArray,
+              comlexDimensionInfoMap, row, i);
+        }
+      } else {
+        scannedResult.incrementCounter();
+      }
+      row[columnCount + 2] = scannedResult.getCurrentRowId();
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+        continue;
+      }
+      fillMeasureData(scannedResult, row);
+      listBasedResult.add(row);
+      rowCounter++;
+    }
+    return listBasedResult;
+
+  }
+
+}
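
A sketch of the row layout this collector emits, matching what
LuceneIndexRefreshBuilder.addDocument (added below) expects; positionOf is a
hypothetical helper, not part of this change:

    // for a projection of N columns, each emitted row has N + 3 entries:
    // the projected values, then blocklet id, page id and row id
    def positionOf(row: Array[AnyRef], columnCount: Int): (Int, Int, Int) = {
      val blockletId = row(columnCount).asInstanceOf[Int]      // row(N)
      val pageId     = row(columnCount + 1).asInstanceOf[Int]  // row(N + 1)
      val rowId      = row(columnCount + 2).asInstanceOf[Int]  // row(N + 2)
      (blockletId, pageId, rowId)
    }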

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index bc410ce..08c5cc7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -458,6 +458,7 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
     
blockExecutionInfo.setActualQueryMeasures(queryModel.getProjectionMeasures()
         .toArray(new 
ProjectionMeasure[queryModel.getProjectionMeasures().size()]));
     DataTypeUtil.setDataTypeConverter(queryModel.getConverter());
+    blockExecutionInfo.setRequiredRowId(queryModel.isRequiredRowId());
     return blockExecutionInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index 06adf8f..6633195 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -86,7 +86,7 @@ public class BlockExecutionInfo {
   private int[] projectionListDimensionIndexes;
 
   /**
-   * list of dimension present in the projection
+   * list of measure present in the projection
    */
   private int[] projectionListMeasureIndexes;
 
@@ -211,6 +211,11 @@ public class BlockExecutionInfo {
   private Map<String, DeleteDeltaVo> deletedRecordsMap;
 
   /**
+   * whether it require to output the row id
+   */
+  private boolean requiredRowId;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -618,4 +623,12 @@ public class BlockExecutionInfo {
   public void setPrefetchBlocklet(boolean prefetchBlocklet) {
     this.prefetchBlocklet = prefetchBlocklet;
   }
+
+  public boolean isRequiredRowId() {
+    return requiredRowId;
+  }
+
+  public void setRequiredRowId(boolean requiredRowId) {
+    this.requiredRowId = requiredRowId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 47a6bdd..409bc2a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -104,6 +104,11 @@ public class QueryModel {
    */
   private boolean readPageByPage;
 
+  /**
+   * whether it require to output the row id
+   */
+  private boolean requiredRowId;
+
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     invalidSegmentIds = new ArrayList<>();
@@ -356,4 +361,12 @@ public class QueryModel {
   public void setReadPageByPage(boolean readPageByPage) {
     this.readPageByPage = readPageByPage;
   }
+
+  public boolean isRequiredRowId() {
+    return requiredRowId;
+  }
+
+  public void setRequiredRowId(boolean requiredRowId) {
+    this.requiredRowId = requiredRowId;
+  }
 }
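
For context, a sketch of how the new flag is used; the lines are taken from the
LuceneDataMapRefreshRDD added later in this change, where format, inputSplit
and attemptContext are defined:

    // enabling the flag makes ResultCollectorFactory pick
    // RowIdBasedResultCollector, so every result row also carries
    // blocklet id, page id and row id
    val model = format.createQueryModel(inputSplit, attemptContext)
    model.setVectorReader(false)
    model.setForcedDetailRawQuery(false)
    model.setRequiredRowId(true)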

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index df403c5..33caa98 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -618,4 +618,8 @@ public abstract class BlockletScannedResult {
     }
     return false;
   }
+
+  public String getBlockletNumber() {
+    return blockletNumber;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/datamap/lucene/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/lucene/pom.xml b/datamap/lucene/pom.xml
index 4019065..f212e5d 100644
--- a/datamap/lucene/pom.xml
+++ b/datamap/lucene/pom.xml
@@ -22,7 +22,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark2</artifactId>
+      <artifactId>carbondata-core</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index a7b8831..9727492 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -126,7 +126,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> 
implements DataMapFac
    * 4. TEXT_COLUMNS should be exists in table columns
    * 5. TEXT_COLUMNS support only String DataType columns
    */
-  private List<String> validateAndGetIndexedColumns(DataMapSchema 
dataMapSchema,
+  public static List<String> validateAndGetIndexedColumns(DataMapSchema 
dataMapSchema,
       CarbonTable carbonTable) throws MalformedDataMapCommandException {
     String textColumnsStr = dataMapSchema.getProperties().get(TEXT_COLUMNS);
     if (textColumnsStr == null || StringUtils.isBlank(textColumnsStr)) {
@@ -148,7 +148,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> 
implements DataMapFac
         }
       }
     }
-    indexedCarbonColumns = new ArrayList<>(textColumns.length);
+    List<String> indexedCarbonColumns = new ArrayList<>(textColumns.length);
     for (int i = 0; i < textColumns.length; i++) {
       CarbonColumn column = 
carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]);
       if (null == column) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 2025f73..4e8adee 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -82,11 +82,11 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private List<String> indexedCarbonColumns = null;
 
-  private static final String BLOCKLETID_NAME = "blockletId";
+  public static final String BLOCKLETID_NAME = "blockletId";
 
-  private static final String PAGEID_NAME = "pageId";
+  public static final String PAGEID_NAME = "pageId";
 
-  private static final String ROWID_NAME = "rowId";
+  public static final String ROWID_NAME = "rowId";
 
   LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, 
Segment segment,
       String writeDirectoryPath, boolean isFineGrain, List<String> 
indexedCarbonColumns) {
@@ -111,8 +111,9 @@ public class LuceneDataMapWriter extends DataMapWriter {
    * Start of new block notification.
    */
   public void onBlockStart(String blockId, String indexShardName) throws 
IOException {
-    // save this block id for lucene index , used in onPageAdd function
-
+    if (indexWriter != null) {
+      return;
+    }
     // get index path, put index data into segment's path
     String strIndexPath = getIndexPath(indexShardName);
     Path indexPath = FileFactory.getPath(strIndexPath);
@@ -343,7 +344,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
    *
    * @return store path based on taskID
    */
-  private static String genDataMapStorePathOnTaskId(String tablePath, String 
segmentId,
+  public static String genDataMapStorePathOnTaskId(String tablePath, String 
segmentId,
       String dataMapName, String taskName) {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + 
File.separator + dataMapName
         + File.separator + taskName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
new file mode 100644
index 0000000..2d5dcf8
--- /dev/null
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+public class LuceneIndexRefreshBuilder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+  private String strIndexPath;
+
+  private String[] indexColumns;
+  private DataType[] dataTypes;
+
+  private int columnsCount;
+
+  private IndexWriter indexWriter = null;
+
+  private IndexWriter pageIndexWriter = null;
+
+  private Analyzer analyzer = null;
+
+  public LuceneIndexRefreshBuilder(String strIndexPath, String[] indexColumns,
+      DataType[] dataTypes) {
+    this.strIndexPath = strIndexPath;
+    this.indexColumns = indexColumns;
+    this.columnsCount = indexColumns.length;
+    this.dataTypes = dataTypes;
+  }
+
+  public void initialize() throws IOException {
+    // get index path, put index data into segment's path
+    Path indexPath = FileFactory.getPath(strIndexPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path not exists, create it
+    if (!fs.exists(indexPath)) {
+      fs.mkdirs(indexPath);
+    }
+
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // create a index writer
+    Directory indexDir = new HdfsDirectory(indexPath, 
FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        
.equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT))
 {
+      indexWriterConfig.setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+
+    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+  }
+
+  private IndexWriter createPageIndexWriter() throws IOException {
+    // save index data into ram, write into disk after one page finished
+    RAMDirectory ramDir = new RAMDirectory();
+    IndexWriter ramIndexWriter = new IndexWriter(ramDir, new 
IndexWriterConfig(analyzer));
+
+    return ramIndexWriter;
+  }
+
+  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
+
+    Directory directory = pageIndexWriter.getDirectory();
+
+    // close ram writer
+    pageIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(new Directory[] { directory });
+
+    // delete this ram data
+    directory.close();
+  }
+
+  public void addDocument(Object[] values) throws IOException {
+
+    if (values.length != indexColumns.length + 3) {
+      throw new IOException("The column number (" + values.length + ") of the 
row  is incorrect.");
+    }
+    int rowId = (int) values[indexColumns.length + 2];
+    if (rowId == 0) {
+      if (pageIndexWriter != null) {
+        addPageIndex(pageIndexWriter);
+      }
+      pageIndexWriter = createPageIndexWriter();
+    }
+
+    // create a new document
+    Document doc = new Document();
+
+    // add blocklet Id
+    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME,
+        new int[] { (int) values[columnsCount] }));
+    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) 
values[columnsCount]));
+
+    // add page id
+    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME,
+        new int[] { (int) values[columnsCount + 1] }));
+    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) 
values[columnsCount + 1]));
+
+    // add row id
+    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, new int[] { rowId }));
+    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
+
+    // add other fields
+    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
+      addField(doc, indexColumns[colIdx], dataTypes[colIdx], values[colIdx]);
+    }
+
+    pageIndexWriter.addDocument(doc);
+  }
+
+  private boolean addField(Document doc, String fieldName, DataType type, 
Object value) {
+    if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      IntRangeField field =
+          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] 
{ Byte.MAX_VALUE });
+      field.setIntValue((int) value);
+      doc.add(field);
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no 
short type
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 
Short.MIN_VALUE },
+          new int[] { Short.MAX_VALUE });
+      field.setShortValue((short) value);
+      doc.add(field);
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      doc.add(new IntPoint(fieldName, new int[] { (int) value }));
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      doc.add(new LongPoint(fieldName, new long[] { (long) value }));
+    } else if (type == DataTypes.FLOAT) {
+      doc.add(new FloatPoint(fieldName, new float[] { (float) value }));
+    } else if (type == DataTypes.DOUBLE) {
+      doc.add(new DoublePoint(fieldName, new double[] { (double) value }));
+    } else if (type == DataTypes.STRING) {
+      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
+    } else if (type == DataTypes.DATE) {
+      // TODO: how to get data value
+    } else if (type == DataTypes.TIMESTAMP) {
+      // TODO: how to get
+    } else if (type == DataTypes.BOOLEAN) {
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new 
int[] { 1 });
+      field.setIntValue((boolean) value ? 1 : 0);
+      doc.add(field);
+    } else {
+      LOGGER.error("unsupport data type " + type);
+      throw new RuntimeException("unsupported data type " + type);
+    }
+    return true;
+  }
+
+  public void finish() throws IOException {
+    if (indexWriter != null && pageIndexWriter != null) {
+      addPageIndex(pageIndexWriter);
+    }
+  }
+
+  public void close() throws IOException {
+    if (indexWriter != null) {
+      indexWriter.close();
+    }
+  }
+
+}
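
A minimal sketch of how this builder is driven, mirroring
LuceneDataMapRefreshRDD.internalCompute further below (the index path, column
names and data types are illustrative; the reader is the CarbonRecordReader
created in that RDD):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.datamap.lucene.LuceneIndexRefreshBuilder
    import org.apache.carbondata.hadoop.CarbonRecordReader

    def buildIndexForSplit(indexPath: String,
        reader: CarbonRecordReader[Array[Object]]): Unit = {
      val builder = new LuceneIndexRefreshBuilder(
        indexPath,                                  // <segment>/<dataMapName>/<taskName>
        Array("name", "city"),                      // indexed columns
        Array(DataTypes.STRING, DataTypes.STRING))  // their data types
      builder.initialize()
      try {
        while (reader.nextKeyValue()) {
          // each row: projected values + blocklet id + page id + row id
          builder.addDocument(reader.getCurrentValue)
        }
        builder.finish()  // flush the last in-memory page index to the index path
      } finally {
        builder.close()
      }
    }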

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 994472d..ca8ba91 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -59,6 +59,15 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
         | STORED BY 'carbondata'
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
+
+    sql("DROP TABLE IF EXISTS datamap_test4")
+
+    sql(
+      """
+        | CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 
'autorefreshdatamap' = 'false')
+      """.stripMargin)
   }
 
   test("validate TEXT_COLUMNS DataMap property") {
@@ -128,6 +137,31 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
     sql("drop datamap dm on table datamap_test")
   }
 
+  test("test lucene refresh data map") {
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 
OPTIONS('header'='false')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 
OPTIONS('header'='false')")
+
+    sql(
+      s"""
+         | CREATE DATAMAP dm4 ON TABLE datamap_test4
+         | USING 'lucene'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 
OPTIONS('header'='false')")
+
+    sql("refresh datamap dm4 ON TABLE datamap_test4")
+
+    checkAnswer(sql("SELECT * FROM datamap_test4 WHERE 
TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test4 where name='n10'"))
+    checkAnswer(sql("SELECT * FROM datamap_test4 WHERE 
TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test4 WHERE 
city='c020'"))
+
+    sql("drop datamap dm4 on table datamap_test4")
+
+  }
+
+
   test("test lucene fine grain data map drop") {
     sql("DROP TABLE IF EXISTS datamap_test1")
     sql(
@@ -204,6 +238,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamap_test1")
     sql("DROP TABLE IF EXISTS datamap_test2")
     sql("DROP TABLE IF EXISTS datamap_test3")
+    sql("DROP TABLE IF EXISTS datamap_test4")
     sql("use default")
     sql("drop database if exists lucene cascade")
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 450fed0..bd28bed 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -139,3 +139,11 @@ trait RestructureResult[K, V] extends Serializable {
 class RestructureResultImpl extends RestructureResult[Int, Boolean] {
   override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
 }
+
+trait RefreshResult[K, V] extends Serializable {
+  def getKey(key: String, value: Boolean): (K, V)
+}
+
+class RefreshResultImpl extends RefreshResult[String, Boolean] {
+  override def getKey(key: String, value: Boolean): (String, Boolean) = (key, 
value)
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index aca2d3c..2372539 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -50,6 +50,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-lucene</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
new file mode 100644
index 0000000..2bc3eaf
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{CarbonInputMetrics, Partition, SparkContext, 
TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.TaskMetricsMap
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, 
CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, 
CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
+import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, 
CarbonSparkPartition}
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+
+object LuceneDataMapRefreshRDD {
+
+  def refreshDataMap(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      schema: DataMapSchema
+  ): Unit = {
+    val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
+    val validAndInvalidSegments = 
segmentStatusManager.getValidAndInvalidSegments()
+    val validSegments = validAndInvalidSegments.getValidSegments()
+    val indexedCarbonColumns =
+      LuceneDataMapFactoryBase.validateAndGetIndexedColumns(schema, 
carbonTable)
+
+    // loop all segments to rebuild DataMap
+    val tableInfo = carbonTable.getTableInfo()
+    validSegments.asScala.foreach { segment =>
+      val dataMapStorePath = LuceneDataMapWriter.genDataMapStorePath(
+        tableIdentifier.getTablePath(),
+        segment.getSegmentNo(),
+        schema.getDataMapName)
+      // if lucene datamap folder is exists, not require to build lucene 
datamap again
+      refreshOneSegment(sparkSession, tableInfo, dataMapStorePath, 
schema.getDataMapName,
+        indexedCarbonColumns, segment.getSegmentNo());
+    }
+  }
+
+  def refreshOneSegment(
+      sparkSession: SparkSession,
+      tableInfo: TableInfo,
+      dataMapStorePath: String,
+      dataMapName: String,
+      indexColumns: java.util.List[String],
+      segmentId: String): Unit = {
+
+    if (!FileFactory.isFileExist(dataMapStorePath)) {
+      if (FileFactory.mkdirs(dataMapStorePath, 
FileFactory.getFileType(dataMapStorePath))) {
+        try {
+          val status = new LuceneDataMapRefreshRDD[String, Boolean](
+            sparkSession.sparkContext,
+            new RefreshResultImpl(),
+            tableInfo,
+            dataMapStorePath,
+            dataMapName,
+            indexColumns.asScala.toArray,
+            segmentId
+          ).collect()
+
+          status.find(_._2 == false).foreach { task =>
+            throw new Exception(
+              s"Task Failed to refresh datamap $dataMapName on 
segment_$segmentId")
+          }
+        } catch {
+          case ex =>
+            // process failure
+            
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
+            throw new Exception(
+              s"Failed to refresh datamap $dataMapName on segment_$segmentId", 
ex)
+        }
+      }
+    }
+  }
+
+}
+
+class OriginalReadSupport(dataTypes: Array[DataType]) extends 
CarbonReadSupport[Array[Object]] {
+  override def initialize(carbonColumns: Array[CarbonColumn],
+      carbonTable: CarbonTable): Unit = {
+  }
+
+  override def readRow(data: Array[Object]): Array[Object] = {
+    for (i <- 0 until dataTypes.length) {
+      if (dataTypes(i) == DataTypes.STRING) {
+        data(i) = data(i).toString
+      }
+    }
+    data
+  }
+
+  override def close(): Unit = {
+  }
+}
+
+class LuceneDataMapRefreshRDD[K, V](
+    sc: SparkContext,
+    result: RefreshResult[K, V],
+    @transient tableInfo: TableInfo,
+    dataMapStorePath: String,
+    dataMapName: String,
+    indexColumns: Array[String],
+    segmentId: String
+) extends CarbonRDDWithTableInfo[(K, V)](sc, Nil, tableInfo.serialize()) {
+
+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() 
+ "")
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new util.Date())
+  }
+
+  override def internalCompute(split: Partition, context: TaskContext): 
Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    var status = false
+    val inputMetrics = new CarbonInputMetrics
+    TaskMetricsMap.getInstance().registerThreadCallback()
+    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    inputMetrics.initBytesReadCallback(context, inputSplit)
+
+    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, 
split.index, 0)
+    val attemptContext = new TaskAttemptContextImpl(new Configuration(), 
attemptId)
+    val format = createInputFormat(attemptContext)
+
+    val taskName = 
CarbonTablePath.getUniqueTaskName(inputSplit.getAllSplits.get(0).getBlockPath)
+
+    val tableInfo = getTableInfo
+    val identifier = tableInfo.getOrCreateAbsoluteTableIdentifier()
+
+    val columns = tableInfo.getFactTable.getListOfColumns.asScala
+    val dataTypes = indexColumns.map { columnName =>
+      columns.find(_.getColumnName.equals(columnName)).get.getDataType
+    }
+
+    val indexPath = 
LuceneDataMapWriter.genDataMapStorePathOnTaskId(identifier.getTablePath,
+      segmentId, dataMapName, taskName)
+
+    val model = format.createQueryModel(inputSplit, attemptContext)
+    // one query id per table
+    model.setQueryId(queryId)
+    model.setVectorReader(false)
+    model.setForcedDetailRawQuery(false)
+    model.setRequiredRowId(true)
+    var reader: CarbonRecordReader[Array[Object]] = null
+    var indexBuilder: LuceneIndexRefreshBuilder = null
+    try {
+      reader = new CarbonRecordReader(model, new 
OriginalReadSupport(dataTypes), inputMetrics)
+      reader.initialize(inputSplit, attemptContext)
+
+      indexBuilder = new LuceneIndexRefreshBuilder(indexPath, indexColumns, 
dataTypes)
+      indexBuilder.initialize()
+
+      while (reader.nextKeyValue()) {
+        indexBuilder.addDocument(reader.getCurrentValue)
+      }
+
+      indexBuilder.finish()
+
+      status = true
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close()
+        } catch {
+          case ex =>
+            LOGGER.error(ex, "Failed to close reader")
+        }
+      }
+
+      if (indexBuilder != null) {
+        try {
+          indexBuilder.close()
+        } catch {
+          case ex =>
+            LOGGER.error(ex, "Failed to close index writer")
+        }
+      }
+    }
+
+    new Iterator[(K, V)] {
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(split.index.toString, status)
+      }
+    }
+  }
+
+
+  private def createInputFormat(
+      attemptContext: TaskAttemptContextImpl) = {
+    val format = new CarbonTableInputFormat[Object]
+    val tableInfo1 = getTableInfo
+    val conf = attemptContext.getConfiguration
+    CarbonInputFormat.setTableInfo(conf, tableInfo1)
+    CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
+    CarbonInputFormat.setDataTypeConverter(conf, 
classOf[SparkDataTypeConverterImpl])
+
+    val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
+    CarbonInputFormat.setTablePath(
+      conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+
+    CarbonInputFormat.setSegmentsToAccess(
+      conf,
+      Segment.toSegmentList(Array(segmentId), null))
+
+    CarbonInputFormat.setColumnProjection(
+      conf,
+      new CarbonProjection(indexColumns))
+    format
+  }
+
+  override protected def getPartitions = {
+    val conf = new Configuration()
+    val jobConf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
+    job.getConfiguration.set("query.id", queryId)
+
+    val format = new CarbonTableInputFormat[Object]
+
+    CarbonInputFormat.setSegmentsToAccess(
+      job.getConfiguration,
+      Segment.toSegmentList(Array(segmentId), null))
+
+    CarbonInputFormat.setTableInfo(
+      job.getConfiguration,
+      tableInfo)
+    CarbonInputFormat.setTablePath(
+      job.getConfiguration,
+      tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
+    CarbonInputFormat.setDatabaseName(
+      job.getConfiguration,
+      tableInfo.getDatabaseName)
+    CarbonInputFormat.setTableName(
+      job.getConfiguration,
+      tableInfo.getFactTable.getTableName)
+
+    format
+      .getSplits(job)
+      .asScala
+      .map(_.asInstanceOf[CarbonInputSplit])
+      .groupBy(_.taskId)
+      .map { group =>
+        new CarbonMultiBlockSplit(
+          group._2.asJava,
+          group._2.flatMap(_.getLocations).toArray)
+      }
+      .zipWithIndex
+      .map { split =>
+        new CarbonSparkPartition(id, split._2, split._1)
+      }
+      .toArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 03f165c..050f154 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -118,7 +118,9 @@ case class CarbonCreateDataMapCommand(
     if (dataMapProvider != null) {
       dataMapProvider.initData(mainTable)
       if (mainTable != null && mainTable.isAutoRefreshDataMap) {
-        dataMapProvider.rebuild(mainTable, dataMapSchema)
+        if 
(!DataMapClassProvider.LUCENE.getShortName.equals(dataMapSchema.getProviderName))
 {
+          dataMapProvider.rebuild(mainTable, dataMapSchema)
+        }
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21c5fb1d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
index 1b02df0..1c39e85 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
@@ -23,7 +23,9 @@ import org.apache.spark.sql.execution.command.DataCommand
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.datamap.lucene.LuceneDataMapRefreshRDD
 
 /**
  * Refresh the datamaps through sync with main table data. After sync with 
parent table's it enables
@@ -35,14 +37,22 @@ case class CarbonDataMapRefreshCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val schema = 
DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
-    val provider = DataMapManager.get().getDataMapProvider(schema, 
sparkSession)
     val table = tableIdentifier match {
       case Some(identifier) =>
         CarbonEnv.getCarbonTable(identifier)(sparkSession)
-      case _ => null
+      case _ =>
+        CarbonEnv.getCarbonTable(
+          Option(schema.getRelationIdentifier.getDatabaseName),
+          schema.getRelationIdentifier.getTableName
+        )(sparkSession)
     }
     // Sync the datamap with parent table
-    provider.rebuild(table, schema)
+    if 
(DataMapClassProvider.LUCENE.getShortName.endsWith(schema.getProviderName)) {
+      LuceneDataMapRefreshRDD.refreshDataMap(sparkSession, table, schema)
+    } else {
+      val provider = DataMapManager.get().getDataMapProvider(schema, 
sparkSession)
+      provider.rebuild(table, schema)
+    }
     // After sync success enable the datamap.
     DataMapStatusManager.enableDataMap(dataMapName)
     Seq.empty
