This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bcf592d43 Separate and parallelize BloomFilter based semgment pruner 
(#10660)
2bcf592d43 is described below

commit 2bcf592d433241b07dcaac82833df4b225617410
Author: Xiaobing <[email protected]>
AuthorDate: Fri Apr 28 13:05:21 2023 -0700

    Separate and parallelize BloomFilter based semgment pruner (#10660)
---
 .../core/operator/combine/BaseCombineOperator.java |   3 +-
 .../operator/combine/CombineOperatorUtils.java     |  20 --
 .../apache/pinot/core/plan/CombinePlanNode.java    |  86 ++----
 .../query/executor/ServerQueryExecutorV1Impl.java  |   3 +-
 .../query/pruner/BloomFilterSegmentPruner.java     | 264 +++++++++++++++++
 .../query/pruner/ColumnValueSegmentPruner.java     | 323 ++++-----------------
 .../pinot/core/query/pruner/SegmentPruner.java     |   7 +
 .../core/query/pruner/SegmentPrunerProvider.java   |   4 +-
 .../core/query/pruner/SegmentPrunerService.java    |  10 +-
 .../core/query/pruner/ValueBasedSegmentPruner.java | 240 +++++++++++++++
 .../pinot/core/util/QueryMultiThreadingUtils.java  | 113 +++++++
 .../combine/SelectionCombineOperatorTest.java      |  21 +-
 .../query/pruner/BloomFilterSegmentPrunerTest.java | 216 ++++++++++++++
 .../query/pruner/ColumnValueSegmentPrunerTest.java | 139 +++------
 .../pinot/util/QueryMultiThreadingUtilsTest.java   |  84 ++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +-
 16 files changed, 1063 insertions(+), 474 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index f98880aeb6..9390925bc9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -70,7 +71,7 @@ public abstract class BaseCombineOperator<T extends 
BaseResultsBlock> extends Ba
     // NOTE: We split the query execution into multiple tasks, where each task 
handles the query execution on multiple
     //       (>=1) segments. These tasks are assigned to multiple execution 
threads so that they can run in parallel.
     //       The parallelism is bounded by the task count.
-    _numTasks = CombineOperatorUtils.getNumTasksForQuery(operators.size(), 
queryContext.getMaxExecutionThreads());
+    _numTasks = QueryMultiThreadingUtils.getNumTasksForQuery(operators.size(), 
queryContext.getMaxExecutionThreads());
 
     // Use a Phaser to ensure all the Futures are done (not scheduled, 
finished or interrupted) before the main thread
     // returns. We need to ensure this because the main thread holds the 
reference to the segments. If a segment is
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
index 842f5053f7..eae5678f61 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
@@ -30,26 +30,6 @@ public class CombineOperatorUtils {
   private CombineOperatorUtils() {
   }
 
-  /**
-   * Use at most 10 or half of the processors threads for each query. If there 
are less than 2 processors, use 1 thread.
-   * <p>NOTE: Runtime.getRuntime().availableProcessors() may return value < 2 
in container based environment, e.g.
-   *          Kubernetes.
-   */
-  public static final int MAX_NUM_THREADS_PER_QUERY =
-      Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 
2));
-
-  /**
-   * Returns the number of tasks for the query execution. The tasks can be 
assigned to multiple execution threads so
-   * that they can run in parallel. The parallelism is bounded by the task 
count.
-   */
-  public static int getNumTasksForQuery(int numOperators, int 
maxExecutionThreads) {
-    if (maxExecutionThreads > 0) {
-      return Math.min(numOperators, maxExecutionThreads);
-    } else {
-      return Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY);
-    }
-  }
-
   /**
    * Sets the execution statistics into the results block.
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 5a6542932e..3b860061d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -20,12 +20,8 @@ package org.apache.pinot.core.plan;
 
 import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.context.ExpressionContext;
@@ -33,7 +29,6 @@ import 
org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.combine.AggregationCombineOperator;
 import org.apache.pinot.core.operator.combine.BaseCombineOperator;
-import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
 import org.apache.pinot.core.operator.combine.DistinctCombineOperator;
 import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
 import 
org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator;
@@ -46,7 +41,7 @@ import 
org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOpe
 import 
org.apache.pinot.core.operator.streaming.StreamingSelectionOrderByCombineOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.apache.pinot.spi.trace.InvocationRecording;
@@ -105,78 +100,31 @@ public class CombinePlanNode implements PlanNode {
       // Large number of plan nodes, run them in parallel
       // NOTE: Even if we get single executor thread, still run it using a 
separate thread so that the timeout can be
       //       honored
-
-      int maxExecutionThreads = _queryContext.getMaxExecutionThreads();
-      if (maxExecutionThreads <= 0) {
-        maxExecutionThreads = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY;
-      }
-      int numTasks =
-          Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / 
TARGET_NUM_PLANS_PER_THREAD, maxExecutionThreads);
+      int numTasks = QueryMultiThreadingUtils.getNumTasks(numPlanNodes, 
TARGET_NUM_PLANS_PER_THREAD,
+          _queryContext.getMaxExecutionThreads());
       recording.setNumTasks(numTasks);
-
-      // Use a Phaser to ensure all the Futures are done (not scheduled, 
finished or interrupted) before the main thread
-      // returns. We need to ensure no execution left before the main thread 
returning because the main thread holds the
-      // reference to the segments, and if the segments are deleted/refreshed, 
the segments can be released after the
-      // main thread returns, which would lead to undefined behavior (even JVM 
crash) when executing queries against
-      // them.
-      Phaser phaser = new Phaser(1);
-
-      // Submit all jobs
-      Future[] futures = new Future[numTasks];
-      for (int i = 0; i < numTasks; i++) {
-        int index = i;
-        futures[i] = _executorService.submit(new 
TraceCallable<List<Operator>>() {
-          @Override
-          public List<Operator> callJob() {
-            try {
-              // Register the thread to the phaser.
-              // If the phaser is terminated (returning negative value) when 
trying to register the thread, that means
-              // the query execution has timed out, and the main thread has 
deregistered itself and returned the result.
-              // Directly return as no execution result will be taken.
-              if (phaser.register() < 0) {
-                return Collections.emptyList();
-              }
-
-              List<Operator> operators = new ArrayList<>();
-              for (int i = index; i < numPlanNodes; i += numTasks) {
-                operators.add(_planNodes.get(i).run());
-              }
-              return operators;
-            } finally {
-              phaser.arriveAndDeregister();
-            }
-          }
-        });
-      }
-
-      // Get all results
-      try {
-        for (Future future : futures) {
-          List<Operator> ops = (List<Operator>) 
future.get(_queryContext.getEndTimeMs() - System.currentTimeMillis(),
-              TimeUnit.MILLISECONDS);
-          operators.addAll(ops);
+      QueryMultiThreadingUtils.runTasksWithDeadline(numTasks, index -> {
+        List<Operator> ops = new ArrayList<>();
+        for (int i = index; i < numPlanNodes; i += numTasks) {
+          ops.add(_planNodes.get(i).run());
+        }
+        return ops;
+      }, taskRes -> {
+        if (taskRes != null) {
+          operators.addAll(taskRes);
         }
-      } catch (Exception e) {
+      }, e -> {
         // Future object will throw ExecutionException for execution 
exception, need to check the cause to determine
         // whether it is caused by bad query
         Throwable cause = e.getCause();
         if (cause instanceof BadQueryRequestException) {
           throw (BadQueryRequestException) cause;
-        } else if (e instanceof InterruptedException) {
-          throw new QueryCancelledException("Cancelled while running 
CombinePlanNode", e);
-        } else {
-          throw new RuntimeException("Caught exception while running 
CombinePlanNode.", e);
         }
-      } finally {
-        // Cancel all ongoing jobs
-        for (Future future : futures) {
-          if (!future.isDone()) {
-            future.cancel(true);
-          }
+        if (e instanceof InterruptedException) {
+          throw new QueryCancelledException("Cancelled while running 
CombinePlanNode", e);
         }
-        // Deregister the main thread and wait for all threads done
-        phaser.awaitAdvance(phaser.arriveAndDeregister());
-      }
+        throw new RuntimeException("Caught exception while running 
CombinePlanNode.", e);
+      }, _executorService, _queryContext.getEndTimeMs());
     }
 
     if (_streamObserver != null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 1442a9ec2b..eae6732267 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -354,7 +354,8 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     TimerContext.Timer segmentPruneTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
     int numTotalSegments = indexSegments.size();
     SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics();
-    List<IndexSegment> selectedSegments = 
_segmentPrunerService.prune(indexSegments, queryContext, prunerStats);
+    List<IndexSegment> selectedSegments =
+        _segmentPrunerService.prune(indexSegments, queryContext, prunerStats, 
executorService);
     segmentPruneTimer.stopAndRecord();
     int numSelectedSegments = selectedSegments.size();
     LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
new file mode 100644
index 0000000000..6277aac8eb
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.prefetch.FetchPlanner;
+import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.segment.spi.FetchContext;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryCancelledException;
+
+
+/**
+ * The {@code BloomFilterSegmentPruner} prunes segments based on bloom filter 
for EQUALITY filter. Because the access
+ * to bloom filter data is required, segment pruning is done in parallel when 
the number of segments is large.
+ */
+@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+public class BloomFilterSegmentPruner extends ValueBasedSegmentPruner {
+  // Try to schedule 10 segments for each thread, or evenly distribute them to 
all MAX_NUM_THREADS_PER_QUERY threads.
+  // TODO: make this threshold configurable? threshold 10 is also used in 
CombinePlanNode, which accesses the
+  //       dictionary data to do query planning and if segments are more than 
10, planning is done in parallel.
+  private static final int TARGET_NUM_SEGMENTS_PER_THREAD = 10;
+
+  private FetchPlanner _fetchPlanner;
+
+  @Override
+  public void init(PinotConfiguration config) {
+    super.init(config);
+    _fetchPlanner = FetchPlannerRegistry.getPlanner();
+  }
+
+  @Override
+  protected boolean isApplicableToPredicate(Predicate predicate) {
+    // Only prune columns
+    if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) {
+      return false;
+    }
+    Predicate.Type predicateType = predicate.getType();
+    if (predicateType == Predicate.Type.EQ) {
+      return true;
+    }
+    if (predicateType == Predicate.Type.IN) {
+      List<String> values = ((InPredicate) predicate).getValues();
+      // Skip pruning when there are too many values in the IN predicate
+      if (values.size() <= _inPredicateThreshold) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query) {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    if (!query.isEnablePrefetch()) {
+      return super.prune(segments, query);
+    }
+    return prefetch(segments, query, fetchContexts -> {
+      int numSegments = segments.size();
+      FilterContext filter = Objects.requireNonNull(query.getFilter());
+      ValueCache cachedValues = new ValueCache();
+      Map<String, DataSource> dataSourceCache = new HashMap<>();
+      List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
+      for (int i = 0; i < numSegments; i++) {
+        dataSourceCache.clear();
+        IndexSegment segment = segments.get(i);
+        if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, 
dataSourceCache, cachedValues)) {
+          selectedSegments.add(segment);
+        }
+      }
+      return selectedSegments;
+    });
+  }
+
+  @Override
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query,
+      @Nullable ExecutorService executorService) {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    if (executorService == null || segments.size() <= 
TARGET_NUM_SEGMENTS_PER_THREAD) {
+      // If executor is not provided, or the number of segments is small, 
prune them sequentially
+      return prune(segments, query);
+    }
+    // With executor service and large number of segments, prune them in 
parallel.
+    // NOTE: Even if numTasks=1 i.e. we get a single executor thread, still 
run it using a separate thread so that
+    //       the timeout can be honored. For example, this may happen when 
there is only one processor.
+    int numTasks = QueryMultiThreadingUtils.getNumTasks(segments.size(), 
TARGET_NUM_SEGMENTS_PER_THREAD,
+        query.getMaxExecutionThreads());
+    if (!query.isEnablePrefetch()) {
+      return pruneInParallel(numTasks, segments, query, executorService, null);
+    }
+    return prefetch(segments, query,
+        fetchContexts -> pruneInParallel(numTasks, segments, query, 
executorService, fetchContexts));
+  }
+
+  private List<IndexSegment> pruneInParallel(int numTasks, List<IndexSegment> 
segments, QueryContext queryContext,
+      ExecutorService executorService, FetchContext[] fetchContexts) {
+    int numSegments = segments.size();
+    List<IndexSegment> allSelectedSegments = new ArrayList<>();
+    QueryMultiThreadingUtils.runTasksWithDeadline(numTasks, index -> {
+      FilterContext filter = Objects.requireNonNull(queryContext.getFilter());
+      ValueCache cachedValues = new ValueCache();
+      Map<String, DataSource> dataSourceCache = new HashMap<>();
+      List<IndexSegment> selectedSegments = new ArrayList<>();
+      for (int i = index; i < numSegments; i += numTasks) {
+        dataSourceCache.clear();
+        IndexSegment segment = segments.get(i);
+        FetchContext fetchContext = fetchContexts == null ? null : 
fetchContexts[i];
+        if (!pruneSegmentWithFetchContext(segment, fetchContext, filter, 
dataSourceCache, cachedValues)) {
+          selectedSegments.add(segment);
+        }
+      }
+      return selectedSegments;
+    }, taskRes -> {
+      if (taskRes != null) {
+        allSelectedSegments.addAll(taskRes);
+      }
+    }, e -> {
+      if (e instanceof InterruptedException) {
+        throw new QueryCancelledException("Cancelled while running 
BloomFilterSegmentPruner", e);
+      }
+      throw new RuntimeException("Caught exception while running 
BloomFilterSegmentPruner", e);
+    }, executorService, queryContext.getEndTimeMs());
+    return allSelectedSegments;
+  }
+
+  private List<IndexSegment> prefetch(List<IndexSegment> segments, 
QueryContext query,
+      Function<FetchContext[], List<IndexSegment>> pruneFunc) {
+    int numSegments = segments.size();
+    FetchContext[] fetchContexts = new FetchContext[numSegments];
+    try {
+      // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+      for (int i = 0; i < numSegments; i++) {
+        IndexSegment segment = segments.get(i);
+        FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, 
query);
+        if (!fetchContext.isEmpty()) {
+          segment.prefetch(fetchContext);
+          fetchContexts[i] = fetchContext;
+        }
+      }
+      return pruneFunc.apply(fetchContexts);
+    } finally {
+      // Release the prefetched bloom filters
+      for (int i = 0; i < numSegments; i++) {
+        FetchContext fetchContext = fetchContexts[i];
+        if (fetchContext != null) {
+          segments.get(i).release(fetchContext);
+        }
+      }
+    }
+  }
+
+  private boolean pruneSegmentWithFetchContext(IndexSegment segment, 
FetchContext fetchContext, FilterContext filter,
+      Map<String, DataSource> dataSourceCache, ValueCache cachedValues) {
+    if (fetchContext == null) {
+      return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+    }
+    try {
+      segment.acquire(fetchContext);
+      return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+    } finally {
+      segment.release(fetchContext);
+    }
+  }
+
+  @Override
+  boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate, 
Map<String, DataSource> dataSourceCache,
+      ValueCache cachedValues) {
+    Predicate.Type predicateType = predicate.getType();
+    if (predicateType == Predicate.Type.EQ) {
+      return pruneEqPredicate(segment, (EqPredicate) predicate, 
dataSourceCache, cachedValues);
+    } else if (predicateType == Predicate.Type.IN) {
+      return pruneInPredicate(segment, (InPredicate) predicate, 
dataSourceCache, cachedValues);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * For EQ predicate, prune the segments based on column bloom filter.
+   */
+  private boolean pruneEqPredicate(IndexSegment segment, EqPredicate 
eqPredicate,
+      Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+    String column = eqPredicate.getLhs().getIdentifier();
+    DataSource dataSource = segment instanceof ImmutableSegment ? 
segment.getDataSource(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
+    // NOTE: Column must exist after DataSchemaSegmentPruner
+    assert dataSource != null;
+    DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+    ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, 
dataSourceMetadata.getDataType());
+    // Check bloom filter
+    BloomFilterReader bloomFilter = dataSource.getBloomFilter();
+    return bloomFilter != null && !cachedValue.mightBeContained(bloomFilter);
+  }
+
+  /**
+   * For IN predicate, prune the segments based on column bloom filter.
+   * NOTE: segments will not be pruned if the number of values is greater than 
the threshold.
+   */
+  private boolean pruneInPredicate(IndexSegment segment, InPredicate 
inPredicate,
+      Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+    List<String> values = inPredicate.getValues();
+    // Skip pruning when there are too many values in the IN predicate
+    if (values.size() > _inPredicateThreshold) {
+      return false;
+    }
+    String column = inPredicate.getLhs().getIdentifier();
+    DataSource dataSource = segment instanceof ImmutableSegment ? 
segment.getDataSource(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
+    // NOTE: Column must exist after DataSchemaSegmentPruner
+    assert dataSource != null;
+    DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+    List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate, 
dataSourceMetadata.getDataType());
+    // Check bloom filter
+    BloomFilterReader bloomFilter = dataSource.getBloomFilter();
+    if (bloomFilter == null) {
+      return false;
+    }
+    for (ValueCache.CachedValue value : cachedValues) {
+      if (value.mightBeContained(bloomFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 6ee9ba5c05..a2b949da9e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -18,45 +18,32 @@
  */
 package org.apache.pinot.core.query.pruner;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.predicate.EqPredicate;
 import org.apache.pinot.common.request.context.predicate.InPredicate;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
-import org.apache.pinot.core.query.prefetch.FetchPlanner;
-import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import 
org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
-import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
 
 
 /**
  * The {@code ColumnValueSegmentPruner} is the segment pruner that prunes 
segments based on the value inside the filter.
+ * This pruner is supposed to use segment metadata like min/max or partition 
id only. Pruners that need to access
+ * segment data like bloom filter is implemented separately and called after 
this one to reduce required data access.
  * <ul>
  *   <li>
  *     For EQUALITY filter, prune the segment based on:
  *     <ul>
  *       <li>Column min/max value</li>
  *       <li>Column partition</li>
- *       <li>Column bloom filter</li>
  *     </ul>
  *   </li>
  *   <li>
@@ -68,126 +55,39 @@ import org.apache.pinot.spi.utils.CommonConstants.Server;
  * </ul>
  */
 @SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
-public class ColumnValueSegmentPruner implements SegmentPruner {
-
-  public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
-
-  private int _inPredicateThreshold;
-  private FetchPlanner _fetchPlanner;
-
-  @Override
-  public void init(PinotConfiguration config) {
-    _inPredicateThreshold =
-        config.getProperty(IN_PREDICATE_THRESHOLD, 
Server.DEFAULT_VALUE_PRUNER_IN_PREDICATE_THRESHOLD);
-    _fetchPlanner = FetchPlannerRegistry.getPlanner();
-  }
-
-  @Override
-  public boolean isApplicableTo(QueryContext query) {
-    return query.getFilter() != null;
-  }
-
+public class ColumnValueSegmentPruner extends ValueBasedSegmentPruner {
   @Override
-  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query) {
-    if (segments.isEmpty()) {
-      return segments;
+  protected boolean isApplicableToPredicate(Predicate predicate) {
+    // Only prune columns
+    if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) {
+      return false;
     }
-    FilterContext filter = Objects.requireNonNull(query.getFilter());
-    ValueCache cachedValues = new ValueCache();
-    int numSegments = segments.size();
-    List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
-    if (query.isEnablePrefetch()) {
-      FetchContext[] fetchContexts = new FetchContext[numSegments];
-      try {
-        // Prefetch bloom filter for columns within the EQ/IN predicate if 
exists
-        for (int i = 0; i < numSegments; i++) {
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = 
_fetchPlanner.planFetchForPruning(segment, query);
-          if (!fetchContext.isEmpty()) {
-            segment.prefetch(fetchContext);
-            fetchContexts[i] = fetchContext;
-          }
-        }
-        // Prune segments
-        Map[] dataSourceCaches = new Map[numSegments];
-        for (int i = 0; i < numSegments; i++) {
-          dataSourceCaches[i] = new HashMap<>();
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = fetchContexts[i];
-          if (fetchContext != null) {
-            segment.acquire(fetchContext);
-            try {
-              if (!pruneSegment(segment, filter, dataSourceCaches[i], 
cachedValues)) {
-                selectedSegments.add(segment);
-              }
-            } finally {
-              segment.release(fetchContext);
-            }
-          } else {
-            if (!pruneSegment(segment, filter, dataSourceCaches[i], 
cachedValues)) {
-              selectedSegments.add(segment);
-            }
-          }
-        }
-      } finally {
-        // Release the prefetched bloom filters
-        for (int i = 0; i < numSegments; i++) {
-          FetchContext fetchContext = fetchContexts[i];
-          if (fetchContext != null) {
-            segments.get(i).release(fetchContext);
-          }
-        }
-      }
-    } else {
-      Map<String, DataSource> dataSourceCache = new HashMap<>();
-      for (IndexSegment segment : segments) {
-        dataSourceCache.clear();
-        if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
-          selectedSegments.add(segment);
-        }
+    Predicate.Type predicateType = predicate.getType();
+    if (predicateType == Predicate.Type.EQ || predicateType == 
Predicate.Type.RANGE) {
+      return true;
+    }
+    if (predicateType == Predicate.Type.IN) {
+      List<String> values = ((InPredicate) predicate).getValues();
+      // Skip pruning when there are too many values in the IN predicate
+      if (values.size() <= _inPredicateThreshold) {
+        return true;
       }
     }
-    return selectedSegments;
+    return false;
   }
 
-  private boolean pruneSegment(IndexSegment segment, FilterContext filter, 
Map<String, DataSource> dataSourceCache,
+  @Override
+  boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate, 
Map<String, DataSource> dataSourceCache,
       ValueCache cachedValues) {
-    switch (filter.getType()) {
-      case AND:
-        for (FilterContext child : filter.getChildren()) {
-          if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
-            return true;
-          }
-        }
-        return false;
-      case OR:
-        for (FilterContext child : filter.getChildren()) {
-          if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
-            return false;
-          }
-        }
-        return true;
-      case NOT:
-        // Do not prune NOT filter
-        return false;
-      case PREDICATE:
-        Predicate predicate = filter.getPredicate();
-        // Only prune columns
-        if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) 
{
-          return false;
-        }
-        Predicate.Type predicateType = predicate.getType();
-        if (predicateType == Predicate.Type.EQ) {
-          return pruneEqPredicate(segment, (EqPredicate) predicate, 
dataSourceCache, cachedValues);
-        } else if (predicateType == Predicate.Type.IN) {
-          return pruneInPredicate(segment, (InPredicate) predicate, 
dataSourceCache, cachedValues);
-        } else if (predicateType == Predicate.Type.RANGE) {
-          return pruneRangePredicate(segment, (RangePredicate) predicate, 
dataSourceCache);
-        } else {
-          return false;
-        }
-      default:
-        throw new IllegalStateException();
+    Predicate.Type predicateType = predicate.getType();
+    if (predicateType == Predicate.Type.EQ) {
+      return pruneEqPredicate(segment, (EqPredicate) predicate, 
dataSourceCache, cachedValues);
+    } else if (predicateType == Predicate.Type.IN) {
+      return pruneInPredicate(segment, (InPredicate) predicate, 
dataSourceCache, cachedValues);
+    } else if (predicateType == Predicate.Type.RANGE) {
+      return pruneRangePredicate(segment, (RangePredicate) predicate, 
dataSourceCache);
+    } else {
+      return false;
     }
   }
 
@@ -196,27 +96,22 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
    * <ul>
    *   <li>Column min/max value</li>
    *   <li>Column partition</li>
-   *   <li>Column bloom filter</li>
    * </ul>
    */
   private boolean pruneEqPredicate(IndexSegment segment, EqPredicate 
eqPredicate,
       Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
     String column = eqPredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment
-        ? segment.getDataSource(column)
+    DataSource dataSource = segment instanceof ImmutableSegment ? 
segment.getDataSource(column)
         : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
     // NOTE: Column must exist after DataSchemaSegmentPruner
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, 
dataSourceMetadata.getDataType());
-
     Comparable value = cachedValue.getComparableValue();
-
     // Check min/max value
     if (!checkMinMaxRange(dataSourceMetadata, value)) {
       return true;
     }
-
     // Check column partition
     PartitionFunction partitionFunction = 
dataSourceMetadata.getPartitionFunction();
     if (partitionFunction != null) {
@@ -226,15 +121,6 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
         return true;
       }
     }
-
-    // Check bloom filter
-    BloomFilterReader bloomFilter = dataSource.getBloomFilter();
-    if (bloomFilter != null) {
-      if (!cachedValue.mightBeContained(bloomFilter)) {
-        return true;
-      }
-    }
-
     return false;
   }
 
@@ -242,66 +128,26 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
    * For IN predicate, prune the segments based on:
    * <ul>
    *   <li>Column min/max value</li>
-   *   <li>Column bloom filter</li>
    * </ul>
    * <p>NOTE: segments will not be pruned if the number of values is greater 
than the threshold.
    */
   private boolean pruneInPredicate(IndexSegment segment, InPredicate 
inPredicate,
       Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
-    String column = inPredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment
-        ? segment.getDataSource(column)
-        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
-    // NOTE: Column must exist after DataSchemaSegmentPruner
-    assert dataSource != null;
-    DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     List<String> values = inPredicate.getValues();
-
     // Skip pruning when there are too many values in the IN predicate
     if (values.size() > _inPredicateThreshold) {
       return false;
     }
-
+    String column = inPredicate.getLhs().getIdentifier();
+    DataSource dataSource = segment instanceof ImmutableSegment ? 
segment.getDataSource(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
+    // NOTE: Column must exist after DataSchemaSegmentPruner
+    assert dataSource != null;
+    DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate, 
dataSourceMetadata.getDataType());
-
     // Check min/max value
-    boolean someInRange = false;
     for (ValueCache.CachedValue value : cachedValues) {
       if (checkMinMaxRange(dataSourceMetadata, value.getComparableValue())) {
-        someInRange = true;
-        break;
-      }
-    }
-    if (!someInRange) {
-      return true;
-    }
-
-    // Check bloom filter
-    BloomFilterReader bloomFilter = dataSource.getBloomFilter();
-    if (bloomFilter == null) {
-      return false;
-    }
-    for (ValueCache.CachedValue value : cachedValues) {
-      if (value.mightBeContained(bloomFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Returns {@code true} if the value is within the column's min/max value 
range, {@code false} otherwise.
-   */
-  private boolean checkMinMaxRange(DataSourceMetadata dataSourceMetadata, 
Comparable value) {
-    Comparable minValue = dataSourceMetadata.getMinValue();
-    if (minValue != null) {
-      if (value.compareTo(minValue) < 0) {
-        return false;
-      }
-    }
-    Comparable maxValue = dataSourceMetadata.getMaxValue();
-    if (maxValue != null) {
-      if (value.compareTo(maxValue) > 0) {
         return false;
       }
     }
@@ -317,8 +163,7 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
   private boolean pruneRangePredicate(IndexSegment segment, RangePredicate 
rangePredicate,
       Map<String, DataSource> dataSourceCache) {
     String column = rangePredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment
-        ? segment.getDataSource(column)
+    DataSource dataSource = segment instanceof ImmutableSegment ? 
segment.getDataSource(column)
         : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
     // NOTE: Column must exist after DataSchemaSegmentPruner
     assert dataSource != null;
@@ -382,95 +227,25 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
         }
       }
     }
-
     return false;
   }
 
-  private static Comparable convertValue(String stringValue, DataType 
dataType) {
-    try {
-      return dataType.convertInternal(stringValue);
-    } catch (Exception e) {
-      throw new BadQueryRequestException(e);
-    }
-  }
-
-  private static class ValueCache {
-    // As Predicates are recursive structures, their hashCode is quite 
expensive.
-    // By using an IdentityHashMap here we don't need to iterate over the 
recursive
-    // structure. This is specially useful in the IN expression.
-    private final Map<Predicate, Object> _cache = new IdentityHashMap<>();
-
-    private CachedValue add(EqPredicate pred) {
-      CachedValue val = new CachedValue(pred.getValue());
-      _cache.put(pred, val);
-      return val;
-    }
-
-    private List<CachedValue> add(InPredicate pred) {
-      List<CachedValue> vals = new ArrayList<>(pred.getValues().size());
-      for (String value : pred.getValues()) {
-        vals.add(new CachedValue(value));
-      }
-      _cache.put(pred, vals);
-      return vals;
-    }
-
-    public CachedValue get(EqPredicate pred, DataType dt) {
-      CachedValue cachedValue = (CachedValue) _cache.get(pred);
-      if (cachedValue == null) {
-        cachedValue = add(pred);
-      }
-      cachedValue.ensureDataType(dt);
-      return cachedValue;
-    }
-
-    public List<CachedValue> get(InPredicate pred, DataType dt) {
-      List<CachedValue> cachedValues = (List<CachedValue>) _cache.get(pred);
-      if (cachedValues == null) {
-        cachedValues = add(pred);
-      }
-      for (CachedValue cachedValue : cachedValues) {
-        cachedValue.ensureDataType(dt);
+  /**
+   * Returns {@code true} if the value is within the column's min/max value 
range, {@code false} otherwise.
+   */
+  private boolean checkMinMaxRange(DataSourceMetadata dataSourceMetadata, 
Comparable value) {
+    Comparable minValue = dataSourceMetadata.getMinValue();
+    if (minValue != null) {
+      if (value.compareTo(minValue) < 0) {
+        return false;
       }
-      return cachedValues;
     }
-
-    public static class CachedValue {
-      private final Object _value;
-      private boolean _hashed = false;
-      private long _hash1;
-      private long _hash2;
-      private DataType _dt;
-      private Comparable _comparableValue;
-
-      private CachedValue(Object value) {
-        _value = value;
-      }
-
-      private Comparable getComparableValue() {
-        assert _dt != null;
-        return _comparableValue;
-      }
-
-      private void ensureDataType(DataType dt) {
-        if (dt != _dt) {
-          String strValue = _value.toString();
-          _dt = dt;
-          _comparableValue = convertValue(strValue, dt);
-          _hashed = false;
-        }
-      }
-
-      private boolean mightBeContained(BloomFilterReader bloomFilter) {
-        if (!_hashed) {
-          GuavaBloomFilterReaderUtils.Hash128AsLongs hash128AsLongs =
-              
GuavaBloomFilterReaderUtils.hashAsLongs(_comparableValue.toString());
-          _hash1 = hash128AsLongs.getHash1();
-          _hash2 = hash128AsLongs.getHash2();
-          _hashed = true;
-        }
-        return bloomFilter.mightContain(_hash1, _hash2);
+    Comparable maxValue = dataSourceMetadata.getMaxValue();
+    if (maxValue != null) {
+      if (value.compareTo(maxValue) > 0) {
+        return false;
       }
     }
+    return true;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
index 39d19c375d..db848fc915 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.core.query.pruner;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -45,4 +47,9 @@ public interface SegmentPruner {
    *                 TODO: Revisit this because the caller doesn't require not 
changing the input segments
    */
   List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query);
+
+  default List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query,
+      @Nullable ExecutorService executorService) {
+    return prune(segments, query);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
index e09d54590d..e9acb210ab 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
@@ -26,8 +26,6 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 
 /**
  * A static SegmentPrunerProvider will give SegmentPruner instance based on 
prunerClassName and configuration.
- *
- *
  */
 public class SegmentPrunerProvider {
   private SegmentPrunerProvider() {
@@ -36,10 +34,12 @@ public class SegmentPrunerProvider {
   private static final Map<String, Class<? extends SegmentPruner>> PRUNER_MAP 
= new HashMap<>();
 
   public static final String COLUMN_VALUE_SEGMENT_PRUNER_NAME = 
"columnvaluesegmentpruner";
+  public static final String BLOOM_FILTER_SEGMENT_PRUNER_NAME = 
"bloomfiltersegmentpruner";
   public static final String SELECTION_QUERY_SEGMENT_PRUNER_NAME = 
"selectionquerysegmentpruner";
 
   static {
     PRUNER_MAP.put(COLUMN_VALUE_SEGMENT_PRUNER_NAME, 
ColumnValueSegmentPruner.class);
+    PRUNER_MAP.put(BLOOM_FILTER_SEGMENT_PRUNER_NAME, 
BloomFilterSegmentPruner.class);
     PRUNER_MAP.put(SELECTION_QUERY_SEGMENT_PRUNER_NAME, 
SelectionQuerySegmentPruner.class);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index 5c79ca1c2d..4b775f67bd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.query.config.SegmentPrunerConfig;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -59,6 +61,7 @@ public class SegmentPrunerService {
             _prunerStatsUpdaters.put(pruner, 
SegmentPrunerStatistics::setLimitPruned);
             break;
           case SegmentPrunerProvider.COLUMN_VALUE_SEGMENT_PRUNER_NAME:
+          case SegmentPrunerProvider.BLOOM_FILTER_SEGMENT_PRUNER_NAME:
             _prunerStatsUpdaters.put(pruner, 
SegmentPrunerStatistics::setValuePruned);
             break;
           default:
@@ -96,6 +99,11 @@ public class SegmentPrunerService {
    *                 undefined way. Therefore, this list should not be used 
after calling this method.
    */
   public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query, SegmentPrunerStatistics stats) {
+    return prune(segments, query, stats, null);
+  }
+
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query, SegmentPrunerStatistics stats,
+      @Nullable ExecutorService executorService) {
     try (InvocationScope scope = 
Tracing.getTracer().createScope(SegmentPrunerService.class)) {
       segments = removeInvalidSegments(segments, query, stats);
       int invokedPrunersCount = 0;
@@ -105,7 +113,7 @@ public class SegmentPrunerService {
           try (InvocationScope prunerScope = 
Tracing.getTracer().createScope(segmentPruner.getClass())) {
             int originalSegmentsSize = segments.size();
             prunerScope.setNumSegments(originalSegmentsSize);
-            segments = segmentPruner.prune(segments, query);
+            segments = segmentPruner.prune(segments, query, executorService);
             _prunerStatsUpdaters.get(segmentPruner).accept(stats, 
originalSegmentsSize - segments.size());
           }
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
new file mode 100644
index 0000000000..76f1b2737b
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+
+
+/**
+ * The {@code ValueBasedSegmentPruner} prunes segments based on values inside 
the filter and segment metadata and data.
+ */
+@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+abstract public class ValueBasedSegmentPruner implements SegmentPruner {
+  public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
+  protected int _inPredicateThreshold;
+
+  @Override
+  public void init(PinotConfiguration config) {
+    _inPredicateThreshold =
+        config.getProperty(IN_PREDICATE_THRESHOLD, 
Server.DEFAULT_VALUE_PRUNER_IN_PREDICATE_THRESHOLD);
+  }
+
+  @Override
+  public boolean isApplicableTo(QueryContext query) {
+    if (query.getFilter() == null) {
+      return false;
+    }
+    return isApplicableToFilter(query.getFilter());
+  }
+
+  /**
+   * 1. NOT is not applicable for segment pruning;
+   * 2. For OR, if one of the child filter is not applicable for pruning, the 
parent filter is not applicable;
+   * 3. For AND, if one of the child filter is applicable for pruning, the 
parent filter is applicable, but it
+   *    doesn't mean this child filter can prune the segment.
+   * 4. The specific pruners decide their own applicable predicate types.
+   */
+  private boolean isApplicableToFilter(FilterContext filter) {
+    switch (filter.getType()) {
+      case AND:
+        for (FilterContext child : filter.getChildren()) {
+          if (isApplicableToFilter(child)) {
+            return true;
+          }
+        }
+        return false;
+      case OR:
+        for (FilterContext child : filter.getChildren()) {
+          if (!isApplicableToFilter(child)) {
+            return false;
+          }
+        }
+        return true;
+      case NOT:
+        // Do not prune NOT filter
+        return false;
+      case PREDICATE:
+        return isApplicableToPredicate(filter.getPredicate());
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  abstract boolean isApplicableToPredicate(Predicate predicate);
+
+  @Override
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query) {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    FilterContext filter = Objects.requireNonNull(query.getFilter());
+    ValueCache cachedValues = new ValueCache();
+    Map<String, DataSource> dataSourceCache = new HashMap<>();
+    List<IndexSegment> selectedSegments = new ArrayList<>(segments.size());
+    for (IndexSegment segment : segments) {
+      dataSourceCache.clear();
+      if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
+        selectedSegments.add(segment);
+      }
+    }
+    return selectedSegments;
+  }
+
+  protected boolean pruneSegment(IndexSegment segment, FilterContext filter, 
Map<String, DataSource> dataSourceCache,
+      ValueCache cachedValues) {
+    switch (filter.getType()) {
+      case AND:
+        for (FilterContext child : filter.getChildren()) {
+          if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+            return true;
+          }
+        }
+        return false;
+      case OR:
+        for (FilterContext child : filter.getChildren()) {
+          if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+            return false;
+          }
+        }
+        return true;
+      case NOT:
+        // Do not prune NOT filter
+        return false;
+      case PREDICATE:
+        Predicate predicate = filter.getPredicate();
+        // Only prune columns
+        if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) 
{
+          return false;
+        }
+        return pruneSegmentWithPredicate(segment, predicate, dataSourceCache, 
cachedValues);
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  abstract boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate 
predicate,
+      Map<String, DataSource> dataSourceCache, ValueCache cachedValues);
+
+  protected static Comparable convertValue(String stringValue, DataType 
dataType) {
+    try {
+      return dataType.convertInternal(stringValue);
+    } catch (Exception e) {
+      throw new BadQueryRequestException(e);
+    }
+  }
+
+  protected static class ValueCache {
+    // As Predicates are recursive structures, their hashCode is quite 
expensive.
+    // By using an IdentityHashMap here we don't need to iterate over the 
recursive
+    // structure. This is specially useful in the IN expression.
+    private final Map<Predicate, Object> _cache = new IdentityHashMap<>();
+
+    private CachedValue add(EqPredicate pred) {
+      CachedValue val = new CachedValue(pred.getValue());
+      _cache.put(pred, val);
+      return val;
+    }
+
+    private List<CachedValue> add(InPredicate pred) {
+      List<CachedValue> vals = new ArrayList<>(pred.getValues().size());
+      for (String value : pred.getValues()) {
+        vals.add(new CachedValue(value));
+      }
+      _cache.put(pred, vals);
+      return vals;
+    }
+
+    public CachedValue get(EqPredicate pred, DataType dt) {
+      CachedValue cachedValue = (CachedValue) _cache.get(pred);
+      if (cachedValue == null) {
+        cachedValue = add(pred);
+      }
+      cachedValue.ensureDataType(dt);
+      return cachedValue;
+    }
+
+    public List<CachedValue> get(InPredicate pred, DataType dt) {
+      List<CachedValue> cachedValues = (List<CachedValue>) _cache.get(pred);
+      if (cachedValues == null) {
+        cachedValues = add(pred);
+      }
+      for (CachedValue cachedValue : cachedValues) {
+        cachedValue.ensureDataType(dt);
+      }
+      return cachedValues;
+    }
+
+    public static class CachedValue {
+      private final Object _value;
+      private boolean _hashed = false;
+      private long _hash1;
+      private long _hash2;
+      private DataType _dt;
+      private Comparable _comparableValue;
+
+      private CachedValue(Object value) {
+        _value = value;
+      }
+
+      public Comparable getComparableValue() {
+        assert _dt != null;
+        return _comparableValue;
+      }
+
+      public void ensureDataType(DataType dt) {
+        if (dt != _dt) {
+          String strValue = _value.toString();
+          _dt = dt;
+          _comparableValue = convertValue(strValue, dt);
+          _hashed = false;
+        }
+      }
+
+      public boolean mightBeContained(BloomFilterReader bloomFilter) {
+        if (!_hashed) {
+          GuavaBloomFilterReaderUtils.Hash128AsLongs hash128AsLongs =
+              
GuavaBloomFilterReaderUtils.hashAsLongs(_comparableValue.toString());
+          _hash1 = hash128AsLongs.getHash1();
+          _hash2 = hash128AsLongs.getHash2();
+          _hashed = true;
+        }
+        return bloomFilter.mightContain(_hash1, _hash2);
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryMultiThreadingUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryMultiThreadingUtils.java
new file mode 100644
index 0000000000..bea9d0110c
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryMultiThreadingUtils.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * The term `task` and `thread` are used interchangeably in the logic to 
parallelize CombinePlanNode and
+ * BaseCombineOperator. This class provides common methods used to set up the 
parallel processing.
+ */
+public class QueryMultiThreadingUtils {
+  private QueryMultiThreadingUtils() {
+  }
+
+  /**
+   * Use at most 10 or half of the processors threads for each query. If there 
are less than 2 processors, use 1
+   * thread.
+   * <p>NOTE: Runtime.getRuntime().availableProcessors() may return value < 2 
in container based environment, e.g.
+   * Kubernetes.
+   */
+  public static final int MAX_NUM_THREADS_PER_QUERY =
+      Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 
2));
+
+  /**
+   * Returns the number of tasks for the query execution. The tasks can be 
assigned to multiple execution threads so
+   * that they can run in parallel. The parallelism is bounded by the task 
count.
+   */
+  public static int getNumTasksForQuery(int numOperators, int 
maxExecutionThreads) {
+    return getNumTasks(numOperators, 1, maxExecutionThreads);
+  }
+
+  public static int getNumTasks(int numWorkUnits, int minUnitsPerThread, int 
maxExecutionThreads) {
+    if (numWorkUnits <= minUnitsPerThread) {
+      return 1;
+    }
+    if (maxExecutionThreads <= 0) {
+      maxExecutionThreads = MAX_NUM_THREADS_PER_QUERY;
+    }
+    return Math.min((numWorkUnits + minUnitsPerThread - 1) / 
minUnitsPerThread, maxExecutionThreads);
+  }
+
+  /**
+   * Use a Phaser to ensure all the Futures are done (not scheduled, finished 
or interrupted) before the main thread
+   * returns. We need to ensure no execution left before the main thread 
returning because the main thread holds the
+   * reference to the segments, and if the segments are deleted/refreshed, the 
segments can be released after the main
+   * thread returns, which would lead to undefined behavior (even JVM crash) 
when executing queries against them.
+   */
+  public static <T> void runTasksWithDeadline(int numTasks, Function<Integer, 
T> taskFunc, Consumer<T> resCollector,
+      Consumer<Exception> errHandler, ExecutorService executorService, long 
deadlineInMs) {
+    Phaser phaser = new Phaser(1);
+    List<Future<T>> futures = new ArrayList<>(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      int index = i;
+      futures.add(executorService.submit(new TraceCallable<T>() {
+        @Override
+        public T callJob() {
+          try {
+            // Register the thread to the phaser for main thread to wait for 
it to complete.
+            if (phaser.register() < 0) {
+              return null;
+            }
+            return taskFunc.apply(index);
+          } finally {
+            phaser.arriveAndDeregister();
+          }
+        }
+      }));
+    }
+    try {
+      // Check deadline while waiting for the task results.
+      for (Future<T> future : futures) {
+        T taskResult = future.get(deadlineInMs - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
+        resCollector.accept(taskResult);
+      }
+    } catch (Exception e) {
+      errHandler.accept(e);
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future<T> future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index f7a73647fa..3a4bdd116a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
@@ -74,7 +75,7 @@ public class SelectionCombineOperatorTest {
   private static final String SEGMENT_NAME_PREFIX = "testSegment_";
 
   // Create (MAX_NUM_THREADS_PER_QUERY * 2) segments so that each thread needs 
to process 2 segments
-  private static final int NUM_SEGMENTS = 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 2;
+  private static final int NUM_SEGMENTS = 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 2;
   private static final int NUM_CONSUMING_SEGMENTS = NUM_SEGMENTS / 2;
   private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME + 
"_REALTIME";
   private static final int NUM_RECORDS_PER_SEGMENT = 100;
@@ -177,27 +178,27 @@ public class SelectionCombineOperatorTest {
     // Should early-terminate after processing the result of the first 
segment. Each thread should process at most 1
     // segment.
     long numDocsScanned = combineResult.getNumDocsScanned();
-    assertTrue(numDocsScanned >= 10 && numDocsScanned <= 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10);
+    assertTrue(numDocsScanned >= 10 && numDocsScanned <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 10);
     assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
     assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
     assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
     assertEquals(combineResult.getNumConsumingSegmentsProcessed(), 
NUM_CONSUMING_SEGMENTS);
     int numSegmentsMatched = combineResult.getNumSegmentsMatched();
-    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
     // The check below depends on the order of segment processing. When 
segments# <= 10 (the value of
     // CombinePlanNode.TARGET_NUM_PLANS_PER_THREAD to be specific), the 
segments are processed in the order as they
     // are prepared, which is OFFLINE segments followed by RT segments and 
this case makes the value here equal to 0.
     // But when segments# > 10, the segments are processed in a different 
order and some RT segments can be processed
-    // ahead of the other OFFLINE segments, but no more than 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY for sure
+    // ahead of the other OFFLINE segments, but no more than 
TaskUtils.MAX_NUM_THREADS_PER_QUERY for sure
     // as each thread only processes one segment.
     int numConsumingSegmentsMatched = 
combineResult.getNumConsumingSegmentsMatched();
     if (NUM_SEGMENTS <= 10) {
       assertEquals(numConsumingSegmentsMatched, 0, "numSegments: " + 
NUM_SEGMENTS);
     } else {
       assertTrue(numConsumingSegmentsMatched >= 0
-          && numConsumingSegmentsMatched <= 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY, String
+          && numConsumingSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY, String
           .format("numConsumingSegmentsMatched: %d, maxThreadsPerQuery: %d, 
numSegments: %d",
-              combineResult.getNumConsumingSegmentsMatched(), 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY,
+              combineResult.getNumConsumingSegmentsMatched(), 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY,
               NUM_SEGMENTS));
     }
     assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
@@ -233,14 +234,14 @@ public class SelectionCombineOperatorTest {
     // segment.
     long numDocsScanned = combineResult.getNumDocsScanned();
     // Need to scan 10 documents per segment because 'intColumn' is sorted
-    assertTrue(numDocsScanned >= 10 && numDocsScanned <= 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10);
+    assertTrue(numDocsScanned >= 10 && numDocsScanned <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 10);
     assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
     assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
     assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
     assertEquals(combineResult.getNumConsumingSegmentsProcessed(), 
NUM_CONSUMING_SEGMENTS);
     assertEquals(combineResult.getNumConsumingSegmentsMatched(), 0);
     int numSegmentsMatched = combineResult.getNumSegmentsMatched();
-    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
     assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
 
     combineResult = getCombineResult("SELECT * FROM testTable ORDER BY 
intColumn DESC");
@@ -257,14 +258,14 @@ public class SelectionCombineOperatorTest {
     // segment.
     numDocsScanned = combineResult.getNumDocsScanned();
     assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
-        && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 
NUM_RECORDS_PER_SEGMENT);
+        && numDocsScanned <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
     assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
     assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
     assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
     assertEquals(combineResult.getNumConsumingSegmentsProcessed(), 
NUM_CONSUMING_SEGMENTS);
     assertEquals(combineResult.getNumConsumingSegmentsMatched(), 
NUM_CONSUMING_SEGMENTS);
     numSegmentsMatched = combineResult.getNumSegmentsMatched();
-    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
     assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
 
     combineResult = getCombineResult("SELECT * FROM testTable ORDER BY 
intColumn DESC LIMIT 10000");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
new file mode 100644
index 0000000000..75b2beea0f
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import 
org.apache.pinot.segment.local.segment.index.readers.bloom.OnHeapGuavaBloomFilterReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BloomFilterSegmentPrunerTest {
+  private static final BloomFilterSegmentPruner PRUNER = new 
BloomFilterSegmentPruner();
+
+  @BeforeClass
+  public void setUp() {
+    Map<String, Object> properties = new HashMap<>();
+    // override default value
+    properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
+    PinotConfiguration configuration = new PinotConfiguration(properties);
+    PRUNER.init(configuration);
+  }
+
+  @Test
+  public void testBloomFilterPruning()
+      throws IOException {
+    IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0", 
"3.0", "5.0", "7.0", "21.0"});
+
+    // all out the bloom filter
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 6.0"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (6.0)"));
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0.0 OR column = 6.0"));
+
+    // all in the bloom filter
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 OR column = 7.0"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5.0, 7.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 AND column = 7.0"));
+
+    // some in the bloom filter with IN/OR
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0.0, 3.0, 4.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 1.0"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (1.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21.0, 30.0)"));
+    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0 OR column = 30.0"));
+    // 30 out the bloom filter with AND
+    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0 AND column = 30.0"));
+  }
+
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testQueryTimeoutOnPruning()
+      throws IOException {
+    IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0", 
"3.0", "5.0", "7.0", "21.0"});
+    DataSource dataSource = mock(DataSource.class);
+    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+    runPruner(Collections.singletonList(indexSegment),
+        "SELECT COUNT(*) FROM testTable WHERE column = 5.0 OR column = 0.0", 
1);
+  }
+
+  @Test
+  public void testParallelPrune()
+      throws IOException {
+    List<IndexSegment> segments = new ArrayList<>();
+    for (int i = 0; i < 35; i++) {
+      IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0", 
"3.0", "5.0", "7.0", "21.0"});
+      segments.add(indexSegment);
+    }
+    assertTrue(
+        runPruner(segments, "SELECT COUNT(*) FROM testTable WHERE column = 
21.0 AND column = 30.0", 5000).isEmpty());
+
+    IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0", 
"3.0", "5.0", "7.0", "21.0", "30.0"});
+    segments.add(indexSegment);
+    List<IndexSegment> selected =
+        runPruner(segments, "SELECT COUNT(*) FROM testTable WHERE column = 
21.0 AND column = 30.0", 5000);
+    assertEquals(selected.size(), 1);
+  }
+
+  @Test
+  public void testIsApplicableTo() {
+    // EQ and IN (with small number of values) are applicable for bloom filter 
based pruning.
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM 
testTable WHERE column = 1");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) 
FROM testTable WHERE column IN (1, 2)");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+
+    // NOT is not applicable
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) 
FROM testTable WHERE NOT column = 1");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+    // Too many values for IN clause
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column IN (1, 2, 3, 4, 5, 6, 
7)");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+    // Other predicate types are not applicable
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) 
FROM testTable WHERE column LIKE 5");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+
+    // AND with one applicable child filter is applicable
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column NOT IN (1, 2) AND column 
= 3");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+
+    // OR with one child filter that's not applicable is not applicable
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column = 3 OR column NOT IN (1, 
2)");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+
+    // Nested with AND/OR
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column = 3 OR (column NOT IN (1, 
2) AND column = 4)");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+  }
+
+  private IndexSegment mockIndexSegment(String[] values)
+      throws IOException {
+    IndexSegment indexSegment = mock(IndexSegment.class);
+    when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("column"));
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getTotalDocs()).thenReturn(20);
+    when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+
+    DataSource dataSource = mock(DataSource.class);
+    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+    // Add support for bloom filter
+    DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
+    BloomFilterReaderBuilder builder = new BloomFilterReaderBuilder();
+    for (String v : values) {
+      builder.put(v);
+    }
+    when(dataSourceMetadata.getDataType()).thenReturn(DataType.DOUBLE);
+    when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
+    when(dataSource.getBloomFilter()).thenReturn(builder.build());
+
+    return indexSegment;
+  }
+
+  private boolean runPruner(IndexSegment segment, String query) {
+    return runPruner(Collections.singletonList(segment), query, 
5000).isEmpty();
+  }
+
+  private List<IndexSegment> runPruner(List<IndexSegment> segments, String 
query, long queryTimeout) {
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
+    queryContext.setEndTimeMs(System.currentTimeMillis() + queryTimeout);
+    return PRUNER.prune(segments, queryContext, 
Executors.newCachedThreadPool());
+  }
+
+  private static class BloomFilterReaderBuilder {
+    private BloomFilter<String> _bloomfilter = 
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 100, 0.01);
+    public BloomFilterReaderBuilder put(String value) {
+      _bloomfilter.put(value);
+      return this;
+    }
+
+    public BloomFilterReader build() throws IOException {
+      File file = Files.createTempFile("test", ".bloom").toFile();
+      try (FileOutputStream fos = new FileOutputStream(file)) {
+        _bloomfilter.writeTo(fos);
+        try (PinotDataBuffer pinotDataBuffer = 
PinotDataBuffer.loadBigEndianFile(file)) {
+          // on heap filter should never use the buffer, so we can close it 
and delete the file
+          return new OnHeapGuavaBloomFilterReader(pinotDataBuffer);
+        }
+      } finally {
+        file.delete();
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
index 5edbdb32a3..5a5d7bd505 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -18,30 +18,21 @@
  */
 package org.apache.pinot.core.query.pruner;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnels;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import 
org.apache.pinot.segment.local.segment.index.readers.bloom.OnHeapGuavaBloomFilterReader;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -53,14 +44,17 @@ import static org.testng.Assert.assertTrue;
 public class ColumnValueSegmentPrunerTest {
   private static final ColumnValueSegmentPruner PRUNER = new 
ColumnValueSegmentPruner();
 
-  @Test
-  public void testMinMaxValuePruning() {
+  @BeforeClass
+  public void setUp() {
     Map<String, Object> properties = new HashMap<>();
-    //override default value
-    properties.put(PRUNER.IN_PREDICATE_THRESHOLD, 5);
+    // override default value
+    properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
     PinotConfiguration configuration = new PinotConfiguration(properties);
     PRUNER.init(configuration);
+  }
 
+  @Test
+  public void testMinMaxValuePruning() {
     IndexSegment indexSegment = mockIndexSegment();
 
     DataSource dataSource = mock(DataSource.class);
@@ -147,66 +141,42 @@ public class ColumnValueSegmentPrunerTest {
   }
 
   @Test
-  public void testBloomFilterPredicatePruning()
-      throws IOException {
-    Map<String, Object> properties = new HashMap<>();
-    // override default value
-    properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
-    PinotConfiguration configuration = new PinotConfiguration(properties);
-    PRUNER.init(configuration);
-
-    IndexSegment indexSegment = mockIndexSegment();
-    DataSource dataSource = mock(DataSource.class);
-    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
-    // Add support for bloom filter
-    DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
-    BloomFilterReader bloomFilterReader = new BloomFilterReaderBuilder()
-        .put("1.0")
-        .put("2.0")
-        .put("3.0")
-        .put("5.0")
-        .put("7.0")
-        .put("21.0")
-        .build();
-
-    when(dataSourceMetadata.getDataType()).thenReturn(DataType.DOUBLE);
-    when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
-    when(dataSource.getBloomFilter()).thenReturn(bloomFilterReader);
-    when(dataSourceMetadata.getMinValue()).thenReturn(5.0);
-    when(dataSourceMetadata.getMaxValue()).thenReturn(10.0);
-
-    // all out the bloom filter and out of range
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0.0)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (0.0, 3.0, 4.0)"));
-
-    // some in the bloom filter but all out of range
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 1.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (1.0)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21.0)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (21.0, 30.0)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0 AND column = 30.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 21.0 OR column = 30.0"));
-
-    // all out the bloom filter but some in range
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 6.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (6.0)"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 0.0 OR column = 6.0"));
-
-    // all in the bloom filter and in range
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5.0)"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 OR column = 7.0"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (5.0, 7.0)"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 AND column = 7.0"));
-
-    // some in the bloom filter and in range
-    assertFalse(
-        runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE column 
IN (0.0, 5.0)"));
-    assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 OR column = 0.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column = 5.0 AND column = 0.0"));
-    assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE 
column IN (8.0, 10.0)"));
+  public void testIsApplicableTo() {
+    // EQ, RANGE and IN (with small number of values) are applicable for 
min/max/partitionId based pruning.
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM 
testTable WHERE column = 1");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) 
FROM testTable WHERE column IN (1, 2)");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+    queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM 
testTable WHERE column BETWEEN 1 AND 2");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+
+    // NOT is not applicable
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) 
FROM testTable WHERE NOT column = 1");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+    // Too many values for IN clause
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column IN (1, 2, 3, 4, 5, 6, 
7)");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+    // Other predicate types are not applicable
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) 
FROM testTable WHERE column LIKE 5");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+
+    // AND with one applicable child filter is applicable
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column NOT IN (1, 2) AND column 
= 3");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
+
+    // OR with one child filter that's not applicable is not applicable
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column = 3 OR column NOT IN (1, 
2)");
+    assertFalse(PRUNER.isApplicableTo(queryContext));
+
+    // Nested with AND/OR
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT COUNT(*) FROM testTable WHERE column = 3 OR (column NOT IN (1, 
2) AND column BETWEEN 4 AND 5)");
+    assertTrue(PRUNER.isApplicableTo(queryContext));
   }
 
   private IndexSegment mockIndexSegment() {
@@ -222,25 +192,4 @@ public class ColumnValueSegmentPrunerTest {
     QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
     return PRUNER.prune(Arrays.asList(indexSegment), queryContext).isEmpty();
   }
-
-  private static class BloomFilterReaderBuilder {
-    private BloomFilter<String> _bloomfilter = 
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 100, 0.01);
-    public BloomFilterReaderBuilder put(String value) {
-      _bloomfilter.put(value);
-      return this;
-    }
-
-    public BloomFilterReader build() throws IOException {
-      File file = Files.createTempFile("test", ".bloom").toFile();
-      try (FileOutputStream fos = new FileOutputStream(file)) {
-        _bloomfilter.writeTo(fos);
-        try (PinotDataBuffer pinotDataBuffer = 
PinotDataBuffer.loadBigEndianFile(file)) {
-          // on heap filter should never use the buffer, so we can close it 
and delete the file
-          return new OnHeapGuavaBloomFilterReader(pinotDataBuffer);
-        }
-      } finally {
-        file.delete();
-      }
-    }
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/util/QueryMultiThreadingUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/util/QueryMultiThreadingUtilsTest.java
new file mode 100644
index 0000000000..92ac06f967
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/util/QueryMultiThreadingUtilsTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class QueryMultiThreadingUtilsTest {
+  @Test
+  public void testGetNumTasksForQuery() {
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(1, 2), 1);
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(3, 2), 2);
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(0, -1), 
1);
+    int numOps = QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY;
+    // TaskUtils.MAX_NUM_THREADS_PER_QUERY at max
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(numOps + 
10, -1),
+        QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+    // But 1 at min
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(numOps - 
1, -1), Math.max(1, numOps - 1));
+  }
+
+  @Test
+  public void testGetNumTasks() {
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(2, 3, 4), 1);
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(7, 3, 4), 3);
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(9, 3, 4), 3);
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(10, 3, 4), 4);
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(100, 3, 4), 4);
+    int targetPerThread = 5;
+    int numWorkUnits = QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 
targetPerThread;
+    Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(numWorkUnits + 
10, targetPerThread, -1),
+        QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+  }
+
+  @Test
+  public void testRunTasksWithDeadline() {
+    ExecutorService exec = Executors.newCachedThreadPool();
+    AtomicInteger sum = new AtomicInteger(0);
+    QueryMultiThreadingUtils.runTasksWithDeadline(5, index -> index, 
sum::addAndGet, e -> {
+    }, exec, System.currentTimeMillis() + 500);
+    // sum of 0, 1, .., 4 indices.
+    Assert.assertEquals(sum.get(), 10);
+
+    // Task throws exception before timeout.
+    Exception[] err = new Exception[1];
+    QueryMultiThreadingUtils.runTasksWithDeadline(5, index -> {
+      throw new RuntimeException("oops: " + index);
+    }, sum::addAndGet, e -> err[0] = e, exec, System.currentTimeMillis() + 
500);
+    Assert.assertTrue(err[0].getMessage().contains("oops"));
+
+    // Task timed out.
+    QueryMultiThreadingUtils.runTasksWithDeadline(5, index -> {
+      try {
+        Thread.sleep(10_000);
+        return index;
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }, sum::addAndGet, e -> err[0] = e, exec, System.currentTimeMillis() + 
500);
+    Assert.assertTrue(err[0] instanceof TimeoutException);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0be6c57267..123a789bcb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -559,8 +559,10 @@ public class CommonConstants {
         "org.apache.pinot.server.starter.helix.HelixInstanceDataManager";
     public static final String DEFAULT_QUERY_EXECUTOR_CLASS =
         "org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl";
+    // The order of the pruners matters. Pruning with segment metadata ahead 
of those using segment data like bloom
+    // filters to reduce the required data access.
     public static final List<String> DEFAULT_QUERY_EXECUTOR_PRUNER_CLASS =
-        ImmutableList.of("ColumnValueSegmentPruner", 
"SelectionQuerySegmentPruner");
+        ImmutableList.of("ColumnValueSegmentPruner", 
"BloomFilterSegmentPruner", "SelectionQuerySegmentPruner");
     public static final String DEFAULT_QUERY_EXECUTOR_PLAN_MAKER_CLASS =
         "org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2";
     public static final long DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS = 15_000L;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to