This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch bloomfilter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d797dff895417f75b23c85281b37ab3ae1880be5
Author: kishoreg <[email protected]>
AuthorDate: Mon Nov 19 18:29:39 2018 -0800

    Adding support for bloom filter
---
 .../pinot/common/config/IndexingConfig.java        |  12 ++
 .../linkedin/pinot/common/config/TableConfig.java  |   7 +
 .../pinot/common/segment/SegmentMetadata.java      |   3 +
 .../com/linkedin/pinot/core/common/DataSource.java |   3 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |   4 +-
 .../core/query/pruner/AbstractSegmentPruner.java   |  13 +-
 .../query/pruner/ColumnValueSegmentPruner.java     |  44 ++++--
 .../core/query/pruner/PartitionSegmentPruner.java  |  10 +-
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../creator/impl/bloom/BloomFilterCreator.java     |  51 +++++++
 .../core/segment/index/SegmentMetadataImpl.java    |   5 +
 .../segment/index/column/ColumnIndexContainer.java |   3 +
 .../index/column/PhysicalColumnIndexContainer.java |  23 ++-
 .../index/data/source/ColumnDataSource.java        |  19 ++-
 .../segment/index/loader/IndexLoadingConfig.java   |  17 +++
 .../segment/index/loader/SegmentPreProcessor.java  |   6 +-
 .../loader/bloomfilter/BloomFilterHandler.java     | 158 +++++++++++++++++++++
 .../segment/index/readers/BloomFilterReader.java   |  32 +++++
 .../core/segment/store/ColumnIndexDirectory.java   |  18 ++-
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |  17 +++
 .../segment/store/SegmentLocalFSDirectory.java     |   6 +
 .../segment/store/SingleFileIndexDirectory.java    |  12 ++
 .../virtualcolumn/VirtualColumnIndexContainer.java |   6 +
 .../v2/store/StarTreeDimensionDataSource.java      |   6 +
 .../v2/store/StarTreeMetricDataSource.java         |   8 ++
 .../core/common/RealtimeNoDictionaryTest.java      |   8 +-
 .../index/creator/BloomFilterCreatorTest.java      |  71 +++++++++
 .../query/pruner/ColumnValueSegmentPrunerTest.java |   4 +-
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   6 +-
 .../pinot/tools/perf/PerfBenchmarkRunner.java      |  16 ++-
 pom.xml                                            |   1 +
 32 files changed, 552 insertions(+), 41 deletions(-)

diff --git 
a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
 
b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
index b57825c..35de1e3 100644
--- 
a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
+++ 
b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
@@ -43,6 +43,9 @@ public class IndexingConfig {
   @ConfigKey("sortedColumn")
   private List<String> _sortedColumn = new ArrayList<>();
 
+  @ConfigKey("bloomFilterColumns")
+  private List<String> _bloomFilterColumns = new ArrayList<>();
+
   @ConfigKey("loadMode")
   private String _loadMode;
 
@@ -113,6 +116,15 @@ public class IndexingConfig {
     _sortedColumn = sortedColumn;
   }
 
+  
+  public List<String> getBloomFilterColumns() {
+    return _bloomFilterColumns;
+  }
+
+  public void setBloomFilterColumns(List<String> bloomFilterColumns) {
+    _bloomFilterColumns = bloomFilterColumns; // stored by reference, no copy; null is accepted as-is
+  }
+
   public String getLoadMode() {
     return _loadMode;
   }
diff --git 
a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java 
b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
index 960c7f0..373f369 100644
--- 
a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
+++ 
b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
@@ -394,6 +394,7 @@ public class TableConfig {
     private List<String> _invertedIndexColumns;
     private List<String> _noDictionaryColumns;
     private List<String> _onHeapDictionaryColumns;
+    private List<String> _bloomFilterColumns;
     private Map<String, String> _streamConfigs;
     private String _streamPartitionAssignmentStrategy = 
DEFAULT_STREAM_PARTITION_ASSIGNMENT_STRATEGY;
 
@@ -508,6 +509,11 @@ public class TableConfig {
       return this;
     }
 
+    public Builder setBloomFilterColumns(List<String> bloomFilterColumns) {
+      _bloomFilterColumns = bloomFilterColumns;
+      return this;
+    }
+
     public Builder setNoDictionaryColumns(List<String> noDictionaryColumns) {
       _noDictionaryColumns = noDictionaryColumns;
       return this;
@@ -583,6 +589,7 @@ public class TableConfig {
       indexingConfig.setNoDictionaryColumns(_noDictionaryColumns);
       indexingConfig.setOnHeapDictionaryColumns(_onHeapDictionaryColumns);
       indexingConfig.setStreamConfigs(_streamConfigs);
+      indexingConfig.setBloomFilterColumns(_bloomFilterColumns);
       StreamConsumptionConfig streamConsumptionConfig = new 
StreamConsumptionConfig();
       
streamConsumptionConfig.setStreamPartitionAssignmentStrategy(_streamPartitionAssignmentStrategy);
       indexingConfig.setStreamConsumptionConfig(streamConsumptionConfig);
diff --git 
a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
 
b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
index 3a2d1bb..aaf53fe 100644
--- 
a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
+++ 
b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
@@ -82,6 +82,8 @@ public interface SegmentMetadata {
 
   String getBitmapInvertedIndexFileName(String column);
 
+  String getBloomFilterFileName(String column);
+
   String getCreatorName();
 
   char getPaddingCharacter();
@@ -99,4 +101,5 @@ public interface SegmentMetadata {
   String getDerivedColumn(String column, MetricFieldSpec.DerivedMetricType 
derivedMetricType);
 
   boolean close();
+
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java 
b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
index b44d721..6397024 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.core.common;
 
 import com.linkedin.pinot.core.operator.BaseOperator;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -26,4 +27,6 @@ public abstract class DataSource extends BaseOperator {
   public abstract InvertedIndexReader getInvertedIndex();
 
   public abstract Dictionary getDictionary();
+  
+  public abstract BloomFilterReader getBloomFilter();  
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index a32068a..7c356a0 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,6 +35,7 @@ import 
com.linkedin.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndex
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
 import com.linkedin.pinot.core.segment.index.data.source.ColumnDataSource;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnContext;
 import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
 import 
com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -79,6 +80,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, DataFileReader> _indexReaderWriterMap = new 
HashMap<>();
   private final Map<String, Integer> _maxNumValuesMap = new HashMap<>();
   private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = 
new HashMap<>();
+  private final Map<String, BloomFilterReader> _bloomFilterMap = new 
HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
   private boolean _aggregateMetrics;
 
@@ -386,7 +388,7 @@ public class MutableSegmentImpl implements MutableSegment {
   public ColumnDataSource getDataSource(String columnName) {
     if (!_schema.isVirtualColumn(columnName)) {
       return new ColumnDataSource(_schema.getFieldSpecFor(columnName), 
_numDocsIndexed, _maxNumValuesMap.get(columnName),
-          _indexReaderWriterMap.get(columnName), 
_invertedIndexMap.get(columnName), _dictionaryMap.get(columnName));
+          _indexReaderWriterMap.get(columnName), 
_invertedIndexMap.get(columnName), _dictionaryMap.get(columnName), 
_bloomFilterMap.get(columnName));
     } else {
       return getVirtualDataSource(columnName);
     }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
index 258fae9..b23c438 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
@@ -22,6 +22,8 @@ import com.linkedin.pinot.common.request.FilterOperator;
 import com.linkedin.pinot.common.utils.request.FilterQueryTree;
 import com.linkedin.pinot.core.query.exception.BadQueryRequestException;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
 import javax.annotation.Nonnull;
 
 
@@ -32,7 +34,8 @@ import javax.annotation.Nonnull;
  */
 public abstract class AbstractSegmentPruner implements SegmentPruner {
 
-  public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, 
Map<String, ColumnMetadata> columnMetadataMap);
+  public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, 
Map<String, ColumnMetadata> columnMetadataMap,
+      Map<String, BloomFilterReader> bloomFilterMap);
 
   /**
    * Given a non leaf filter query tree node prunes it as follows:
@@ -46,8 +49,8 @@ public abstract class AbstractSegmentPruner implements 
SegmentPruner {
    *
    * @return True to prune, false otherwise
    */
-  protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree,
-      @Nonnull Map<String, ColumnMetadata> columnMetadataMap) {
+  protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree, 
@Nonnull Map<String, ColumnMetadata> columnMetadataMap,
+      Map<String, BloomFilterReader> bloomFilterMap) {
     List<FilterQueryTree> children = filterQueryTree.getChildren();
 
     if (children.isEmpty()) {
@@ -58,7 +61,7 @@ public abstract class AbstractSegmentPruner implements 
SegmentPruner {
     switch (filterOperator) {
       case AND:
         for (FilterQueryTree child : children) {
-          if (pruneSegment(child, columnMetadataMap)) {
+          if (pruneSegment(child, columnMetadataMap, bloomFilterMap)) {
             return true;
           }
         }
@@ -66,7 +69,7 @@ public abstract class AbstractSegmentPruner implements 
SegmentPruner {
 
       case OR:
         for (FilterQueryTree child : children) {
-          if (!pruneSegment(child, columnMetadataMap)) {
+          if (!pruneSegment(child, columnMetadataMap, bloomFilterMap)) {
             return false;
           }
         }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index a2953ce..40c34f7 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -23,8 +23,13 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.query.request.ServerQueryRequest;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import javax.annotation.Nonnull;
 import org.apache.commons.configuration.Configuration;
 
@@ -48,7 +53,15 @@ public class ColumnValueSegmentPruner extends 
AbstractSegmentPruner {
     // For realtime segment, this map can be null.
     Map<String, ColumnMetadata> columnMetadataMap =
         ((SegmentMetadataImpl) 
segment.getSegmentMetadata()).getColumnMetadataMap();
-    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, 
columnMetadataMap);
+    // NOTE(review): columnMetadataMap can be null (realtime, per the comment above); null-check before keySet() to avoid an NPE.
+    Map<String, BloomFilterReader> bloomFilterMap = new HashMap<>();
+    for (String column : columnMetadataMap.keySet()) {
+      BloomFilterReader bloomFilterReader = 
segment.getDataSource(column).getBloomFilter();
+      if (bloomFilterReader != null) {
+        bloomFilterMap.put(column, bloomFilterReader);
+      }
+    }
+    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, 
columnMetadataMap, bloomFilterMap);
   }
 
   @Override
@@ -74,7 +87,7 @@ public class ColumnValueSegmentPruner extends 
AbstractSegmentPruner {
   @SuppressWarnings("unchecked")
   @Override
   public boolean pruneSegment(@Nonnull FilterQueryTree filterQueryTree,
-      @Nonnull Map<String, ColumnMetadata> columnMetadataMap) {
+      @Nonnull Map<String, ColumnMetadata> columnMetadataMap, Map<String, 
BloomFilterReader> bloomFilterMap) {
     FilterOperator filterOperator = filterQueryTree.getOperator();
     List<FilterQueryTree> children = filterQueryTree.getChildren();
 
@@ -86,7 +99,8 @@ public class ColumnValueSegmentPruner extends 
AbstractSegmentPruner {
         return false;
       }
 
-      ColumnMetadata columnMetadata = 
columnMetadataMap.get(filterQueryTree.getColumn());
+      String column = filterQueryTree.getColumn();
+      ColumnMetadata columnMetadata = columnMetadataMap.get(column);
       if (columnMetadata == null) {
         // Should not reach here after DataSchemaSegmentPruner
         return true;
@@ -97,16 +111,22 @@ public class ColumnValueSegmentPruner extends 
AbstractSegmentPruner {
 
       if (filterOperator == FilterOperator.EQUALITY) {
         // EQUALITY
-
-        // Doesn't have min/max value set in metadata
-        if ((minValue == null) || (maxValue == null)) {
-          return false;
-        }
-
-        // Check if the value is in the min/max range
+        boolean prune = false;
         FieldSpec.DataType dataType = columnMetadata.getDataType();
         Comparable value = getValue(filterQueryTree.getValue().get(0), 
dataType);
-        return (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 
0);
+        // Range check applies only when min/max values are set in metadata
+        if (minValue != null && maxValue != null) {
+          // Check if the value is in the min/max range
+          prune = (value.compareTo(minValue) < 0) || 
(value.compareTo(maxValue) > 0);
+        }        
+        //check bloom filter if it exists
+        if( !prune && bloomFilterMap.containsKey(column)) {
+          BloomFilterReader bloomFilterReader = bloomFilterMap.get(column);
+          if(bloomFilterReader != null){
+            prune = !bloomFilterReader.mightContain(value);
+          }
+        }
+        return prune;
       } else {
         // RANGE
 
@@ -170,7 +190,7 @@ public class ColumnValueSegmentPruner extends 
AbstractSegmentPruner {
       }
     } else {
       // Parent node
-      return pruneNonLeaf(filterQueryTree, columnMetadataMap);
+      return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
index f8e1cd9..6846fae 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
@@ -22,6 +22,9 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.query.request.ServerQueryRequest;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.configuration.Configuration;
@@ -59,7 +62,8 @@ public class PartitionSegmentPruner extends 
AbstractSegmentPruner {
     Map<String, ColumnMetadata> columnMetadataMap =
         ((SegmentMetadataImpl) 
segment.getSegmentMetadata()).getColumnMetadataMap();
 
-    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, 
columnMetadataMap);
+    Map<String, BloomFilterReader> emptyMap = Collections.emptyMap();
+    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, 
columnMetadataMap, emptyMap);
   }
 
   /**
@@ -74,12 +78,12 @@ public class PartitionSegmentPruner extends 
AbstractSegmentPruner {
    * @return True if segment can be pruned, false otherwise
    */
   @Override
-  public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, 
ColumnMetadata> columnMetadataMap) {
+  public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, 
ColumnMetadata> columnMetadataMap, Map<String, BloomFilterReader> 
bloomFilterMap) {
     List<FilterQueryTree> children = filterQueryTree.getChildren();
 
     // Non-leaf node
     if (children != null && !children.isEmpty()) {
-      return pruneNonLeaf(filterQueryTree, columnMetadataMap);
+      return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap);
     }
 
     // TODO: Enhance partition based pruning for RANGE operator.
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
index 772d5a3..c3facda 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
@@ -57,6 +57,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = 
".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = 
".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = 
".bitmap.inv";
+    public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
   }
 
   public static class MetadataKeys {
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
new file mode 100644
index 0000000..64579b0
--- /dev/null
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.creator.impl.bloom;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+
+import com.clearspring.analytics.stream.membership.BloomFilter;
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+
+public class BloomFilterCreator implements AutoCloseable {
+  /** Collects one column's values into a bloom filter; close() writes the serialized filter to the index directory. */
+  BloomFilter _bloomFilter; // filter populated through add()
+  File _bloomFilterFile; // output file: indexDir/<columnName> + BLOOM_FILTER_FILE_EXTENSION
+  /** Sizes the filter from cardinality only; numDocs and totalNumberOfEntries are accepted but not used here. */
+  public BloomFilterCreator(File indexDir, FieldSpec fieldSpec, int 
cardinality, int numDocs, int totalNumberOfEntries) {
+    String columnName = fieldSpec.getName();
+    _bloomFilterFile = new File(indexDir, columnName + 
V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+    _bloomFilter = new BloomFilter(cardinality, .03); // .03: presumably the max false-positive probability — TODO confirm
+
+  }
+  /** Serializes the filter and writes the bytes to _bloomFilterFile; try-with-resources closes the stream. */
+  @Override
+  public void close() throws Exception {
+    byte[] buffer = BloomFilter.serialize(_bloomFilter);
+    try (DataOutputStream dataOutputStream = new DataOutputStream(new 
BufferedOutputStream(new FileOutputStream(_bloomFilterFile)))) {
+      dataOutputStream.write(buffer);
+    }
+  }
+  /** Adds the value's toString() form to the filter; a null input throws NullPointerException. */
+  public void add(Object input) {
+    _bloomFilter.add(input.toString());
+  }
+
+}
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
index 4b1c8af..5d8f164 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -555,6 +555,11 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
     return column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
   }
 
+  @Override
+  public String getBloomFilterFileName(String column) {
+    return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
+  }
+
   @Nullable
   @Override
   public String getCreatorName() {
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
index 8850ed2..6e4e515 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.core.segment.index.column;
 
 import com.linkedin.pinot.core.io.reader.DataFileReader;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -39,4 +40,6 @@ public interface ColumnIndexContainer {
    * Returns the dictionary for the column, or {@code null} if it does not 
exist.
    */
   Dictionary getDictionary();
+  
+  BloomFilterReader getBloomFilter();
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index fb78751..2af188b 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -27,6 +27,7 @@ import 
com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig;
 import com.linkedin.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.BytesDictionary;
 import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary;
 import com.linkedin.pinot.core.segment.index.readers.FloatDictionary;
@@ -44,6 +45,7 @@ import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
 import com.linkedin.pinot.core.segment.store.ColumnIndexType;
 import com.linkedin.pinot.core.segment.store.SegmentDirectory;
 import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,18 +56,29 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
   private final DataFileReader _forwardIndex;
   private final InvertedIndexReader _invertedIndex;
   private final ImmutableDictionaryReader _dictionary;
-
+  private final BloomFilterReader _bloomFilterReader; 
+  
   public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, 
ColumnMetadata metadata,
       IndexLoadingConfig indexLoadingConfig) throws IOException {
     String columnName = metadata.getColumnName();
     boolean loadInvertedIndex = false;
     boolean loadOnHeapDictionary = false;
+    boolean loadBloomFilter = false;
     if (indexLoadingConfig != null) {
       loadInvertedIndex = 
indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
       loadOnHeapDictionary = 
indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
+      loadBloomFilter = 
indexLoadingConfig.getBloomFilterColumns().contains(columnName);
     }
     PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, 
ColumnIndexType.FORWARD_INDEX);
+
     if (metadata.hasDictionary()) {
+      //bloom filter
+      if (loadBloomFilter) {
+        PinotDataBuffer bloomFilterBuffer = 
segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
+        _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer, 
metadata.getDataType());
+      } else {
+        _bloomFilterReader = null;
+      }
       // Dictionary-based index
       _dictionary = loadDictionary(segmentReader.getIndexFor(columnName, 
ColumnIndexType.DICTIONARY), metadata,
           loadOnHeapDictionary);
@@ -100,6 +113,7 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
       _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, 
metadata.getDataType());
       _invertedIndex = null;
       _dictionary = null;
+      _bloomFilterReader = null;
     }
   }
 
@@ -117,6 +131,12 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
   public ImmutableDictionaryReader getDictionary() {
     return _dictionary;
   }
+  
+  @Override
+  public BloomFilterReader getBloomFilter() {
+    return _bloomFilterReader;
+  }
+
 
   private static ImmutableDictionaryReader loadDictionary(PinotDataBuffer 
dictionaryBuffer, ColumnMetadata metadata,
       boolean loadOnHeap) {
@@ -175,4 +195,5 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
         throw new IllegalStateException("Illegal data type for raw forward 
index: " + dataType);
     }
   }
+
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
index 16c79f4..c8e6af3 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -30,6 +30,7 @@ import 
com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
 import com.linkedin.pinot.core.realtime.impl.dictionary.MutableDictionary;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -44,6 +45,7 @@ public final class ColumnDataSource extends DataSource {
   private final DataFileReader _forwardIndex;
   private final InvertedIndexReader _invertedIndex;
   private final Dictionary _dictionary;
+  private final BloomFilterReader _bloomFilter;
   private final int _cardinality;
   private final DataSourceMetadata _metadata;
 
@@ -53,21 +55,22 @@ public final class ColumnDataSource extends DataSource {
   public ColumnDataSource(ColumnIndexContainer indexContainer, ColumnMetadata 
metadata) {
     this(metadata.getColumnName(), metadata.getDataType(), 
metadata.isSingleValue(), metadata.isSorted(),
         metadata.getTotalDocs(), metadata.getMaxNumberOfMultiValues(), 
indexContainer.getForwardIndex(),
-        indexContainer.getInvertedIndex(), indexContainer.getDictionary(), 
metadata.getCardinality());
+        indexContainer.getInvertedIndex(), indexContainer.getDictionary(), 
indexContainer.getBloomFilter(), 
+        metadata.getCardinality());
   }
 
   /**
    * For REALTIME segment.
    */
   public ColumnDataSource(FieldSpec fieldSpec, int numDocs, int 
maxNumMultiValues, DataFileReader forwardIndex,
-      InvertedIndexReader invertedIndex, MutableDictionary dictionary) {
+      InvertedIndexReader invertedIndex, MutableDictionary dictionary, 
BloomFilterReader bloomFilter) {
     this(fieldSpec.getName(), fieldSpec.getDataType(), 
fieldSpec.isSingleValueField(), false, numDocs,
-        maxNumMultiValues, forwardIndex, invertedIndex, dictionary, 
Constants.UNKNOWN_CARDINALITY);
+        maxNumMultiValues, forwardIndex, invertedIndex, dictionary, 
bloomFilter, Constants.UNKNOWN_CARDINALITY);
   }
 
   private ColumnDataSource(String columnName, FieldSpec.DataType dataType, 
boolean isSingleValue, boolean isSorted,
-      int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, 
InvertedIndexReader invertedIndex,
-      Dictionary dictionary, int cardinality) {
+      int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, 
InvertedIndexReader invertedIndex, 
+      Dictionary dictionary, BloomFilterReader bloomFilterReader, int 
cardinality) {
     // Sanity check
     if (isSingleValue) {
       Preconditions.checkState(forwardIndex instanceof 
SingleColumnSingleValueReader);
@@ -93,6 +96,7 @@ public final class ColumnDataSource extends DataSource {
     _forwardIndex = forwardIndex;
     _invertedIndex = invertedIndex;
     _dictionary = dictionary;
+    _bloomFilter = bloomFilterReader;
     _cardinality = cardinality;
 
     _metadata = new DataSourceMetadata() {
@@ -154,6 +158,11 @@ public final class ColumnDataSource extends DataSource {
   }
 
   @Override
+  public BloomFilterReader getBloomFilter() {
+    return _bloomFilter;
+  }
+  
+  @Override
   protected Block getNextBlock() {
     if (_isSingleValue) {
       return new SingleValueBlock((SingleColumnSingleValueReader) 
_forwardIndex, _numDocs, _dataType, _dictionary);
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 6381014..7c54c90 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -44,6 +44,8 @@ public class IndexLoadingConfig {
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace 
this by _noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
+  private Set<String> _bloomFilterColumns = new HashSet<>();
+
   private SegmentVersion _segmentVersion;
   // This value will remain true only when the empty constructor is invoked.
   private boolean _enableDefaultColumns = true;
@@ -76,6 +78,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
+    if (bloomFilterColumns != null) {
+      _bloomFilterColumns.addAll(bloomFilterColumns);
+    }
+
     List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
     if (noDictionaryColumns != null) {
       _noDictionaryColumns.addAll(noDictionaryColumns);
@@ -164,6 +171,12 @@ public class IndexLoadingConfig {
   }
 
   @VisibleForTesting
+  public void setBloomFilterColumns(@Nonnull Set<String> bloomFilterColumns) {
+    _bloomFilterColumns = bloomFilterColumns;
+  }
+
+
+  @VisibleForTesting
   public void setOnHeapDictionaryColumns(@Nonnull Set<String> 
onHeapDictionaryColumns) {
     _onHeapDictionaryColumns = onHeapDictionaryColumns;
   }
@@ -183,6 +196,10 @@ public class IndexLoadingConfig {
     return _onHeapDictionaryColumns;
   }
 
+  public Set<String> getBloomFilterColumns() {
+    return _bloomFilterColumns;
+  }
+
   @Nullable
   public SegmentVersion getSegmentVersion() {
     return _segmentVersion;
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
index 6cf064d..93d9f15 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -19,6 +19,7 @@ import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.segment.ReadMode;
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import 
com.linkedin.pinot.core.segment.index.loader.bloomfilter.BloomFilterHandler;
 import 
com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGenerator;
 import 
com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
 import 
com.linkedin.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
@@ -63,7 +64,6 @@ public class SegmentPreProcessor implements AutoCloseable {
     if (_segmentMetadata.getTotalDocs() == 0) {
       return;
     }
-
     // Remove all the existing inverted index temp files before loading 
segments.
     // NOTE: This step fixes the issue of temporary files not getting deleted 
after creating new inverted indexes.
     // In this, we look for all files in the directory and remove the ones 
with  '.bitmap.inv.tmp' extension.
@@ -91,6 +91,10 @@ public class SegmentPreProcessor implements AutoCloseable {
           new InvertedIndexHandler(_indexDir, _segmentMetadata, 
_indexLoadingConfig, segmentWriter);
       invertedIndexHandler.createInvertedIndices();
 
+      BloomFilterHandler bloomFilterHandler = 
+          new BloomFilterHandler(_indexDir, _segmentMetadata, 
_indexLoadingConfig, segmentWriter);
+      bloomFilterHandler.createBloomFilters();
+      
       // Add min/max value to column metadata according to the prune mode.
       // For star-tree index, because it can only increase the range, so 
min/max value can still be used in pruner.
       ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode =
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
new file mode 100644
index 0000000..d5494b8
--- /dev/null
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -0,0 +1,158 @@
+package com.linkedin.pinot.core.segment.index.loader.bloomfilter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.FieldSpec.DataType;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.core.io.reader.DataFileReader;
+import com.linkedin.pinot.core.io.reader.ReaderContext;
+import com.linkedin.pinot.core.io.reader.SingleColumnSingleValueReader;
+import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
+import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import 
com.linkedin.pinot.core.io.reader.impl.v1.FixedByteChunkSingleValueReader;
+import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+import com.linkedin.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig;
+import com.linkedin.pinot.core.segment.index.loader.LoaderUtils;
+import 
com.linkedin.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary;
+import com.linkedin.pinot.core.segment.index.readers.FloatDictionary;
+import com.linkedin.pinot.core.segment.index.readers.ImmutableDictionaryReader;
+import com.linkedin.pinot.core.segment.index.readers.IntDictionary;
+import com.linkedin.pinot.core.segment.index.readers.LongDictionary;
+import com.linkedin.pinot.core.segment.index.readers.StringDictionary;
+import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
+import com.linkedin.pinot.core.segment.store.ColumnIndexType;
+import com.linkedin.pinot.core.segment.store.SegmentDirectory;
+
+public class BloomFilterHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BloomFilterHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>();
+
+  public BloomFilterHandler(@Nonnull File indexDir, @Nonnull 
SegmentMetadataImpl segmentMetadata, @Nonnull IndexLoadingConfig 
indexLoadingConfig,
+      @Nonnull SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Collect metadata for configured bloom filter columns in the segment
+    for (String column : indexLoadingConfig.getBloomFilterColumns()) {
+      ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null) {
+        _bloomFilterColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  public void createBloomFilters() throws Exception {
+    for (ColumnMetadata columnMetadata : _bloomFilterColumns) {
+      if(columnMetadata.hasDictionary()) {
+        createBloomFilterForColumn(columnMetadata);
+      }
+    }
+  }
+
+  private void createBloomFilterForColumn(ColumnMetadata columnMetadata) 
throws Exception {
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + ".bloom.inprogress");
+    File bloomFilterFile = new File(_indexDir, column + 
V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.BLOOM_FILTER)) {
+        // Skip creating bloom filter index if already exists.
+
+        LOGGER.info("Found bloom filter for segment: {}, column: {}", 
_segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means the last run was interrupted.
+
+      // Remove the bloom filter file if it exists.
+      // For v1 and v2, it's the actual bloom filter. For v3, it's the 
temporary bloom filter file.
+      FileUtils.deleteQuietly(bloomFilterFile);
+    }
+
+    // Create new bloom filter for the column.
+    LOGGER.info("Creating new bloom filter for segment: {}, column: {}", 
_segmentName, column);
+    int numDocs = columnMetadata.getTotalDocs();
+    try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, 
columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), numDocs,
+        columnMetadata.getTotalNumberOfEntries())) {
+      if (columnMetadata.hasDictionary()) {
+        //read dictionary
+        try (ImmutableDictionaryReader dictionaryReader = 
getDictionaryReader(columnMetadata, _segmentWriter)) {
+          for (int i = 0; i < dictionaryReader.length(); i++) {
+            creator.add(dictionaryReader.get(i));
+          }
+        }
+      } else {
+        // No dictionary: building from the forward index is not supported
+        throw new UnsupportedOperationException("Bloom filters not supported 
for No Dictionary columns");
+      }
+    }
+
+    // For v3, write the generated bloom filter file into the single file 
and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, 
bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created bloom filter for segment: {}, column: {}", 
_segmentName, column);
+  }
+
+
+
+  private ImmutableDictionaryReader getDictionaryReader(ColumnMetadata 
columnMetadata, SegmentDirectory.Writer segmentWriter) throws IOException {
+    PinotDataBuffer dictionaryBuffer = 
segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
ColumnIndexType.DICTIONARY);
+    int cardinality = columnMetadata.getCardinality();
+    ImmutableDictionaryReader dictionaryReader;
+    DataType dataType = columnMetadata.getDataType();
+    switch (dataType) {
+    case INT:
+      dictionaryReader = new IntDictionary(dictionaryBuffer, cardinality);
+      break;
+    case LONG:
+      dictionaryReader = new LongDictionary(dictionaryBuffer, cardinality);
+      break;
+    case FLOAT:
+      dictionaryReader = new FloatDictionary(dictionaryBuffer, cardinality);
+      break;
+    case DOUBLE:
+      dictionaryReader = new DoubleDictionary(dictionaryBuffer, cardinality);
+      break;
+    case STRING:
+      dictionaryReader = new StringDictionary(dictionaryBuffer, cardinality, 
columnMetadata.getColumnMaxLength(), (byte) 
columnMetadata.getPaddingCharacter());
+      break;
+    default:
+      throw new IllegalStateException("Unsupported data type: " + dataType + " 
for column: " + columnMetadata.getColumnName());
+    }
+    return dictionaryReader;
+  }
+
+}
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
new file mode 100644
index 0000000..bab69f1
--- /dev/null
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
@@ -0,0 +1,32 @@
+package com.linkedin.pinot.core.segment.index.readers;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+
+import com.clearspring.analytics.stream.membership.BloomFilter;
+import com.linkedin.pinot.common.data.FieldSpec.DataType;
+import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
+
+public class BloomFilterReader {
+  BloomFilter _bloomFilter;
+  
+  public BloomFilterReader(PinotDataBuffer bloomFilterBuffer, DataType 
dataType) throws IOException{
+    byte[] buffer = new byte[(int)bloomFilterBuffer.size()];
+    bloomFilterBuffer.copyTo(0, buffer);
+    _bloomFilter = BloomFilter.deserialize(buffer);
+  }
+  
+  public boolean mightContain(Object key){
+    return _bloomFilter.isPresent(key.toString());
+  }
+  
+  public static void main(String[] args) throws Exception {
+    try (FileInputStream fileInputStream = new FileInputStream(new 
File("/home/kgopalak/pinot_perf/index_dir/tpch_lineitem_OFFLINE/tpch_lineitem_0/l_orderkey.bloom.inv"))) {
+      byte[] buffer = IOUtils.toByteArray(fileInputStream);
+      System.out.println(BloomFilter.deserialize(buffer).buckets());
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
index 3579f1c..2973969 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
@@ -93,7 +93,14 @@ abstract class ColumnIndexDirectory implements Closeable {
    */
   public abstract PinotDataBuffer getInvertedIndexBufferFor(String column)
       throws IOException;
-
+  /**
+   * Get bloom filter buffer for a column
+   * @param column column name
+   * @return in-memory ByteBuffer like buffer for data
+   * @throws IOException
+   */
+  public abstract PinotDataBuffer getBloomFilterBufferFor(String column)
+      throws IOException;
   /**
    * Allocate a new data buffer of specified sizeBytes in the columnar index 
directory
    * @param column column name
@@ -121,6 +128,15 @@ abstract class ColumnIndexDirectory implements Closeable {
    */
   public abstract PinotDataBuffer newInvertedIndexBuffer(String column, long 
sizeBytes)
       throws IOException;
+  /**
+   * Allocate a new data buffer of specified sizeBytes in the columnar index 
directory
+   * @param column column name
+   * @param sizeBytes sizeBytes for the buffer allocation
+   * @return in-memory ByteBuffer like buffer for data
+   * @throws IOException
+   */
+  public abstract PinotDataBuffer newBloomFilterBuffer(String column, long 
sizeBytes)
+      throws IOException;
 
   /**
    * Check if an index exists for a column
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
index 85d38f7..5f7e008 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
@@ -18,7 +18,8 @@ package com.linkedin.pinot.core.segment.store;
 public enum ColumnIndexType {
   DICTIONARY("dictionary"),
   FORWARD_INDEX("forward_index"),
-  INVERTED_INDEX("inverted_index");
+  INVERTED_INDEX("inverted_index"),
+  BLOOM_FILTER("bloom_filter");
 
   private final String indexName;
   ColumnIndexType(String name) {
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
index c6ea2d9..8599cde 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -81,6 +81,20 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer getBloomFilterBufferFor(String column)
+      throws IOException {
+    IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
+    return getReadBufferFor(key);
+  }
+
+  @Override
+  public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+      throws IOException {
+    IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
+    return getWriteBufferFor(key, sizeBytes);
+  }
+
+  @Override
   public boolean hasIndexFor(String column, ColumnIndexType type) {
     File indexFile = getFileFor(column, type);
     return indexFile.exists();
@@ -140,6 +154,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case INVERTED_INDEX:
         filename = metadata.getBitmapInvertedIndexFileName(column);
         break;
+      case BLOOM_FILTER:
+        filename = metadata.getBloomFilterFileName(column);
+        break;
       default:
         throw new UnsupportedOperationException("Unknown index type: " + 
indexType.toString());
     }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
index d29c37c..235f9cf 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
@@ -233,6 +233,10 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
       case INVERTED_INDEX:
         buffer = columnIndexDirectory.getInvertedIndexBufferFor(column);
         break;
+      case BLOOM_FILTER:
+        buffer = columnIndexDirectory.getBloomFilterBufferFor(column);
+        break;
+
       default:
         throw new RuntimeException("Unknown index type: " + type.name());
     }
@@ -412,6 +416,8 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
           return columnIndexDirectory.newForwardIndexBuffer(key.name, 
sizeBytes);
         case INVERTED_INDEX:
           return columnIndexDirectory.newInvertedIndexBuffer(key.name, 
sizeBytes);
+        case BLOOM_FILTER:
+          return columnIndexDirectory.newBloomFilterBuffer(key.name, 
sizeBytes);
         default:
           throw new RuntimeException("Unknown index type: " + indexType.name() 
+
               " for directory: " + segmentDirectory);
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
index cf22cc6..5271985 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
@@ -105,6 +105,12 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer getBloomFilterBufferFor(String column)
+      throws IOException {
+    return checkAndGetIndexBuffer(column, ColumnIndexType.BLOOM_FILTER);
+  }
+
+  @Override
   public boolean hasIndexFor(String column, ColumnIndexType type) {
     IndexKey key = new IndexKey(column, type);
     return columnEntries.containsKey(key);
@@ -128,6 +134,12 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
     return  allocNewBufferInternal(column, ColumnIndexType.INVERTED_INDEX, 
sizeBytes, "inverted_index.create");
   }
 
+  @Override
+  public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+      throws IOException {
+    return  allocNewBufferInternal(column, ColumnIndexType.BLOOM_FILTER, 
sizeBytes, "bloom_filter.create");
+  }
+
   private PinotDataBuffer checkAndGetIndexBuffer(String column, 
ColumnIndexType type) {
     IndexKey key = new IndexKey(column, type);
     IndexEntry entry = columnEntries.get(key);
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index aedd6db..f3d1c52 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.core.segment.virtualcolumn;
 import com.linkedin.pinot.core.io.reader.DataFileReader;
 import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer;
 import 
com.linkedin.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -51,4 +52,9 @@ public class VirtualColumnIndexContainer implements 
ColumnIndexContainer {
   public Dictionary getDictionary() {
     return _dictionary;
   }
+
+  @Override
+  public BloomFilterReader getBloomFilter() {
+    return null;
+  }
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
index b450ff5..8959969 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
@@ -21,6 +21,7 @@ import com.linkedin.pinot.core.common.DataSource;
 import com.linkedin.pinot.core.common.DataSourceMetadata;
 import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
 import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
@@ -100,6 +101,11 @@ public class StarTreeDimensionDataSource extends 
DataSource {
   }
 
   @Override
+  public BloomFilterReader getBloomFilter() {
+    return null;
+  }
+
+  @Override
   public Dictionary getDictionary() {
     return _dictionary;
   }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
index 31f742a..97d4ca1 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
@@ -24,6 +24,7 @@ import 
com.linkedin.pinot.core.io.reader.impl.v1.BaseChunkSingleValueReader;
 import 
com.linkedin.pinot.core.io.reader.impl.v1.FixedByteChunkSingleValueReader;
 import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
@@ -110,6 +111,11 @@ public class StarTreeMetricDataSource extends DataSource {
   }
 
   @Override
+  public BloomFilterReader getBloomFilter() {
+    return null;
+  }
+  
+  @Override
   protected Block getNextBlock() {
     return new SingleValueBlock(_forwardIndex, _numDocs, _dataType, null);
   }
@@ -118,4 +124,6 @@ public class StarTreeMetricDataSource extends DataSource {
   public String getOperatorName() {
     return _operatorName;
   }
+
+
 }
diff --git 
a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
 
b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
index a278149..52b8542 100644
--- 
a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
+++ 
b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -86,10 +86,10 @@ public class RealtimeNoDictionaryTest {
     }
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
-    dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 
0, intRawIndex, null, null));
-    dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, 
NUM_ROWS, 0, longRawIndex, null, null));
-    dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, 
NUM_ROWS, 0, floatRawIndex, null, null));
-    dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, 
NUM_ROWS, 0, doubleRawIndex, null, null));
+    dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 
0, intRawIndex, null, null, null));
+    dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, 
NUM_ROWS, 0, longRawIndex, null, null, null));
+    dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, 
NUM_ROWS, 0, floatRawIndex, null, null, null));
+    dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, 
NUM_ROWS, 0, doubleRawIndex, null, null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
diff --git 
a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
 
b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
new file mode 100644
index 0000000..70ea025
--- /dev/null
+++ 
b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
@@ -0,0 +1,71 @@
+package com.linkedin.pinot.core.segment.index.creator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.Test;
+
+import com.clearspring.analytics.stream.membership.BloomFilter;
+import com.google.common.hash.Funnels;
+
+public class BloomFilterCreatorTest {
+
+  @Test
+  public void compareGauvaVsClearSpring() throws Exception {
+    int numElementsArray[] = new int[] { 10000, 100000, 1000000 };
+    for (int numElements : numElementsArray) {
+      double maxFalsePosProbability = 0.1;
+//      int optimalNumOfBits 
=com.google.common.hash.BloomFilter.optimalNumOfBits
+      BloomFilter bloomFilter = new BloomFilter(numElements, 
maxFalsePosProbability);
+      com.google.common.hash.BloomFilter<Integer> gauvaIntegerBloomFilter = 
com.google.common.hash.BloomFilter.create(Funnels.integerFunnel(), numElements,
+          maxFalsePosProbability);
+      com.google.common.hash.BloomFilter<String> gauvaStringBloomFilter = 
com.google.common.hash.BloomFilter
+          .create(Funnels.stringFunnel(Charset.forName("UTF-8")), numElements, 
maxFalsePosProbability);
+      System.out.println("BitSet size:" + bloomFilter.buckets());
+      Random random = new Random();
+      Set<Integer> set = new HashSet<>(); 
+      for (int i = 0; i < 100000; i++) {
+        int id = random.nextInt(500000000);
+        set.add(id);
+        bloomFilter.add(String.valueOf(id));
+        gauvaIntegerBloomFilter.put(id);
+        gauvaStringBloomFilter.put(String.valueOf(id));
+      }
+      byte[] buffer1 = BloomFilter.serialize(bloomFilter);
+      ByteArrayOutputStream intBuffer = new ByteArrayOutputStream();
+      gauvaIntegerBloomFilter.writeTo(intBuffer);
+      ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();
+      gauvaStringBloomFilter.writeTo(stringBuffer);
+      //get the internal bitset
+      System.out.println(Arrays.toString(BloomFilter.class.getFields()));
+      Field field = BloomFilter.class.getDeclaredField("filter_");
+      field.setAccessible(true);
+      BitSet bitset = (BitSet) field.get(bloomFilter);
+      MutableRoaringBitmap roaringBitmap = new MutableRoaringBitmap();
+      for (int i = bitset.nextSetBit(0); i >= 0; i = bitset.nextSetBit(i + 1)) {
+        roaringBitmap.add(i);
+      }
+      int roaringBitmapSize = roaringBitmap.serializedSizeInBytes();
+      System.out.println("numBits:" + bitset.length());
+      System.out.println("numBitsSet:" + bitset.cardinality());
+      System.out.println("numElements:" + numElements);
+      System.out.println("clear spring size:" + buffer1.length);
+      System.out.println("Gauva int size:" + intBuffer.size());
+      System.out.println("Gauva string size:" + stringBuffer.size());
+      System.out.println("Roaring bitmap size:" + roaringBitmapSize);
+      System.out.println("\n");
+      BitSet bitSet;
+    }
+
+  }
+}
diff --git 
a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
 
b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
index 22fc481..1e3fba1 100644
--- 
a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
+++ 
b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -21,6 +21,7 @@ import 
com.linkedin.pinot.common.utils.request.FilterQueryTree;
 import com.linkedin.pinot.common.utils.request.RequestUtils;
 import com.linkedin.pinot.core.query.pruner.ColumnValueSegmentPruner;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.pql.parsers.Pql2Compiler;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
 public class ColumnValueSegmentPrunerTest {
   private static final Pql2Compiler COMPILER = new Pql2Compiler();
   private static final Map<String, ColumnMetadata> COLUMN_METADATA_MAP = new 
HashMap<>();
+  private static final Map<String, BloomFilterReader> BLOOM_FILTER_MAP = new 
HashMap<>();
 
   static {
     COLUMN_METADATA_MAP.put("time", new 
ColumnMetadata.Builder().setColumnName("time")
@@ -95,6 +97,6 @@ public class ColumnValueSegmentPrunerTest {
     BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
     FilterQueryTree filterQueryTree = 
RequestUtils.generateFilterQueryTree(brokerRequest);
     ColumnValueSegmentPruner pruner = new ColumnValueSegmentPruner();
-    return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP);
+    return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP, 
BLOOM_FILTER_MAP);
   }
 }
diff --git 
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
 
b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
index 999f022..2368d8a 100644
--- 
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
+++ 
b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.tools.perf;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
@@ -259,10 +260,10 @@ public class PerfBenchmarkDriver {
 
   public void configureTable(String tableName)
       throws Exception {
-    configureTable(tableName, null);
+    configureTable(tableName, null, null);
   }
 
-  public void configureTable(String tableName, List<String> 
invertedIndexColumns)
+  public void configureTable(String tableName, List<String> 
invertedIndexColumns, List<String> bloomFilterColumns)
       throws Exception {
     TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
         .setSegmentAssignmentStrategy(_segmentAssignmentStrategy)
@@ -272,6 +273,7 @@ public class PerfBenchmarkDriver {
         .setLoadMode(_loadMode)
         .setSegmentVersion(_segmentFormatVersion)
         .setInvertedIndexColumns(invertedIndexColumns)
+        .setBloomFilterColumns(bloomFilterColumns)
         .build();
     _helixResourceManager.addTable(tableConfig);
   }
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
index ac124fe..17fec66 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
@@ -72,6 +72,10 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
       usage = "Comma separated inverted index columns to be created (non-batch load).")
   private String _invertedIndexColumns;
 
+  @Option(name = "-bloomFilterColumns", required = false, metaVar = "<String>",
+      usage = "Comma separated bloom filter columns to be created (non-batch load).")
+  private String _bloomFilterColumns;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"},
       usage = "Print this message.")
   private boolean _help = false;
@@ -132,7 +136,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
           @Override
           public void run() {
             try {
-              loadTable(driver, _dataDir, tableName, null);
+              loadTable(driver, _dataDir, tableName, null, null);
             } catch (Exception e) {
               LOGGER.error("Caught exception while loading table: {}", tableName, e);
             }
@@ -146,14 +150,18 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
       if (_invertedIndexColumns != null) {
         invertedIndexColumns = Arrays.asList(_invertedIndexColumns.split(","));
       }
+      List<String> bloomFilterColumns = null;
+      if (_bloomFilterColumns != null) {
+        bloomFilterColumns = Arrays.asList(_bloomFilterColumns.split(","));
+      }
       for (String tableName : _tableNames.split(",")) {
-        loadTable(driver, _dataDir, tableName, invertedIndexColumns);
+        loadTable(driver, _dataDir, tableName, invertedIndexColumns, bloomFilterColumns);
       }
     }
   }
 
   public static void loadTable(PerfBenchmarkDriver driver, String dataDir, String tableName,
-      List<String> invertedIndexColumns)
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns)
       throws Exception {
     boolean tableConfigured = false;
     File[] segments = new File(dataDir, tableName).listFiles();
@@ -161,7 +169,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
     for (File segment : segments) {
       SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segment);
       if (!tableConfigured) {
-        driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns);
+        driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns, bloomFilterColumns);
         tableConfigured = true;
       }
       driver.addSegment(segmentMetadata);
diff --git a/pom.xml b/pom.xml
index a482f7b..217eed0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -874,6 +874,7 @@
           <configuration>
             <source>${jdk.version}</source>
             <target>${jdk.version}</target>
+            <compilerVersion>${jdk.version}</compilerVersion>
           </configuration>
         </plugin>
         <plugin>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to