This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a359c49787 Optimize ColumnValueSegmentPruner by caching value hashes 
(#8766)
a359c49787 is described below

commit a359c4978737053ed65f01adca36385de383e2d2
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue May 31 21:31:49 2022 +0200

    Optimize ColumnValueSegmentPruner by caching value hashes (#8766)
---
 .../query/pruner/ColumnValueSegmentPruner.java     | 162 ++++++++++++---
 .../query/pruner/ColumnValueSegmentPrunerTest.java | 100 ++++++----
 .../apache/pinot/perf/BenchmarkBloomFilter.java    | 154 +++++++++++++++
 .../perf/BenchmarkColumnValueSegmentPruner.java    | 220 +++++++++++++++++++++
 .../immutable/ImmutableSegmentImpl.java            |  18 +-
 .../readers/bloom/GuavaBloomFilterReaderUtils.java |  28 +++
 6 files changed, 615 insertions(+), 67 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index dc32c14851..08cc980637 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -34,7 +35,9 @@ import 
org.apache.pinot.common.request.context.predicate.InPredicate;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
 import org.apache.pinot.segment.spi.FetchContext;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
@@ -94,7 +97,8 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
     // Extract EQ/IN/RANGE predicate columns
     Set<String> eqInColumns = new HashSet<>();
     Set<String> rangeColumns = new HashSet<>();
-    extractPredicateColumns(filter, eqInColumns, rangeColumns);
+    ValueCache cachedValues = new ValueCache();
+    extractPredicateColumns(filter, eqInColumns, rangeColumns, cachedValues);
 
     if (eqInColumns.isEmpty() && rangeColumns.isEmpty()) {
       return segments;
@@ -102,6 +106,7 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
 
     int numSegments = segments.size();
     List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
+
     if (!eqInColumns.isEmpty() && query.isEnablePrefetch()) {
       Map[] dataSourceCaches = new Map[numSegments];
       FetchContext[] fetchContexts = new FetchContext[numSegments];
@@ -134,14 +139,14 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
           if (fetchContext != null) {
             segment.acquire(fetchContext);
             try {
-              if (!pruneSegment(segment, filter, dataSourceCaches[i])) {
+              if (!pruneSegment(segment, filter, dataSourceCaches[i], 
cachedValues)) {
                 selectedSegments.add(segment);
               }
             } finally {
               segment.release(fetchContext);
             }
           } else {
-            if (!pruneSegment(segment, filter, dataSourceCaches[i])) {
+            if (!pruneSegment(segment, filter, dataSourceCaches[i], 
cachedValues)) {
               selectedSegments.add(segment);
             }
           }
@@ -156,9 +161,10 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
         }
       }
     } else {
+      Map<String, DataSource> dataSourceCache = new HashMap<>();
       for (IndexSegment segment : segments) {
-        Map<String, DataSource> dataSourceCache = new HashMap<>();
-        if (!pruneSegment(segment, filter, dataSourceCache)) {
+        dataSourceCache.clear();
+        if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
           selectedSegments.add(segment);
         }
       }
@@ -169,12 +175,13 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
   /**
    * Extracts predicate columns from the given filter.
    */
-  private void extractPredicateColumns(FilterContext filter, Set<String> 
eqInColumns, Set<String> rangeColumns) {
+  private void extractPredicateColumns(FilterContext filter, Set<String> 
eqInColumns, Set<String> rangeColumns,
+      ValueCache valueCache) {
     switch (filter.getType()) {
       case AND:
       case OR:
         for (FilterContext child : filter.getChildren()) {
-          extractPredicateColumns(child, eqInColumns, rangeColumns);
+          extractPredicateColumns(child, eqInColumns, rangeColumns, 
valueCache);
         }
         break;
       case NOT:
@@ -191,11 +198,27 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
         String column = lhs.getIdentifier();
 
         Predicate.Type predicateType = predicate.getType();
-        if (predicateType == Predicate.Type.EQ || (predicateType == 
Predicate.Type.IN
-            && ((InPredicate) predicate).getValues().size() <= 
_inPredicateThreshold)) {
-          eqInColumns.add(column);
-        } else if (predicateType == Predicate.Type.RANGE) {
-          rangeColumns.add(column);
+        switch (predicateType) {
+          case EQ: {
+            eqInColumns.add(column);
+            valueCache.add((EqPredicate) predicate);
+            break;
+          }
+          case IN: {
+            InPredicate inPredicate = (InPredicate) predicate;
+            if (inPredicate.getValues().size() <= _inPredicateThreshold) {
+              eqInColumns.add(column);
+              valueCache.add(inPredicate);
+            }
+            break;
+          }
+          case RANGE: {
+            rangeColumns.add(column);
+            break;
+          }
+          default: {
+            break;
+          }
         }
         break;
       default:
@@ -203,18 +226,19 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
     }
   }
 
-  private boolean pruneSegment(IndexSegment segment, FilterContext filter, 
Map<String, DataSource> dataSourceCache) {
+  private boolean pruneSegment(IndexSegment segment, FilterContext filter, 
Map<String, DataSource> dataSourceCache,
+      ValueCache cachedValues) {
     switch (filter.getType()) {
       case AND:
         for (FilterContext child : filter.getChildren()) {
-          if (pruneSegment(segment, child, dataSourceCache)) {
+          if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
             return true;
           }
         }
         return false;
       case OR:
         for (FilterContext child : filter.getChildren()) {
-          if (!pruneSegment(segment, child, dataSourceCache)) {
+          if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
             return false;
           }
         }
@@ -230,9 +254,9 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
         }
         Predicate.Type predicateType = predicate.getType();
         if (predicateType == Predicate.Type.EQ) {
-          return pruneEqPredicate(segment, (EqPredicate) predicate, 
dataSourceCache);
+          return pruneEqPredicate(segment, (EqPredicate) predicate, 
dataSourceCache, cachedValues);
         } else if (predicateType == Predicate.Type.IN) {
-          return pruneInPredicate(segment, (InPredicate) predicate, 
dataSourceCache);
+          return pruneInPredicate(segment, (InPredicate) predicate, 
dataSourceCache, cachedValues);
         } else if (predicateType == Predicate.Type.RANGE) {
           return pruneRangePredicate(segment, (RangePredicate) predicate, 
dataSourceCache);
         } else {
@@ -252,13 +276,17 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
    * </ul>
    */
   private boolean pruneEqPredicate(IndexSegment segment, EqPredicate 
eqPredicate,
-      Map<String, DataSource> dataSourceCache) {
+      Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
     String column = eqPredicate.getLhs().getIdentifier();
-    DataSource dataSource = dataSourceCache.computeIfAbsent(column, 
segment::getDataSource);
+    DataSource dataSource = segment instanceof ImmutableSegment
+        ? segment.getDataSource(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
     // NOTE: Column must exist after DataSchemaSegmentPruner
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
-    Comparable value = convertValue(eqPredicate.getValue(), 
dataSourceMetadata.getDataType());
+    ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, 
dataSourceMetadata.getDataType());
+
+    Comparable value = cachedValue.getComparableValue();
 
     // Check min/max value
     if (!checkMinMaxRange(dataSourceMetadata, value)) {
@@ -278,7 +306,7 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
     // Check bloom filter
     BloomFilterReader bloomFilter = dataSource.getBloomFilter();
     if (bloomFilter != null) {
-      if (!bloomFilter.mightContain(value.toString())) {
+      if (!cachedValue.mightBeContained(bloomFilter)) {
         return true;
       }
     }
@@ -295,9 +323,11 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
    * <p>NOTE: segments will not be pruned if the number of values is greater 
than the threshold.
    */
   private boolean pruneInPredicate(IndexSegment segment, InPredicate 
inPredicate,
-      Map<String, DataSource> dataSourceCache) {
+      Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
     String column = inPredicate.getLhs().getIdentifier();
-    DataSource dataSource = dataSourceCache.computeIfAbsent(column, 
segment::getDataSource);
+    DataSource dataSource = segment instanceof ImmutableSegment
+        ? segment.getDataSource(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
     // NOTE: Column must exist after DataSchemaSegmentPruner
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
@@ -308,11 +338,12 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
       return false;
     }
 
+    List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate, 
dataSourceMetadata.getDataType());
+
     // Check min/max value
     boolean someInRange = false;
-    for (String value : values) {
-      Comparable inValue = convertValue(value, 
dataSourceMetadata.getDataType());
-      if (checkMinMaxRange(dataSourceMetadata, inValue)) {
+    for (ValueCache.CachedValue value : cachedValues) {
+      if (checkMinMaxRange(dataSourceMetadata, value.getComparableValue())) {
         someInRange = true;
         break;
       }
@@ -326,8 +357,8 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
     if (bloomFilter == null) {
       return false;
     }
-    for (String value : values) {
-      if (bloomFilter.mightContain(value)) {
+    for (ValueCache.CachedValue value : cachedValues) {
+      if (value.mightBeContained(bloomFilter)) {
         return false;
       }
     }
@@ -362,7 +393,9 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
   private boolean pruneRangePredicate(IndexSegment segment, RangePredicate 
rangePredicate,
       Map<String, DataSource> dataSourceCache) {
     String column = rangePredicate.getLhs().getIdentifier();
-    DataSource dataSource = dataSourceCache.computeIfAbsent(column, 
segment::getDataSource);
+    DataSource dataSource = segment instanceof ImmutableSegment
+        ? segment.getDataSource(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
     // NOTE: Column must exist after DataSchemaSegmentPruner
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
@@ -436,4 +469,75 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
       throw new BadQueryRequestException(e);
     }
   }
+
+  private static class ValueCache {
+    // As Predicates are recursive structures, their hashCode is quite 
expensive.
+    // By using an IdentityHashMap here we don't need to iterate over the 
recursive
+    // structure. This is specially useful in the IN expression.
+    private final Map<Predicate, Object> _cache = new IdentityHashMap<>();
+
+    public void add(EqPredicate pred) {
+      _cache.put(pred, new CachedValue(pred.getValue()));
+    }
+
+    public void add(InPredicate pred) {
+      List<CachedValue> list = new ArrayList<>(pred.getValues().size());
+      for (String value : pred.getValues()) {
+        list.add(new CachedValue(value));
+      }
+      _cache.put(pred, list);
+    }
+
+    public CachedValue get(EqPredicate pred, DataType dt) {
+      CachedValue cachedValue = (CachedValue) _cache.get(pred);
+      cachedValue.ensureDataType(dt);
+      return cachedValue;
+    }
+
+    public List<CachedValue> get(InPredicate pred, DataType dt) {
+      List<CachedValue> cachedValues = (List<CachedValue>) _cache.get(pred);
+      for (CachedValue cachedValue : cachedValues) {
+        cachedValue.ensureDataType(dt);
+      }
+      return cachedValues;
+    }
+
+    public static class CachedValue {
+      private final Object _value;
+      private boolean _hashed = false;
+      private long _hash1;
+      private long _hash2;
+      private DataType _dt;
+      private Comparable _comparableValue;
+
+      private CachedValue(Object value) {
+        _value = value;
+      }
+
+      private Comparable getComparableValue() {
+        assert _dt != null;
+        return _comparableValue;
+      }
+
+      private void ensureDataType(DataType dt) {
+        if (dt != _dt) {
+          String strValue = _value.toString();
+          _dt = dt;
+          _comparableValue = convertValue(strValue, dt);
+          _hashed = false;
+        }
+      }
+
+      private boolean mightBeContained(BloomFilterReader bloomFilter) {
+        if (!_hashed) {
+          GuavaBloomFilterReaderUtils.Hash128AsLongs hash128AsLongs =
+              
GuavaBloomFilterReaderUtils.hashAsLongs(_comparableValue.toString());
+          _hash1 = hash128AsLongs.getHash1();
+          _hash2 = hash128AsLongs.getHash2();
+          _hashed = true;
+        }
+        return bloomFilter.mightContain(_hash1, _hash2);
+      }
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
index dbfefdb961..5edbdb32a3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -18,18 +18,27 @@
  */
 package org.apache.pinot.core.query.pruner;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.OnHeapGuavaBloomFilterReader;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -138,7 +147,8 @@ public class ColumnValueSegmentPrunerTest {
   }
 
   @Test
-  public void testBloomFilterInPredicatePruning() {
+  public void testBloomFilterPredicatePruning()
+      throws IOException {
     Map<String, Object> properties = new HashMap<>();
     // override default value
     properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
@@ -150,52 +160,53 @@ public class ColumnValueSegmentPrunerTest {
     when(indexSegment.getDataSource("column")).thenReturn(dataSource);
     // Add support for bloom filter
     DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
-    BloomFilterReader bloomFilterReader = mock(BloomFilterReader.class);
-
-    when(dataSourceMetadata.getDataType()).thenReturn(DataType.INT);
+    BloomFilterReader bloomFilterReader = new BloomFilterReaderBuilder()
+        .put("1.0")
+        .put("2.0")
+        .put("3.0")
+        .put("5.0")
+        .put("7.0")
+        .put("21.0")
+        .build();
+
+    when(dataSourceMetadata.getDataType()).thenReturn(DataType.DOUBLE);
     when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
     when(dataSource.getBloomFilter()).thenReturn(bloomFilterReader);
-    when(bloomFilterReader.mightContain("1")).thenReturn(true);
-    when(bloomFilterReader.mightContain("2")).thenReturn(true);
-    when(bloomFilterReader.mightContain("3")).thenReturn(true);
-    when(bloomFilterReader.mightContain("5")).thenReturn(true);
-    when(bloomFilterReader.mightContain("7")).thenReturn(true);
-    when(bloomFilterReader.mightContain("21")).thenReturn(true);
-    when(dataSourceMetadata.getMinValue()).thenReturn(5);
-    when(dataSourceMetadata.getMaxValue()).thenReturn(10);
+    when(dataSourceMetadata.getMinValue()).thenReturn(5.0);
+    when(dataSourceMetadata.getMaxValue()).thenReturn(10.0);
 
     // all out the bloom filter and out of range
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0, 3, 4)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0.0, 3.0, 4.0)"));
 
     // some in the bloom filter but all out of range
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 1"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (1)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21, 30)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21 AND column = 30"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21 OR column = 30"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 1.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (1.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21.0, 30.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0 AND column = 30.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0 OR column = 30.0"));
 
     // all out the bloom filter but some in range
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 6"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (6)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0 OR column = 6"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 6.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (6.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0.0 OR column = 6.0"));
 
     // all in the bloom filter and in range
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5)"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5 OR column = 7"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5, 7)"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5 AND column = 7"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 OR column = 7.0"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5.0, 7.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 AND column = 7.0"));
 
     // some in the bloom filter and in range
     assertFalse(
-        runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE column 
IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5 OR column = 0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5 AND column = 0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (8, 10)"));
+        runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE column 
IN (0.0, 5.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 OR column = 0.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 AND column = 0.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (8.0, 10.0)"));
   }
 
   private IndexSegment mockIndexSegment() {
@@ -211,4 +222,25 @@ public class ColumnValueSegmentPrunerTest {
     QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
     return PRUNER.prune(Arrays.asList(indexSegment), queryContext).isEmpty();
   }
+
+  private static class BloomFilterReaderBuilder {
+    private BloomFilter<String> _bloomfilter = 
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 100, 0.01);
+    public BloomFilterReaderBuilder put(String value) {
+      _bloomfilter.put(value);
+      return this;
+    }
+
+    public BloomFilterReader build() throws IOException {
+      File file = Files.createTempFile("test", ".bloom").toFile();
+      try (FileOutputStream fos = new FileOutputStream(file)) {
+        _bloomfilter.writeTo(fos);
+        try (PinotDataBuffer pinotDataBuffer = 
PinotDataBuffer.loadBigEndianFile(file)) {
+          // on heap filter should never use the buffer, so we can close it 
and delete the file
+          return new OnHeapGuavaBloomFilterReader(pinotDataBuffer);
+        }
+      } finally {
+        file.delete();
+      }
+    }
+  }
 }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkBloomFilter.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkBloomFilter.java
new file mode 100644
index 0000000000..fcc8145f42
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkBloomFilter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import com.google.common.primitives.Longs;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.BaseGuavaBloomFilterReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.OffHeapGuavaBloomFilterReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.OnHeapGuavaBloomFilterReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.JavaFlightRecorderProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Fork(1)
+@Warmup(iterations = 500, time = 10, timeUnit = TimeUnit.MICROSECONDS)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkBloomFilter {
+
+  @Param(value = {OFF_HEAP})
+  private String _reader;
+
+  @Param(value = {"10000"})
+  private int _cardinality;
+  @Param(value = {"100", "1000"})
+  private int _maxSizeInBytes;
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new 
OptionsBuilder().include(BenchmarkBloomFilter.class.getSimpleName())
+        .addProfiler(JavaFlightRecorderProfiler.class);
+    new Runner(opt.build()).run();
+  }
+
+  private BaseGuavaBloomFilterReader _actualReader;
+  private Supplier<String> _valueSupplier;
+  private String _value;
+  private long _valueLong1;
+  private long _valueLong2;
+
+  public static final String ON_HEAP = "onHeap";
+  public static final String OFF_HEAP = "offHeap";
+
+  private BaseGuavaBloomFilterReader loadReader(BloomFilter<CharSequence> 
bloomFilter)
+      throws IOException {
+
+    File file = Files.createTempFile("test", ".bloom").toFile();
+    file.deleteOnExit();
+    try (FileOutputStream fos = new FileOutputStream(file)) {
+      bloomFilter.writeTo(fos);
+    }
+
+    PinotDataBuffer pinotDataBuffer = PinotDataBuffer.loadBigEndianFile(file);
+
+    switch (_reader) {
+      case ON_HEAP:
+        return new OnHeapGuavaBloomFilterReader(pinotDataBuffer);
+      case OFF_HEAP:
+        return new OffHeapGuavaBloomFilterReader(pinotDataBuffer);
+      default:
+        throw new IllegalArgumentException("Value " + _reader + " not 
recognized");
+    }
+  }
+
+  @Setup
+  public void setUp()
+      throws IOException {
+    long seed = 0;
+    Random r = new Random(seed);
+    List<String> words =
+        
IntStream.generate(r::nextInt).limit(_cardinality).mapToObj(Integer::toString).collect(Collectors.toList());
+
+    double fpp = Math.max(0.01d, 
GuavaBloomFilterReaderUtils.computeFPP(_maxSizeInBytes, _cardinality));
+    BloomFilter<CharSequence> bloomFilter = BloomFilter.create(
+        Funnels.stringFunnel(StandardCharsets.UTF_8), _cardinality, fpp);
+    words.forEach(bloomFilter::put);
+
+    _actualReader = loadReader(bloomFilter);
+
+    _valueSupplier = () -> words.get(r.nextInt(_cardinality));
+  }
+
+  @Setup(Level.Iteration)
+  public void updateIndex() {
+    _value = _valueSupplier.get();
+
+    byte[] hash = GuavaBloomFilterReaderUtils.hash(_value);
+    _valueLong1 = Longs.fromBytes(hash[7], hash[6], hash[5], hash[4], hash[3], 
hash[2], hash[1], hash[0]);
+    _valueLong2 = Longs.fromBytes(hash[15], hash[14], hash[13], hash[12], 
hash[11], hash[10], hash[9], hash[8]);
+  }
+
+  @Benchmark
+  public boolean mightContainString() {
+    boolean result = _actualReader.mightContain(_value);
+    Preconditions.checkArgument(result);
+
+    return result;
+  }
+
+  @Benchmark
+  public boolean mightContainLongs() {
+    boolean result = _actualReader.mightContain(_valueLong1, _valueLong2);
+    Preconditions.checkArgument(result);
+
+    return result;
+  }
+}
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkColumnValueSegmentPruner.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkColumnValueSegmentPruner.java
new file mode 100644
index 0000000000..aae3ef5841
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkColumnValueSegmentPruner.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.query.pruner.ColumnValueSegmentPruner;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.JavaFlightRecorderProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Fork(1)
+@Warmup(iterations = 5, time = 2)
+@Measurement(iterations = 5, time = 2)
+@State(Scope.Benchmark)
+public class BenchmarkColumnValueSegmentPruner {
+
+  public static final String QUERY_1 = "SELECT * FROM MyTable WHERE SORTED_COL 
IN (1, 2, 3, 4)";
+
+  @Param({"10"})
+  private int _numRows;
+  @Param({"10", "100", "1000"})
+  private int _numSegments;
+
+  private String _query = QUERY_1;
+  String _scenario = "EXP(0.5)";
+  private List<IndexSegment> _indexSegments;
+  private LongSupplier _supplier;
+  private ColumnValueSegmentPruner _pruner;
+  private QueryContext _queryContext;
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new 
OptionsBuilder().include(BenchmarkColumnValueSegmentPruner.class.getSimpleName());
+    if (args.length > 0 && args[0].equals("jfr")) {
+      opt = opt.addProfiler(JavaFlightRecorderProfiler.class)
+          .jvmArgsAppend("-XX:+UnlockDiagnosticVMOptions", 
"-XX:+DebugNonSafepoints");
+    }
+    new Runner(opt.build()).run();
+  }
+
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"BenchmarkServerSegmentPruner");
+  private static final String TABLE_NAME = "MyTable";
+  private static final String INT_COL_NAME = "INT_COL";
+  private static final String SORTED_COL_NAME = "SORTED_COL";
+  private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
+  private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
+  private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
+  private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
+  private static final String LOW_CARDINALITY_STRING_COL = 
"LOW_CARDINALITY_STRING_COL";
+
+
+  @Setup
+  public void setUp()
+      throws Exception {
+    _supplier = Distribution.createLongSupplier(42, _scenario);
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+
+    Set<String> invertedIndexCols = new HashSet<>();
+    invertedIndexCols.add(INT_COL_NAME);
+    invertedIndexCols.add(LOW_CARDINALITY_STRING_COL);
+
+    Map<String, BloomFilterConfig> bloomFilterConfigMap = new HashMap<>();
+    bloomFilterConfigMap.put(SORTED_COL_NAME, new 
BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 10000, false));
+
+    indexLoadingConfig.setRangeIndexColumns(invertedIndexCols);
+    indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
+    indexLoadingConfig.setBloomFilterConfigs(bloomFilterConfigMap);
+
+    _indexSegments = new ArrayList<>();
+    for (int i = 0; i < _numSegments; i++) {
+      String name = "segment_" + i;
+      buildSegment(name);
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, 
name), indexLoadingConfig));
+    }
+
+    _pruner = new ColumnValueSegmentPruner();
+    _pruner.init(new PinotConfiguration());
+    _queryContext = QueryContextConverterUtils.getQueryContext(_query);
+  }
+
+  @TearDown
+  public void tearDown() {
+    for (IndexSegment indexSegment : _indexSegments) {
+      indexSegment.destroy();
+    }
+
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  private List<GenericRow> createTestData(int numRows) {
+    Map<Integer, String> strings = new HashMap<>();
+    List<GenericRow> rows = new ArrayList<>();
+    String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> 
"value" + i)
+        .toArray(String[]::new);
+    for (int i = 0; i < numRows; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(SORTED_COL_NAME, numRows - i);
+      row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
+      row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
+      row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
+      row.putValue(RAW_STRING_COL_NAME,
+          strings.computeIfAbsent((int) _supplier.getAsLong(), k -> 
UUID.randomUUID().toString()));
+      row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
+      row.putValue(LOW_CARDINALITY_STRING_COL, lowCardinalityValues[i % 
lowCardinalityValues.length]);
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  private void buildSegment(String segmentName)
+      throws Exception {
+    List<GenericRow> rows = createTestData(_numRows);
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInvertedIndexColumns(Collections.singletonList(INT_COL_NAME))
+        .setFieldConfigList(fieldConfigs)
+        .setNoDictionaryColumns(Arrays.asList(RAW_INT_COL_NAME, 
RAW_STRING_COL_NAME))
+        .setSortedColumn(SORTED_COL_NAME)
+        
.setVarLengthDictionaryColumns(Collections.singletonList(SORTED_COL_NAME))
+        .setBloomFilterColumns(Collections.singletonList(SORTED_COL_NAME))
+        .setStarTreeIndexConfigs(Collections.singletonList(new 
StarTreeIndexConfig(
+            Arrays.asList(SORTED_COL_NAME, INT_COL_NAME), null, 
Collections.singletonList(
+                new AggregationFunctionColumnPair(AggregationFunctionType.SUM, 
RAW_INT_COL_NAME).toColumnName()),
+            Integer.MAX_VALUE)))
+        .build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_STRING_COL_NAME, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(NO_INDEX_STRING_COL, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, 
FieldSpec.DataType.STRING)
+        .build();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+
+  @Benchmark
+  public List<IndexSegment> query() {
+    return _pruner.prune(_indexSegments, _queryContext);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index a11a9171d9..9569a91ec6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -20,10 +20,12 @@ package 
org.apache.pinot.segment.local.indexsegment.immutable;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.HashUtil;
 import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
@@ -52,6 +54,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment 
{
   private final SegmentMetadataImpl _segmentMetadata;
   private final Map<String, ColumnIndexContainer> _indexContainerMap;
   private final StarTreeIndexContainer _starTreeIndexContainer;
+  private final Map<String, DataSource> _dataSources;
 
   // For upsert
   private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
@@ -65,6 +68,12 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
     _segmentMetadata = segmentMetadata;
     _indexContainerMap = columnIndexContainerMap;
     _starTreeIndexContainer = starTreeIndexContainer;
+    _dataSources = new 
HashMap<>(HashUtil.getHashMapCapacity(segmentMetadata.getColumnMetadataMap().size()));
+
+    for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
+      String colName = entry.getKey();
+      _dataSources.put(colName, new ImmutableDataSource(entry.getValue(), 
_indexContainerMap.get(colName)));
+    }
   }
 
   /**
@@ -112,10 +121,11 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
 
   @Override
   public DataSource getDataSource(String column) {
-    ColumnMetadata columnMetadata = 
_segmentMetadata.getColumnMetadataFor(column);
-    Preconditions.checkNotNull(columnMetadata,
-        "ColumnMetadata for " + column + " should not be null. " + 
"Potentially invalid column name specified.");
-    return new ImmutableDataSource(columnMetadata, 
_indexContainerMap.get(column));
+    DataSource result = _dataSources.get(column);
+    Preconditions.checkNotNull(result,
+        "DataSource for %s should not be null. Potentially invalid column name 
specified.",
+        column);
+    return result;
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
index 46fd49790b..e5665eaf41 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
@@ -20,6 +20,7 @@ package 
org.apache.pinot.segment.local.segment.index.readers.bloom;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import com.google.common.primitives.Longs;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -39,6 +40,15 @@ public class GuavaBloomFilterReaderUtils {
     return HASH_FUNCTION.hashBytes(value.getBytes(UTF_8)).asBytes();
   }
 
+  /**
+   *  Like {@link #hash(String)} but returns the hash as a {@link 
Hash128AsLongs}.
+   */
+  public static Hash128AsLongs hashAsLongs(String value) {
+    byte[] hash = hash(value);
+    return new Hash128AsLongs(Longs.fromBytes(hash[7], hash[6], hash[5], 
hash[4], hash[3], hash[2], hash[1], hash[0]),
+        Longs.fromBytes(hash[15], hash[14], hash[13], hash[12], hash[11], 
hash[10], hash[9], hash[8]));
+  }
+
   /* Cheat sheet:
 
      m: total bits
@@ -63,4 +73,22 @@ public class GuavaBloomFilterReaderUtils {
     double k = b * Math.log(2);
     return Math.pow(2, -k);
   }
+
+  public static class Hash128AsLongs {
+    private final long _hash1;
+    private final long _hash2;
+
+    private Hash128AsLongs(long hash1, long hash2) {
+      _hash1 = hash1;
+      _hash2 = hash2;
+    }
+
+    public long getHash1() {
+      return _hash1;
+    }
+
+    public long getHash2() {
+      return _hash2;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to