Repository: carbondata
Updated Branches:
  refs/heads/master aec47e06f -> 81038f55e


[CARBONDATA-2727][BloomDataMap] Support create bloom datamap on newly added 
column

Add a result collector with rowId information for datamap rebuild when the table schema has changed;
Use the key generator to retrieve the surrogate value of a dictIndexColumn from the query result;

This closes #2490
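
For illustration only: a minimal, hypothetical Java sketch (not part of this patch) of the idea
behind the change. It shows how a KeyGenerator built from per-column cardinalities, as
RawBytesReadSupport.prepareKeyGenForDictIndexColumns does in the diff below, can decode a packed
MDK byte array back into per-column surrogate keys. The class name, cardinalities and key values
are assumptions made up for the example.

    import org.apache.carbondata.core.keygenerator.KeyGenException;
    import org.apache.carbondata.core.keygenerator.KeyGenerator;
    import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
    import org.apache.carbondata.core.util.CarbonUtil;

    public class SurrogateDecodeSketch {
      public static void main(String[] args) throws KeyGenException {
        // assumed cardinalities and partitioning for two dictionary index columns
        int[] cardinality = new int[] { 1000, 2000 };
        int[] partitioner = new int[] { 1, 1 };
        // derive bit lengths and build the key generator, mirroring the rebuild read support
        int[] bitLength = CarbonUtil.getDimensionBitLength(cardinality, partitioner);
        KeyGenerator keyGen = new MultiDimKeyVarLengthGenerator(bitLength);
        // pack two surrogate keys into one MDK, then decode them back; the rebuild path
        // applies the decode step to the dictionary key bytes returned by the query
        byte[] mdk = keyGen.generateKey(new int[] { 42, 7 });
        long[] surrogates = keyGen.getKeyArray(mdk);
        System.out.println(surrogates[0] + ", " + surrogates[1]); // expected: 42, 7
      }
    }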


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/81038f55
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/81038f55
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/81038f55

Branch: refs/heads/master
Commit: 81038f55ef9a582f82305378988f603ded76e524
Parents: aec47e0
Author: Manhua <kevin...@qq.com>
Authored: Wed Jul 11 19:39:31 2018 +0800
Committer: xuchuanyin <xuchuan...@hust.edu.cn>
Committed: Tue Jul 17 23:31:43 2018 +0800

----------------------------------------------------------------------
 .../scan/collector/ResultCollectorFactory.java  |  31 ++---
 ...RowIdRestructureBasedRawResultCollector.java | 138 +++++++++++++++++++
 .../bloom/AbstractBloomDataMapWriter.java       |  72 +---------
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   2 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |   8 ++
 .../datamap/bloom/BloomDataMapWriter.java       |  72 ++++++++++
 .../datamap/IndexDataMapRebuildRDD.scala        | 131 +++++++++++-------
 .../bloom/BloomCoarseGrainDataMapSuite.scala    |  96 +++++++++++++
 8 files changed, 413 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
index ea4afd1..e0a0b90 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
@@ -18,15 +18,7 @@ package org.apache.carbondata.core.scan.collector;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.collector.impl.AbstractScannedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedDictionaryResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdRawBasedResultCollector;
+import org.apache.carbondata.core.scan.collector.impl.*;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 
 /**
@@ -51,14 +43,21 @@ public class ResultCollectorFactory {
     AbstractScannedResultCollector scannerResultAggregator = null;
     if (blockExecutionInfo.isRawRecordDetailQuery()) {
       if (blockExecutionInfo.isRestructuredBlock()) {
-        LOGGER.info("Restructure based raw collector is used to scan and 
collect the data");
-        scannerResultAggregator = new 
RestructureBasedRawResultCollector(blockExecutionInfo);
-      } else if (blockExecutionInfo.isRequiredRowId()) {
-        LOGGER.info("RowId based raw collector is used to scan and collect the 
data");
-        scannerResultAggregator = new 
RowIdRawBasedResultCollector(blockExecutionInfo);
+        if (blockExecutionInfo.isRequiredRowId()) {
+          LOGGER.info("RowId Restructure based raw ollector is used to scan 
and collect the data");
+          scannerResultAggregator = new 
RowIdRestructureBasedRawResultCollector(blockExecutionInfo);
+        } else {
+          LOGGER.info("Restructure based raw collector is used to scan and 
collect the data");
+          scannerResultAggregator = new 
RestructureBasedRawResultCollector(blockExecutionInfo);
+        }
       } else {
-        LOGGER.info("Row based raw collector is used to scan and collect the 
data");
-        scannerResultAggregator = new 
RawBasedResultCollector(blockExecutionInfo);
+        if (blockExecutionInfo.isRequiredRowId()) {
+          LOGGER.info("RowId based raw collector is used to scan and collect 
the data");
+          scannerResultAggregator = new 
RowIdRawBasedResultCollector(blockExecutionInfo);
+        } else {
+          LOGGER.info("Row based raw collector is used to scan and collect the 
data");
+          scannerResultAggregator = new 
RawBasedResultCollector(blockExecutionInfo);
+        }
       }
     } else if (blockExecutionInfo.isVectorBatchCollector()) {
       if (blockExecutionInfo.isRestructuredBlock()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java
new file mode 100644
index 0000000..28e778f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+
+/**
+ * It is not a collector; it is just a scanned result holder.
+ * This class returns all the dimensions in a ByteArrayWrapper and appends
+ * blockletNo/PageId/RowId at the end of the row.
+ */
+@InterfaceAudience.Internal
+public class RowIdRestructureBasedRawResultCollector extends RestructureBasedRawResultCollector {
+
+  public RowIdRestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  @Override
+  protected void scanAndFillData(BlockletScannedResult scannedResult, int batchSize,
+                               List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures) {
+    int numberOfPages = scannedResult.numberOfpages();
+    // loop will exit once the batchSize data has been read or the pages have been exhausted
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter();
+        continue;
+      }
+      int rowCounter = scannedResult.getRowCounter();
+      // getRowCounter holds the total number of rows processed. Calculate the
+      // left-over space through getRowCounter only.
+      int availableRows = currentPageRowCount - rowCounter;
+      // rows available in current page that can be processed from current page
+      int availableBatchRowCount = Math.min(batchSize, availableRows);
+      // this condition will be true if no data left in the current block/blocklet to be scanned
+      if (availableBatchRowCount < 1) {
+        break;
+      }
+      if (batchSize > availableRows) {
+        batchSize = batchSize - availableRows;
+      } else {
+        // this is done because in IUD cases the actual rows fetched can be less than the batch
+        // size, as some of the rows could have been deleted. So in those cases batchSize needs
+        // to be re-initialized with the left-over value
+        batchSize = 0;
+      }
+      // for every iteration of available rows, fill a newly created list of Object[] and add it to
+      // the final list so there is no mismatch in the counter while filling dimension and
+      // measure data
+      List<Object[]> collectedData = new ArrayList<>(availableBatchRowCount);
+      // fill dimension data
+      fillDimensionData(scannedResult, collectedData, queryMeasures, availableBatchRowCount);
+      fillMeasureData(scannedResult, collectedData);
+      // increment the number of rows scanned in scanned result statistics
+      // incrementScannedResultRowCounter(scannedResult, availableBatchRowCount);
+      // assign the left-over rows to batch size if the number of rows fetched is less
+      // than batchSize
+      if (collectedData.size() < availableBatchRowCount) {
+        batchSize += availableBatchRowCount - listBasedResult.size();
+      }
+      // add the collected data to the final list
+      listBasedResult.addAll(collectedData);
+    }
+  }
+
+  private void fillDimensionData(BlockletScannedResult scannedResult,
+               List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures, int batchSize) {
+    long startTime = System.currentTimeMillis();
+    List<byte[]> dictionaryKeyArrayBatch = scannedResult.getDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> noDictionaryKeyArrayBatch =
+        scannedResult.getNoDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> complexTypeKeyArrayBatch = scannedResult.getComplexTypeKeyArrayBatch(batchSize);
+    // it will be the same for one blocklet so it can be computed only once
+    byte[] implicitColumnByteArray = scannedResult.getBlockletId()
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    // Note: size check in for loop is for dictionaryKeyArrayBatch as this size can be lesser than
+    // batch size in case of IUD scenarios
+    for (int i = 0; i < dictionaryKeyArrayBatch.size(); i++) {
+      // 1 for ByteArrayWrapper object which will contain dictionary and no dictionary data
+      // 3 for blockletId, pageId, rowId
+      Object[] row = new Object[1 + queryMeasures.length + 3];
+      scannedResult.incrementCounter();
+      row[1 + queryMeasures.length] = scannedResult.getBlockletNumber();
+      row[1 + queryMeasures.length + 1] = scannedResult.getCurrentPageCounter();
+      ByteArrayWrapper wrapper = new ByteArrayWrapper();
+      wrapper.setDictionaryKey(dictionaryKeyArrayBatch.get(i));
+      wrapper.setNoDictionaryKeys(noDictionaryKeyArrayBatch.get(i));
+      wrapper.setComplexTypesKeys(complexTypeKeyArrayBatch.get(i));
+      wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+      row[0] = wrapper;
+      row[1 + queryMeasures.length + 2] = scannedResult.getCurrentRowId();
+      listBasedResult.add(row);
+    }
+    QueryStatistic keyColumnFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME);
+    keyColumnFillingTime.addCountStatistic(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME,
+        keyColumnFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  private void fillMeasureData(BlockletScannedResult scannedResult,
+                               List<Object[]> listBasedResult) {
+    long startTime = System.currentTimeMillis();
+    // if list is not empty after filling the dimension data then only fill the measure data
+    if (!listBasedResult.isEmpty()) {
+      fillMeasureDataBatch(listBasedResult, 1, scannedResult);
+    }
+    QueryStatistic measureFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.MEASURE_FILLING_TIME);
+    measureFillingTime.addCountStatistic(QueryStatisticsConstants.MEASURE_FILLING_TIME,
+        measureFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
index fcecc01..176be6e 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
@@ -20,9 +20,7 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
@@ -33,16 +31,10 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Predicate;
 import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.hadoop.util.hash.Hash;
@@ -58,13 +50,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
   protected List<CarbonBloomFilter> indexBloomFilters;
-  private KeyGenerator keyGenerator;
-  private ColumnarSplitter columnarSplitter;
-  // for the dict/sort/date column, they are encoded in MDK,
-  // this maps the index column name to the index in MDK
-  private Map<String, Integer> indexCol2MdkIdx;
-  // this gives the reverse map to indexCol2MdkIdx
-  private Map<Integer, String> mdkIdx2IndexCol;
 
   AbstractBloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
       Segment segment, String shardName, SegmentProperties segmentProperties,
@@ -79,27 +64,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
     indexBloomFilters = new ArrayList<>(indexColumns.size());
     initDataMapFile();
     resetBloomFilters();
-
-    keyGenerator = segmentProperties.getDimensionKeyGenerator();
-    columnarSplitter = segmentProperties.getFixedLengthKeySplitter();
-    this.indexCol2MdkIdx = new HashMap<>();
-    this.mdkIdx2IndexCol = new HashMap<>();
-    int idx = 0;
-    for (final CarbonDimension dimension : segmentProperties.getDimensions()) {
-      if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) {
-        continue;
-      }
-      boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() {
-        @Override public boolean evaluate(Object object) {
-          return ((CarbonColumn) object).getColName().equalsIgnoreCase(dimension.getColName());
-        }
-      });
-      if (isExistInIndex) {
-        this.indexCol2MdkIdx.put(dimension.getColName(), idx);
-        this.mdkIdx2IndexCol.put(idx, dimension.getColName());
-      }
-      idx++;
-    }
   }
 
   @Override
@@ -172,7 +136,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
     } else {
       if (indexColumns.get(indexColIdx).hasEncoding(Encoding.DICTIONARY)
           || indexColumns.get(indexColIdx).hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        indexValue = convertDictionaryValue(indexColIdx, (byte[]) value);
+        indexValue = convertDictionaryValue(indexColIdx, value);
       } else {
         indexValue = convertNonDictionaryValue(indexColIdx, (byte[]) value);
       }
@@ -183,38 +147,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
     indexBloomFilters.get(indexColIdx).add(new Key(indexValue));
   }
 
-  protected byte[] convertDictionaryValue(int indexColIdx, byte[] value) {
-    byte[] fakeMdkBytes;
-    // this means that we need to pad some fake bytes
-    // to get the whole MDK in corresponding position
-    if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) {
-      int totalSize = 0;
-      for (int size : columnarSplitter.getBlockKeySize()) {
-        totalSize += size;
-      }
-      fakeMdkBytes = new byte[totalSize];
-
-      // put this bytes to corresponding position
-      int thisKeyIdx = indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName());
-      int destPos = 0;
-      for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) {
-        if (thisKeyIdx == keyIdx) {
-          System.arraycopy(value, 0,
-              fakeMdkBytes, destPos, columnarSplitter.getBlockKeySize()[thisKeyIdx]);
-          break;
-        }
-        destPos += columnarSplitter.getBlockKeySize()[keyIdx];
-      }
-    } else {
-      fakeMdkBytes = value;
-    }
-    // for dict columns including dictionary and date columns
-    // decode value to get the surrogate key
-    int surrogateKey = (int) keyGenerator.getKey(fakeMdkBytes,
-        indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName()));
-    // store the dictionary key in bloom
-    return CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey);
-  }
+  protected abstract byte[] convertDictionaryValue(int indexColIdx, Object value);
 
   protected abstract byte[] convertNonDictionaryValue(int indexColIdx, byte[] value);
 
@@ -276,5 +209,4 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
           currentDataOutStreams.get(indexColId));
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index d9646d5..35ebd20 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -385,7 +385,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
       case ALTER_DROP:
         return true;
       case ALTER_ADD_COLUMN:
-        return true;
+        return false;
       case ALTER_CHANGE_DATATYPE:
         return true;
       case STREAMING:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index 7ba8c42..29e3060 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -24,7 +24,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
@@ -75,6 +77,12 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D
   }
 
   @Override
+  protected byte[] convertDictionaryValue(int indexColIdx, Object value) {
+    // input value from IndexDataMapRebuildRDD is already decoded as surrogate key
+    return CarbonUtil.getValueAsBytes(DataTypes.INT, value);
+  }
+
+  @Override
   public void close() throws IOException {
     releaseResouce();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index 2769773..1d01e66 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -17,13 +17,22 @@
 package org.apache.carbondata.datamap.bloom;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Predicate;
 
 /**
  * BloomDataMap is constructed in CG level (blocklet level).
@@ -34,6 +43,13 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
  */
 @InterfaceAudience.Internal
 public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
+  private KeyGenerator keyGenerator;
+  private ColumnarSplitter columnarSplitter;
+  // for the dict/sort/date column, they are encoded in MDK,
+  // this maps the index column name to the index in MDK
+  private Map<String, Integer> indexCol2MdkIdx;
+  // this gives the reverse map to indexCol2MdkIdx
+  private Map<Integer, String> mdkIdx2IndexCol;
 
   BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
       Segment segment, String shardName, SegmentProperties segmentProperties,
@@ -41,6 +57,27 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
       throws IOException {
     super(tablePath, dataMapName, indexColumns, segment, shardName, segmentProperties,
         bloomFilterSize, bloomFilterFpp, compressBloom);
+
+    keyGenerator = segmentProperties.getDimensionKeyGenerator();
+    columnarSplitter = segmentProperties.getFixedLengthKeySplitter();
+    this.indexCol2MdkIdx = new HashMap<>();
+    this.mdkIdx2IndexCol = new HashMap<>();
+    int idx = 0;
+    for (final CarbonDimension dimension : segmentProperties.getDimensions()) {
+      if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) {
+        continue;
+      }
+      boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() {
+        @Override public boolean evaluate(Object object) {
+          return ((CarbonColumn) object).getColName().equalsIgnoreCase(dimension.getColName());
+        }
+      });
+      if (isExistInIndex) {
+        this.indexCol2MdkIdx.put(dimension.getColName(), idx);
+        this.mdkIdx2IndexCol.put(idx, dimension.getColName());
+      }
+      idx++;
+    }
   }
 
   protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
@@ -50,4 +87,39 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
       return DataConvertUtil.getRawBytes(value);
     }
   }
+
+  @Override
+  protected byte[] convertDictionaryValue(int indexColIdx, Object value) {
+    // input value from onPageAdded in load process is byte[]
+    byte[] fakeMdkBytes;
+    // this means that we need to pad some fake bytes
+    // to get the whole MDK in corresponding position
+    if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) {
+      int totalSize = 0;
+      for (int size : columnarSplitter.getBlockKeySize()) {
+        totalSize += size;
+      }
+      fakeMdkBytes = new byte[totalSize];
+
+      // put this bytes to corresponding position
+      int thisKeyIdx = indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName());
+      int destPos = 0;
+      for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) {
+        if (thisKeyIdx == keyIdx) {
+          System.arraycopy(value, 0,
+              fakeMdkBytes, destPos, columnarSplitter.getBlockKeySize()[thisKeyIdx]);
+          break;
+        }
+        destPos += columnarSplitter.getBlockKeySize()[keyIdx];
+      }
+    } else {
+      fakeMdkBytes = (byte[])value;
+    }
+    // for dict columns including dictionary and date columns
+    // decode value to get the surrogate key
+    int surrogateKey = (int) keyGenerator.getKey(fakeMdkBytes,
+        indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName()));
+    // store the dictionary key in bloom
+    return CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 85466f1..70e5cba 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -23,7 +23,9 @@ import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
+import org.apache.commons.lang3.ArrayUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
@@ -33,19 +35,22 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher
-import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter
+import org.apache.carbondata.core.keygenerator.KeyGenerator
+import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.TaskMetricsMap
+import org.apache.carbondata.core.util.{CarbonUtil, TaskMetricsMap}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
@@ -55,6 +60,7 @@ import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
 import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
 import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
+
 /**
  * Helper object to rebuild the index DataMap
  */
@@ -154,89 +160,114 @@ class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[
  */
 class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Array[CarbonColumn])
   extends CarbonReadSupport[Array[Object]] {
-  var columnarSplitter: ColumnarSplitter = _
+  var dimensionKeyGenerator: KeyGenerator = _
+  // for the dictionary dimensions
+  var indexCol2IdxInDictArray: Map[String, Int] = Map()
   // for the non dictionary dimensions
   var indexCol2IdxInNoDictArray: Map[String, Int] = Map()
   // for the measures
   var indexCol2IdxInMeasureArray: Map[String, Int] = Map()
-  // for the dictionary/date dimensions
-  var dictIndexCol2MdkIndex: Map[String, Int] = Map()
-  var mdkIndex2DictIndexCol: Map[Int, String] = Map()
-  var existDim = false
+
+  /**
+   * The rebuild process gets its data from a query. If some columns were added to the table but
+   * are not present in this segment, they are filled with default values and a new key is
+   * generated for dict dimensions.
+   * Here we prepare the key generator the same way as `RowIdRestructureBasedRawResultCollector`
+   * to get the surrogate value of a dict column from the query result,
+   * so we do not need to build a fake MDK to split when adding a row to the datamap.
+   */
+  def prepareKeyGenForDictIndexColumns(carbonTable: CarbonTable,
+                                       dictIndexColumns: ListBuffer[CarbonColumn]): Unit = {
+
+    val columnCardinality = new util.ArrayList[Integer](dictIndexColumns.length)
+    val columnPartitioner = new util.ArrayList[Integer](dictIndexColumns.length)
+
+    dictIndexColumns.foreach { col =>
+      val dim = carbonTable.getDimensionByName(carbonTable.getTableName, col.getColName)
+      val currentBlockDimension = segmentProperties.getDimensionFromCurrentBlock(dim)
+      if (null != currentBlockDimension) {
+        columnCardinality.add(segmentProperties.getDimColumnsCardinality.apply(
+          currentBlockDimension.getKeyOrdinal))
+        columnPartitioner.add(segmentProperties.getDimensionPartitions.apply(
+          currentBlockDimension.getKeyOrdinal
+        ))
+      } else {
+        columnPartitioner.add(1)
+        if (col.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          columnCardinality.add(Integer.MAX_VALUE)
+        } else {
+          val defaultValue = col.getDefaultValue
+          if (null != col.getDefaultValue) {
+            columnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1)
+          } else {
+            columnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY)
+          }
+        }
+      }
+    }
+
+    if (!columnCardinality.isEmpty) {
+      val latestColumnCardinality = ArrayUtils.toPrimitive(columnCardinality.toArray(
+        new Array[Integer](columnCardinality.size)))
+      val latestColumnPartitioner = ArrayUtils.toPrimitive(columnPartitioner.toArray(
+        new Array[Integer](columnPartitioner.size)))
+      val dimensionBitLength = CarbonUtil.getDimensionBitLength(
+        latestColumnCardinality, latestColumnPartitioner)
+      this.dimensionKeyGenerator = new MultiDimKeyVarLengthGenerator(dimensionBitLength)
+    }
+  }
 
   override def initialize(carbonColumns: Array[CarbonColumn],
       carbonTable: CarbonTable): Unit = {
-    this.columnarSplitter = segmentProperties.getFixedLengthKeySplitter
 
+    val dictIndexColumns = new ListBuffer[CarbonColumn]()
+
+    // prepare index info to extract data from query result
     indexColumns.foreach { col =>
       if (col.isDimension) {
         val dim = carbonTable.getDimensionByName(carbonTable.getTableName, col.getColName)
         if (!dim.isGlobalDictionaryEncoding && !dim.isDirectDictionaryEncoding) {
           indexCol2IdxInNoDictArray =
             indexCol2IdxInNoDictArray + (col.getColName -> indexCol2IdxInNoDictArray.size)
+        } else {
+          dictIndexColumns.append(col)
+          indexCol2IdxInDictArray =
+            indexCol2IdxInDictArray + (col.getColName -> indexCol2IdxInDictArray.size)
         }
       } else {
         indexCol2IdxInMeasureArray =
           indexCol2IdxInMeasureArray + (col.getColName -> indexCol2IdxInMeasureArray.size)
       }
     }
-    dictIndexCol2MdkIndex = segmentProperties.getDimensions.asScala
-      .filter(col => col.isGlobalDictionaryEncoding || col.isDirectDictionaryEncoding)
-      .map(_.getColName)
-      .zipWithIndex
-      .filter(p => indexColumns.exists(c => c.getColName.equalsIgnoreCase(p._1)))
-      .toMap
-    mdkIndex2DictIndexCol = dictIndexCol2MdkIndex.map(p => (p._2, p._1))
-    existDim = indexCol2IdxInNoDictArray.nonEmpty || dictIndexCol2MdkIndex.nonEmpty
+
+    if (dictIndexColumns.size > 0) {
+      prepareKeyGenForDictIndexColumns(carbonTable, dictIndexColumns)
+    }
   }
 
   /**
    * input: all the dimensions are bundled in one ByteArrayWrapper in position 0,
-   * then comes the measures one by one;
+   * then comes the measures one by one; last 3 elements are block/page/row id
    * output: all the dimensions and measures comes one after another
    */
   override def readRow(data: Array[Object]): Array[Object] = {
-    val dictArray = if (existDim) {
-      val dictKeys = data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey
-      // note that the index column may only contains a portion of all the dict columns, so we
-      // need to pad fake bytes to dict keys in order to reconstruct value later
-      if (columnarSplitter.getBlockKeySize.length > dictIndexCol2MdkIndex.size) {
-        val res = new Array[Byte](columnarSplitter.getBlockKeySize.sum)
-        var startPos = 0
-        var desPos = 0
-        columnarSplitter.getBlockKeySize.indices.foreach { idx =>
-          if (mdkIndex2DictIndexCol.contains(idx)) {
-            val size = columnarSplitter.getBlockKeySize.apply(idx)
-            System.arraycopy(dictKeys, startPos, res, desPos, size)
-            startPos += size
-          }
-          desPos += columnarSplitter.getBlockKeySize.apply(idx)
-        }
 
-        Option(res)
-      } else {
-        Option(dictKeys)
-      }
-    } else {
-      None
+    var surrogatKeys = new Array[Long](0)
+    if(null != dimensionKeyGenerator) {
+        surrogatKeys = dimensionKeyGenerator.getKeyArray(
+          data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey)
     }
 
-    val dictKeys = if (existDim) {
-      Option(columnarSplitter.splitKey(dictArray.get))
-    } else {
-      None
-    }
+    // fill return row from data
     val rtn = new Array[Object](indexColumns.length + 3)
-
     indexColumns.zipWithIndex.foreach { case (col, i) =>
-      rtn(i) = if (dictIndexCol2MdkIndex.contains(col.getColName)) {
-        dictKeys.get(dictIndexCol2MdkIndex.get(col.getColName).get)
+      rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
+        surrogatKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
       } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
         data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
-          indexCol2IdxInNoDictArray.apply(col.getColName))
+          indexCol2IdxInNoDictArray(col.getColName))
       } else {
         // measures start from 1
-        data(1 + indexCol2IdxInMeasureArray.apply(col.getColName))
+        data(1 + indexCol2IdxInMeasureArray(col.getColName))
       }
     }
     rtn(indexColumns.length) = data(data.length - 3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index df5ee18..0b0c665 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -512,6 +512,102 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
       "BloomFilter datamap does not support complex datatype column"))
   }
 
+  test("test create bloom datamap on newly added column") {
+    val datamap1 = "datamap1"
+    val datamap2 = "datamap2"
+    val datamap3 = "datamap3"
+
+    // create a table with dict/noDict/measure column
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128',
+         | 'DICTIONARY_INCLUDE'='s1,s2')
+         |  """.stripMargin)
+
+    // load data into table (segment0)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+
+    // create simple datamap on segment0
+    sql(
+      s"""
+         | CREATE DATAMAP $datamap1 ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    // add some columns including dict/noDict/measure column
+    sql(
+      s"""
+         | ALTER TABLE $bloomDMSampleTable
+         | ADD COLUMNS(num1 INT, dictString STRING, noDictString STRING)
+         | TBLPROPERTIES('DEFAULT.VALUE.num1'='999', 'DEFAULT.VALUE.dictString'='old',
+         | 'DICTIONARY_INCLUDE'='dictString'
+         | )
+         """.stripMargin)
+
+    // load data into table (segment1)
+    sql(
+      s"""
+         | INSERT INTO TABLE $bloomDMSampleTable VALUES
+         | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',0,'S01','S02'),
+         | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',4,'S11','S12'),
+         | (102,'name2','city2',12,'s12','s22','s32','s42','s52','s62','s72','s82',5,'S21','S22')
+           """.stripMargin)
+
+    // check data after columns added
+    var res = sql(
+      s"""
+         | SELECT name, city, num1, dictString, noDictString
+         | FROM $bloomDMSampleTable
+         | WHERE id = 101
+         | """.stripMargin)
+    checkExistence(res, true, "999", "null")
+
+    // create datamap on newly added column
+    sql(
+      s"""
+         | CREATE DATAMAP $datamap2 ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='s1,dictString,s8,noDictString,age,num1',
+         | 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    // load data into table (segment2)
+    sql(
+      s"""
+         | INSERT INTO TABLE $bloomDMSampleTable VALUES
+         | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',1,'S01','S02'),
+         | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',2,'S11','S12'),
+         | (102,'name2','city1',12,'s12','s22','s32','s42','s52','s62','s72','s82',3,'S21','S22')
+           """.stripMargin)
+
+    var explainString = sql(
+      s"""
+         | explain SELECT id, name, num1, dictString
+         | FROM $bloomDMSampleTable
+         | WHERE num1 = 1
+           """.stripMargin).collect()
+    assert(explainString(0).getString(0).contains(
+      "- name: datamap2\n    - provider: bloomfilter\n    - skipped blocklets: 1"))
+
+    explainString = sql(
+      s"""
+         | explain SELECT id, name, num1, dictString
+         | FROM $bloomDMSampleTable
+         | WHERE dictString = 'S21'
+           """.stripMargin).collect()
+    assert(explainString(0).getString(0).contains(
+      "- name: datamap2\n    - provider: bloomfilter\n    - skipped blocklets: 0"))
+
+  }
+
+
   override protected def afterAll(): Unit = {
     deleteFile(bigFile)
     deleteFile(smallFile)
