[CARBONDATA-2459][DataMap] Add cache for bloom filter datamap

Loading bloom filters from the bloomindex file is slow. Adding a cache for this
procedure improves query performance.

This closes #2300


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d14c403f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d14c403f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d14c403f

Branch: refs/heads/spark-2.3
Commit: d14c403f6282ca8b574dae2fa5ab77caa5cf3c18
Parents: ffddba7
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Fri May 11 21:49:43 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sun May 13 02:05:30 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  13 ++
 .../datamap/bloom/BloomCoarseGrainDataMap.java  | 108 +++------
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   4 +
 .../datamap/bloom/BloomDataMapCache.java        | 232 +++++++++++++++++++
 .../datamap/bloom/BloomDataMapWriter.java       |   5 +-
 5 files changed, 283 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d14c403f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 56607b9..f3a821b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1729,6 +1729,19 @@ public final class CarbonCommonConstants {
   // Property to enable parallel datamap loading for a table
   public static final String CARBON_LOAD_DATAMAPS_PARALLEL = 
"carbon.load.datamaps.parallel.";
 
+  /**
+   * Cache size in MB for bloom filter datamap. It is an integer and should be 
greater than 0
+   * and it will be used during query.
+   */
+  @CarbonProperty
+  public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE =
+      "carbon.query.datamap.bloom.cache.size";
+
+  /**
+   * default value in size for cache size of bloom filter datamap.
+   */
+  public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL 
= "512";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d14c403f/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 725d5cd..09de25e 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -17,13 +17,10 @@
 
 package org.apache.carbondata.datamap.bloom;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -45,13 +42,8 @@ import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 
 /**
  * BloomDataCoarseGrainMap is constructed in blocklet level. For each indexed 
column,
@@ -62,15 +54,16 @@ import org.apache.hadoop.fs.PathFilter;
 public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
+  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
   private Set<String> indexedColumn;
   private List<BloomDMModel> bloomIndexList;
-  private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
-  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
   private String shardName;
+  private BloomDataMapCache bloomDataMapCache;
+  private Path indexPath;
 
   @Override
   public void init(DataMapModel dataMapModel) throws IOException {
-    Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+    this.indexPath = FileFactory.getPath(dataMapModel.getFilePath());
     this.shardName = indexPath.getName();
     FileSystem fs = FileFactory.getFileSystem(indexPath);
     if (!fs.exists(indexPath)) {
@@ -81,52 +74,11 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
       throw new IOException(
           String.format("Path %s for Bloom index dataMap must be a directory", 
indexPath));
     }
-
-    FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() {
-      @Override public boolean accept(Path path) {
-        return path.getName().endsWith(BLOOM_INDEX_SUFFIX);
-      }
-    });
-    indexedColumn = new HashSet<String>();
-    bloomIndexList = new ArrayList<BloomDMModel>();
-    indexCol2BloomDMList = ArrayListMultimap.create();
-    for (int i = 0; i < indexFileStatus.length; i++) {
-      String indexfilename = indexFileStatus[i].getPath().getName();
-      String indexCol =
-          indexfilename.substring(0, indexfilename.length() - 
BLOOM_INDEX_SUFFIX.length());
-      indexedColumn.add(indexCol);
-      List<BloomDMModel> models = 
readBloomIndex(indexFileStatus[i].getPath().toString());
-      bloomIndexList.addAll(models);
-      indexCol2BloomDMList.put(indexCol, models);
-    }
-    LOGGER.info("find bloom index datamap for column: "
-        + StringUtils.join(indexedColumn, ", "));
+    this.bloomDataMapCache = BloomDataMapCache.getInstance();
   }
 
-  private List<BloomDMModel> readBloomIndex(String indexFile) throws 
IOException {
-    LOGGER.info("read bloom index from file: " + indexFile);
-    List<BloomDMModel> bloomDMModelList = new ArrayList<BloomDMModel>();
-    DataInputStream dataInStream = null;
-    ObjectInputStream objectInStream = null;
-    try {
-      dataInStream = FileFactory.getDataInputStream(indexFile, 
FileFactory.getFileType(indexFile));
-      objectInStream = new ObjectInputStream(dataInStream);
-      try {
-        BloomDMModel model = null;
-        while ((model = (BloomDMModel) objectInStream.readObject()) != null) {
-          LOGGER.info("read bloom index: " + model);
-          bloomDMModelList.add(model);
-        }
-      } catch (EOFException e) {
-        LOGGER.info("read " + bloomDMModelList.size() + " bloom indices from " 
+ indexFile);
-      }
-      return bloomDMModelList;
-    } catch (ClassNotFoundException e) {
-      LOGGER.error("Error occrus while reading bloom index");
-      throw new RuntimeException("Error occrus while reading bloom index", e);
-    } finally {
-      CarbonUtil.closeStreams(objectInStream, dataInStream);
-    }
+  /**
+   * Set the names of the columns indexed by this datamap.
+   * @param indexedColumn names of the indexed columns
+   */
+  public void setIndexedColumn(Set<String> indexedColumn) {
+    this.indexedColumn = indexedColumn;
   }
 
   @Override
@@ -139,26 +91,22 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
     }
 
     List<BloomQueryModel> bloomQueryModels = 
getQueryValue(filterExp.getFilterExpression());
-
     for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
-      LOGGER.info("prune blocklet for query: " + bloomQueryModel);
-      for (List<BloomDMModel> bloomDMModels : indexCol2BloomDMList.get(
-          bloomQueryModel.columnName)) {
-        for (BloomDMModel bloomDMModel : bloomDMModels) {
-          boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
-              convertValueToBytes(bloomQueryModel.dataType, 
bloomQueryModel.filterValue));
-          if (scanRequired) {
-            LOGGER.info(String.format(
-                "BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
-                String.valueOf(bloomDMModel.getBlockletNo())));
-            Blocklet blocklet =
-                new Blocklet(shardName, 
String.valueOf(bloomDMModel.getBlockletNo()));
-            hitBlocklets.add(blocklet);
-          } else {
-            LOGGER.info(String.format(
-                "BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
-                String.valueOf(bloomDMModel.getBlockletNo())));
-          }
+      LOGGER.debug("prune blocklet for query: " + bloomQueryModel);
+      BloomDataMapCache.CacheKey cacheKey = new BloomDataMapCache.CacheKey(
+          this.indexPath.toString(), bloomQueryModel.columnName);
+      List<BloomDMModel> bloomDMModels = 
this.bloomDataMapCache.getBloomDMModelByKey(cacheKey);
+      for (BloomDMModel bloomDMModel : bloomDMModels) {
+        boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
+            convertValueToBytes(bloomQueryModel.dataType, 
bloomQueryModel.filterValue));
+        if (scanRequired) {
+          LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> 
blocklet#%s",
+              String.valueOf(bloomDMModel.getBlockletNo())));
+          Blocklet blocklet = new Blocklet(shardName, 
String.valueOf(bloomDMModel.getBlockletNo()));
+          hitBlocklets.add(blocklet);
+        } else {
+          LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> 
blocklet#%s",
+              String.valueOf(bloomDMModel.getBlockletNo())));
         }
       }
     }
@@ -228,12 +176,20 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
     bloomIndexList = null;
   }
 
+  /**
+   * Get the bloom index file of one index column inside a shard directory.
+   * The result is the shard path, a file separator, the column name and
+   * {@link #BLOOM_INDEX_SUFFIX}, in that order.
+   * NOTE(review): File.separator is platform dependent; confirm this is intended for
+   * paths that may be resolved through hadoop file systems.
+   * @param shardPath path for the shard
+   * @param colName index column name
+   * @return path of the bloom index file
+   */
+  public static String getBloomIndexFile(String shardPath, String colName) {
+    return 
shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
+  }
   static class BloomQueryModel {
     private String columnName;
     private DataType dataType;
     private Object filterValue;
 
-    public BloomQueryModel(String columnName, DataType dataType, Object 
filterValue) {
+    private BloomQueryModel(String columnName, DataType dataType, Object 
filterValue) {
       this.columnName = columnName;
       this.dataType = dataType;
       this.filterValue = filterValue;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d14c403f/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 95c21fa..581c3a6 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 
@@ -189,6 +190,7 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
       for (CarbonFile carbonFile : carbonFiles) {
         BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
         bloomDM.init(new DataMapModel(carbonFile.getAbsolutePath()));
+        bloomDM.setIndexedColumn(new 
HashSet<String>(dataMapMeta.getIndexedColumnNames()));
         dataMaps.add(bloomDM);
       }
     } catch (Exception e) {
@@ -204,6 +206,8 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
     BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new 
BloomCoarseGrainDataMap();
     String indexPath = ((BloomDataMapDistributable) 
distributable).getIndexPath();
     bloomCoarseGrainDataMap.init(new DataMapModel(indexPath));
+    bloomCoarseGrainDataMap.setIndexedColumn(
+        new HashSet<String>(dataMapMeta.getIndexedColumnNames()));
     coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
     return coarseGrainDataMaps;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d14c403f/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
new file mode 100644
index 0000000..fc23f33
--- /dev/null
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * This class adds a cache for bloomfilter datamaps to accelerate queries that use them.
+ * The cache is implemented with a guava cache and is a singleton that is shared by all
+ * the bloomfilter datamaps.
+ * As for the cache, the key is a bloomindex file of a shard and the value is the
+ * bloomfilters for the blocklets in this shard.
+ * The size of the cache is configurable through CarbonProperties, and a cache entry
+ * expires if it has not been accessed in the past 2 hours.
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapCache implements Serializable {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      BloomDataMapCache.class.getName());
+  private static final long serialVersionUID = 20160822L;
+  /**
+   * number of hours after the last access before a cache entry expires
+   */
+  private static final int DEFAULT_CACHE_EXPIRED_HOURS = 2;
+  /**
+   * cache from (shard path, index column) to the bloom models read from the index file.
+   * It is transient because readResolve() always substitutes the singleton instance, so
+   * the cache content never has to be written to a serialization stream.
+   */
+  private transient LoadingCache<CacheKey, List<BloomDMModel>> bloomDMCache = null;
+
+  /**
+   * Build the guava cache behind this singleton: statistics are recorded, entries expire
+   * DEFAULT_CACHE_EXPIRED_HOURS hours after their last access, removals are logged and
+   * missing entries are loaded from the bloom index file.
+   */
+  private BloomDataMapCache() {
+    RemovalListener<CacheKey, List<BloomDMModel>> listener =
+        new RemovalListener<CacheKey, List<BloomDMModel>>() {
+      @Override
+      public void onRemoval(RemovalNotification<CacheKey, List<BloomDMModel>> notification) {
+        LOGGER.info(
+            String.format("Remove bloom datamap entry %s from cache due to %s",
+                notification.getKey(), notification.getCause()));
+      }
+    };
+    CacheLoader<CacheKey, List<BloomDMModel>> cacheLoader =
+        new CacheLoader<CacheKey, List<BloomDMModel>>() {
+      @Override
+      public List<BloomDMModel> load(CacheKey key) throws Exception {
+        LOGGER.info(String.format("Load bloom datamap entry %s to cache", key));
+        return loadBloomDataMapModel(key);
+      }
+    };
+
+    // Multiply as long: the product of the configured size and the two conversion
+    // factors can exceed Integer.MAX_VALUE for large configured sizes, and
+    // maximumSize accepts a long anyway.
+    long cacheSizeInBytes = (long) validateAndGetCacheSize()
+        * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
+        * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
+    // NOTE(review): guava's maximumSize bounds the number of entries, not their size in
+    // bytes; enforcing a byte budget would need maximumWeight plus a weigher - confirm
+    // the intended semantics of the configured value.
+    this.bloomDMCache = CacheBuilder.newBuilder()
+        .recordStats()
+        .maximumSize(cacheSizeInBytes)
+        .expireAfterAccess(DEFAULT_CACHE_EXPIRED_HOURS, TimeUnit.HOURS)
+        .removalListener(listener)
+        .build(cacheLoader);
+  }
+
+  /**
+   * holder of the single BloomDataMapCache instance, which is handed out by getInstance()
+   */
+  private static class SingletonHolder {
+    private static final BloomDataMapCache INSTANCE = new BloomDataMapCache();
+  }
+
+  /**
+   * get instance
+   * @return the singleton cache instance held by SingletonHolder
+   */
+  public static BloomDataMapCache getInstance() {
+    return SingletonHolder.INSTANCE;
+  }
+
+  /**
+   * for resolve from serialized
+   * @return the singleton instance, so that a deserialized object is replaced by it
+   */
+  protected Object readResolve() {
+    return getInstance();
+  }
+
+  /**
+   * read the configured cache size of the bloom datamap and validate it.
+   * If the value is not an integer greater than 0, an error is logged and the default
+   * value is used instead.
+   * @return the validated cache size
+   */
+  private int validateAndGetCacheSize() {
+    String defaultSizeStr =
+        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL;
+    String configuredSizeStr = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE, defaultSizeStr);
+
+    // a value that cannot be parsed is treated like a non-positive one
+    int parsedSize;
+    try {
+      parsedSize = Integer.parseInt(configuredSizeStr);
+    } catch (NumberFormatException ex) {
+      parsedSize = -1;
+    }
+    if (parsedSize > 0) {
+      return parsedSize;
+    }
+
+    LOGGER.error(String.format(
+        "The value '%s' for '%s' is invalid, it must be an Integer that greater than 0."
+            + " Use default value '%s' instead.",
+        configuredSizeStr,
+        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE,
+        defaultSizeStr));
+    return Integer.parseInt(defaultSizeStr);
+  }
+
+  /**
+   * load datamap from bloomindex file.
+   * Bloom models are read from the index file one after another until the end of the
+   * file is reached. This method is the loader of the cache, so it only returns the
+   * models: the cache stores the returned value itself, and the other cached entries
+   * must stay untouched by a load.
+   * @param cacheKey shard path and index column that identify the bloom index file
+   * @return bloom models read from the file
+   * @throws RuntimeException if the index file cannot be read or deserialized
+   */
+  private List<BloomDMModel> loadBloomDataMapModel(CacheKey cacheKey) {
+    DataInputStream dataInStream = null;
+    ObjectInputStream objectInStream = null;
+    List<BloomDMModel> bloomDMModels = new ArrayList<BloomDMModel>();
+    try {
+      String indexFile = getIndexFileFromCacheKey(cacheKey);
+      dataInStream =
+          FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
+      objectInStream = new ObjectInputStream(dataInStream);
+      try {
+        BloomDMModel model = null;
+        while ((model = (BloomDMModel) objectInStream.readObject()) != null) {
+          bloomDMModels.add(model);
+        }
+      } catch (EOFException e) {
+        // end of file is the regular way to finish reading the models
+        LOGGER.info(String.format("Read %d bloom indices from %s",
+            bloomDMModels.size(), indexFile));
+      }
+      return bloomDMModels;
+    } catch (ClassNotFoundException | IOException e) {
+      LOGGER.error(e, "Error occurs while reading bloom index");
+      throw new RuntimeException("Error occurs while reading bloom index", e);
+    } finally {
+      CarbonUtil.closeStreams(objectInStream, dataInStream);
+    }
+  }
+
+  /**
+   * get bloom index file name from cachekey
+   * @param cacheKey key holding the shard path and the index column
+   * @return bloom index file built by BloomCoarseGrainDataMap.getBloomIndexFile
+   */
+  private String getIndexFileFromCacheKey(CacheKey cacheKey) {
+    return BloomCoarseGrainDataMap.getBloomIndexFile(cacheKey.shardPath, 
cacheKey.indexColumn);
+  }
+
+  /**
+   * get bloom datamap from cache
+   * @param cacheKey shard path and index column of the wanted bloom index
+   * @return bloom models for the key, looked up through the loading cache
+   * NOTE(review): per guava documentation getUnchecked wraps a failure of the loader in
+   * an UncheckedExecutionException - confirm that callers expect this.
+   */
+  public List<BloomDMModel> getBloomDMModelByKey(CacheKey cacheKey) {
+    return this.bloomDMCache.getUnchecked(cacheKey);
+  }
+
+  /**
+   * get cache status
+   * @return the hit count, hit rate, load count, average load penalty and eviction count
+   *         of the cache, one "name: value" pair per line
+   */
+  private String getCacheStatus() {
+    StringBuilder sb = new StringBuilder();
+    CacheStats stats = this.bloomDMCache.stats();
+    sb.append("hitCount: ").append(stats.hitCount()).append(System.lineSeparator())
+        // report the ratio here, the absolute count is already printed above
+        .append("hitRate: ").append(stats.hitRate()).append(System.lineSeparator())
+        .append("loadCount: ").append(stats.loadCount()).append(System.lineSeparator())
+        .append("averageLoadPenalty: ").append(stats.averageLoadPenalty())
+        .append(System.lineSeparator())
+        .append("evictionCount: ").append(stats.evictionCount());
+    return sb.toString();
+  }
+
+  /**
+   * clear this cache: log the current cache statistics, then invalidate all the entries
+   */
+  private void clear() {
+    LOGGER.info(String.format("Current meta cache statistic: %s", 
getCacheStatus()));
+    LOGGER.info("Trigger invalid all the cache for bloom datamap");
+    this.bloomDMCache.invalidateAll();
+  }
+
+  /**
+   * key of the bloom datamap cache. It identifies the bloom index of one index column
+   * in one shard; equality and hash code are based on both the shard path and the
+   * index column.
+   */
+  public static class CacheKey {
+    // final: both fields take part in equals/hashCode, so they must never change
+    // while the key is stored in the cache
+    private final String shardPath;
+    private final String indexColumn;
+
+    CacheKey(String shardPath, String indexColumn) {
+      this.shardPath = shardPath;
+      this.indexColumn = indexColumn;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("CacheKey{");
+      sb.append("shardPath='").append(shardPath).append('\'');
+      sb.append(", indexColumn='").append(indexColumn).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof CacheKey)) {
+        return false;
+      }
+      CacheKey cacheKey = (CacheKey) o;
+      return Objects.equals(shardPath, cacheKey.shardPath)
+          && Objects.equals(indexColumn, cacheKey.indexColumn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(shardPath, indexColumn);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d14c403f/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index a55de11..f6eb331 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -138,8 +137,8 @@ public class BloomDataMapWriter extends DataMapWriter {
     }
     List<CarbonColumn> indexColumns = getIndexColumns();
     for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
-      String dmFile = dataMapPath + CarbonCommonConstants.FILE_SEPARATOR +
-          indexColumns.get(indexColId).getColName() + 
BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
+      String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath,
+          indexColumns.get(indexColId).getColName());
       DataOutputStream dataOutStream = null;
       ObjectOutputStream objectOutStream = null;
       try {

Reply via email to