This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2bcf592d43 Separate and parallelize BloomFilter based segment pruner
(#10660)
2bcf592d43 is described below
commit 2bcf592d433241b07dcaac82833df4b225617410
Author: Xiaobing <[email protected]>
AuthorDate: Fri Apr 28 13:05:21 2023 -0700
Separate and parallelize BloomFilter based segment pruner (#10660)
---
.../core/operator/combine/BaseCombineOperator.java | 3 +-
.../operator/combine/CombineOperatorUtils.java | 20 --
.../apache/pinot/core/plan/CombinePlanNode.java | 86 ++----
.../query/executor/ServerQueryExecutorV1Impl.java | 3 +-
.../query/pruner/BloomFilterSegmentPruner.java | 264 +++++++++++++++++
.../query/pruner/ColumnValueSegmentPruner.java | 323 ++++-----------------
.../pinot/core/query/pruner/SegmentPruner.java | 7 +
.../core/query/pruner/SegmentPrunerProvider.java | 4 +-
.../core/query/pruner/SegmentPrunerService.java | 10 +-
.../core/query/pruner/ValueBasedSegmentPruner.java | 240 +++++++++++++++
.../pinot/core/util/QueryMultiThreadingUtils.java | 113 +++++++
.../combine/SelectionCombineOperatorTest.java | 21 +-
.../query/pruner/BloomFilterSegmentPrunerTest.java | 216 ++++++++++++++
.../query/pruner/ColumnValueSegmentPrunerTest.java | 139 +++------
.../pinot/util/QueryMultiThreadingUtilsTest.java | 84 ++++++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +-
16 files changed, 1063 insertions(+), 474 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index f98880aeb6..9390925bc9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -70,7 +71,7 @@ public abstract class BaseCombineOperator<T extends
BaseResultsBlock> extends Ba
// NOTE: We split the query execution into multiple tasks, where each task
handles the query execution on multiple
// (>=1) segments. These tasks are assigned to multiple execution
threads so that they can run in parallel.
// The parallelism is bounded by the task count.
- _numTasks = CombineOperatorUtils.getNumTasksForQuery(operators.size(),
queryContext.getMaxExecutionThreads());
+ _numTasks = QueryMultiThreadingUtils.getNumTasksForQuery(operators.size(),
queryContext.getMaxExecutionThreads());
// Use a Phaser to ensure all the Futures are done (not scheduled,
finished or interrupted) before the main thread
// returns. We need to ensure this because the main thread holds the
reference to the segments. If a segment is
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
index 842f5053f7..eae5678f61 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
@@ -30,26 +30,6 @@ public class CombineOperatorUtils {
private CombineOperatorUtils() {
}
- /**
- * Use at most 10 or half of the processors threads for each query. If there
are less than 2 processors, use 1 thread.
- * <p>NOTE: Runtime.getRuntime().availableProcessors() may return value < 2
in container based environment, e.g.
- * Kubernetes.
- */
- public static final int MAX_NUM_THREADS_PER_QUERY =
- Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() /
2));
-
- /**
- * Returns the number of tasks for the query execution. The tasks can be
assigned to multiple execution threads so
- * that they can run in parallel. The parallelism is bounded by the task
count.
- */
- public static int getNumTasksForQuery(int numOperators, int
maxExecutionThreads) {
- if (maxExecutionThreads > 0) {
- return Math.min(numOperators, maxExecutionThreads);
- } else {
- return Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY);
- }
- }
-
/**
* Sets the execution statistics into the results block.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 5a6542932e..3b860061d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -20,12 +20,8 @@ package org.apache.pinot.core.plan;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.context.ExpressionContext;
@@ -33,7 +29,6 @@ import
org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.combine.AggregationCombineOperator;
import org.apache.pinot.core.operator.combine.BaseCombineOperator;
-import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
import org.apache.pinot.core.operator.combine.DistinctCombineOperator;
import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
import
org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator;
@@ -46,7 +41,7 @@ import
org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOpe
import
org.apache.pinot.core.operator.streaming.StreamingSelectionOrderByCombineOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.InvocationRecording;
@@ -105,78 +100,31 @@ public class CombinePlanNode implements PlanNode {
// Large number of plan nodes, run them in parallel
// NOTE: Even if we get single executor thread, still run it using a
separate thread so that the timeout can be
// honored
-
- int maxExecutionThreads = _queryContext.getMaxExecutionThreads();
- if (maxExecutionThreads <= 0) {
- maxExecutionThreads = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY;
- }
- int numTasks =
- Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) /
TARGET_NUM_PLANS_PER_THREAD, maxExecutionThreads);
+ int numTasks = QueryMultiThreadingUtils.getNumTasks(numPlanNodes,
TARGET_NUM_PLANS_PER_THREAD,
+ _queryContext.getMaxExecutionThreads());
recording.setNumTasks(numTasks);
-
- // Use a Phaser to ensure all the Futures are done (not scheduled,
finished or interrupted) before the main thread
- // returns. We need to ensure no execution left before the main thread
returning because the main thread holds the
- // reference to the segments, and if the segments are deleted/refreshed,
the segments can be released after the
- // main thread returns, which would lead to undefined behavior (even JVM
crash) when executing queries against
- // them.
- Phaser phaser = new Phaser(1);
-
- // Submit all jobs
- Future[] futures = new Future[numTasks];
- for (int i = 0; i < numTasks; i++) {
- int index = i;
- futures[i] = _executorService.submit(new
TraceCallable<List<Operator>>() {
- @Override
- public List<Operator> callJob() {
- try {
- // Register the thread to the phaser.
- // If the phaser is terminated (returning negative value) when
trying to register the thread, that means
- // the query execution has timed out, and the main thread has
deregistered itself and returned the result.
- // Directly return as no execution result will be taken.
- if (phaser.register() < 0) {
- return Collections.emptyList();
- }
-
- List<Operator> operators = new ArrayList<>();
- for (int i = index; i < numPlanNodes; i += numTasks) {
- operators.add(_planNodes.get(i).run());
- }
- return operators;
- } finally {
- phaser.arriveAndDeregister();
- }
- }
- });
- }
-
- // Get all results
- try {
- for (Future future : futures) {
- List<Operator> ops = (List<Operator>)
future.get(_queryContext.getEndTimeMs() - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- operators.addAll(ops);
+ QueryMultiThreadingUtils.runTasksWithDeadline(numTasks, index -> {
+ List<Operator> ops = new ArrayList<>();
+ for (int i = index; i < numPlanNodes; i += numTasks) {
+ ops.add(_planNodes.get(i).run());
+ }
+ return ops;
+ }, taskRes -> {
+ if (taskRes != null) {
+ operators.addAll(taskRes);
}
- } catch (Exception e) {
+ }, e -> {
// Future object will throw ExecutionException for execution
exception, need to check the cause to determine
// whether it is caused by bad query
Throwable cause = e.getCause();
if (cause instanceof BadQueryRequestException) {
throw (BadQueryRequestException) cause;
- } else if (e instanceof InterruptedException) {
- throw new QueryCancelledException("Cancelled while running
CombinePlanNode", e);
- } else {
- throw new RuntimeException("Caught exception while running
CombinePlanNode.", e);
}
- } finally {
- // Cancel all ongoing jobs
- for (Future future : futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
+ if (e instanceof InterruptedException) {
+ throw new QueryCancelledException("Cancelled while running
CombinePlanNode", e);
}
- // Deregister the main thread and wait for all threads done
- phaser.awaitAdvance(phaser.arriveAndDeregister());
- }
+ throw new RuntimeException("Caught exception while running
CombinePlanNode.", e);
+ }, _executorService, _queryContext.getEndTimeMs());
}
if (_streamObserver != null) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 1442a9ec2b..eae6732267 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -354,7 +354,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
int numTotalSegments = indexSegments.size();
SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics();
- List<IndexSegment> selectedSegments =
_segmentPrunerService.prune(indexSegments, queryContext, prunerStats);
+ List<IndexSegment> selectedSegments =
+ _segmentPrunerService.prune(indexSegments, queryContext, prunerStats,
executorService);
segmentPruneTimer.stopAndRecord();
int numSelectedSegments = selectedSegments.size();
LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
new file mode 100644
index 0000000000..6277aac8eb
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.prefetch.FetchPlanner;
+import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.segment.spi.FetchContext;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryCancelledException;
+
+
+/**
+ * The {@code BloomFilterSegmentPruner} prunes segments based on bloom filter
for EQUALITY filter. Because the access
+ * to bloom filter data is required, segment pruning is done in parallel when
the number of segments is large.
+ */
+@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+public class BloomFilterSegmentPruner extends ValueBasedSegmentPruner {
+ // Try to schedule 10 segments for each thread, or evenly distribute them to
all MAX_NUM_THREADS_PER_QUERY threads.
+ // TODO: make this threshold configurable? threshold 10 is also used in
CombinePlanNode, which accesses the
+ // dictionary data to do query planning and if segments are more than
10, planning is done in parallel.
+ private static final int TARGET_NUM_SEGMENTS_PER_THREAD = 10;
+
+ private FetchPlanner _fetchPlanner;
+
+ @Override
+ public void init(PinotConfiguration config) {
+ super.init(config);
+ _fetchPlanner = FetchPlannerRegistry.getPlanner();
+ }
+
+ @Override
+ protected boolean isApplicableToPredicate(Predicate predicate) {
+ // Only prune columns
+ if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) {
+ return false;
+ }
+ Predicate.Type predicateType = predicate.getType();
+ if (predicateType == Predicate.Type.EQ) {
+ return true;
+ }
+ if (predicateType == Predicate.Type.IN) {
+ List<String> values = ((InPredicate) predicate).getValues();
+ // Skip pruning when there are too many values in the IN predicate
+ if (values.size() <= _inPredicateThreshold) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query) {
+ if (segments.isEmpty()) {
+ return segments;
+ }
+ if (!query.isEnablePrefetch()) {
+ return super.prune(segments, query);
+ }
+ return prefetch(segments, query, fetchContexts -> {
+ int numSegments = segments.size();
+ FilterContext filter = Objects.requireNonNull(query.getFilter());
+ ValueCache cachedValues = new ValueCache();
+ Map<String, DataSource> dataSourceCache = new HashMap<>();
+ List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
+ for (int i = 0; i < numSegments; i++) {
+ dataSourceCache.clear();
+ IndexSegment segment = segments.get(i);
+ if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter,
dataSourceCache, cachedValues)) {
+ selectedSegments.add(segment);
+ }
+ }
+ return selectedSegments;
+ });
+ }
+
+ @Override
+ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query,
+ @Nullable ExecutorService executorService) {
+ if (segments.isEmpty()) {
+ return segments;
+ }
+ if (executorService == null || segments.size() <=
TARGET_NUM_SEGMENTS_PER_THREAD) {
+ // If executor is not provided, or the number of segments is small,
prune them sequentially
+ return prune(segments, query);
+ }
+ // With executor service and large number of segments, prune them in
parallel.
+ // NOTE: Even if numTasks=1 i.e. we get a single executor thread, still
run it using a separate thread so that
+ // the timeout can be honored. For example, this may happen when
there is only one processor.
+ int numTasks = QueryMultiThreadingUtils.getNumTasks(segments.size(),
TARGET_NUM_SEGMENTS_PER_THREAD,
+ query.getMaxExecutionThreads());
+ if (!query.isEnablePrefetch()) {
+ return pruneInParallel(numTasks, segments, query, executorService, null);
+ }
+ return prefetch(segments, query,
+ fetchContexts -> pruneInParallel(numTasks, segments, query,
executorService, fetchContexts));
+ }
+
+ private List<IndexSegment> pruneInParallel(int numTasks, List<IndexSegment>
segments, QueryContext queryContext,
+ ExecutorService executorService, FetchContext[] fetchContexts) {
+ int numSegments = segments.size();
+ List<IndexSegment> allSelectedSegments = new ArrayList<>();
+ QueryMultiThreadingUtils.runTasksWithDeadline(numTasks, index -> {
+ FilterContext filter = Objects.requireNonNull(queryContext.getFilter());
+ ValueCache cachedValues = new ValueCache();
+ Map<String, DataSource> dataSourceCache = new HashMap<>();
+ List<IndexSegment> selectedSegments = new ArrayList<>();
+ for (int i = index; i < numSegments; i += numTasks) {
+ dataSourceCache.clear();
+ IndexSegment segment = segments.get(i);
+ FetchContext fetchContext = fetchContexts == null ? null :
fetchContexts[i];
+ if (!pruneSegmentWithFetchContext(segment, fetchContext, filter,
dataSourceCache, cachedValues)) {
+ selectedSegments.add(segment);
+ }
+ }
+ return selectedSegments;
+ }, taskRes -> {
+ if (taskRes != null) {
+ allSelectedSegments.addAll(taskRes);
+ }
+ }, e -> {
+ if (e instanceof InterruptedException) {
+ throw new QueryCancelledException("Cancelled while running
BloomFilterSegmentPruner", e);
+ }
+ throw new RuntimeException("Caught exception while running
BloomFilterSegmentPruner", e);
+ }, executorService, queryContext.getEndTimeMs());
+ return allSelectedSegments;
+ }
+
+ private List<IndexSegment> prefetch(List<IndexSegment> segments,
QueryContext query,
+ Function<FetchContext[], List<IndexSegment>> pruneFunc) {
+ int numSegments = segments.size();
+ FetchContext[] fetchContexts = new FetchContext[numSegments];
+ try {
+ // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+ for (int i = 0; i < numSegments; i++) {
+ IndexSegment segment = segments.get(i);
+ FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment,
query);
+ if (!fetchContext.isEmpty()) {
+ segment.prefetch(fetchContext);
+ fetchContexts[i] = fetchContext;
+ }
+ }
+ return pruneFunc.apply(fetchContexts);
+ } finally {
+ // Release the prefetched bloom filters
+ for (int i = 0; i < numSegments; i++) {
+ FetchContext fetchContext = fetchContexts[i];
+ if (fetchContext != null) {
+ segments.get(i).release(fetchContext);
+ }
+ }
+ }
+ }
+
+ private boolean pruneSegmentWithFetchContext(IndexSegment segment,
FetchContext fetchContext, FilterContext filter,
+ Map<String, DataSource> dataSourceCache, ValueCache cachedValues) {
+ if (fetchContext == null) {
+ return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+ }
+ try {
+ segment.acquire(fetchContext);
+ return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+ } finally {
+ segment.release(fetchContext);
+ }
+ }
+
+ @Override
+ boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate,
Map<String, DataSource> dataSourceCache,
+ ValueCache cachedValues) {
+ Predicate.Type predicateType = predicate.getType();
+ if (predicateType == Predicate.Type.EQ) {
+ return pruneEqPredicate(segment, (EqPredicate) predicate,
dataSourceCache, cachedValues);
+ } else if (predicateType == Predicate.Type.IN) {
+ return pruneInPredicate(segment, (InPredicate) predicate,
dataSourceCache, cachedValues);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * For EQ predicate, prune the segments based on column bloom filter.
+ */
+ private boolean pruneEqPredicate(IndexSegment segment, EqPredicate
eqPredicate,
+ Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+ String column = eqPredicate.getLhs().getIdentifier();
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
+ : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
+ // NOTE: Column must exist after DataSchemaSegmentPruner
+ assert dataSource != null;
+ DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+ ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate,
dataSourceMetadata.getDataType());
+ // Check bloom filter
+ BloomFilterReader bloomFilter = dataSource.getBloomFilter();
+ return bloomFilter != null && !cachedValue.mightBeContained(bloomFilter);
+ }
+
+ /**
+ * For IN predicate, prune the segments based on column bloom filter.
+ * NOTE: segments will not be pruned if the number of values is greater than
the threshold.
+ */
+ private boolean pruneInPredicate(IndexSegment segment, InPredicate
inPredicate,
+ Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+ List<String> values = inPredicate.getValues();
+ // Skip pruning when there are too many values in the IN predicate
+ if (values.size() > _inPredicateThreshold) {
+ return false;
+ }
+ String column = inPredicate.getLhs().getIdentifier();
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
+ : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
+ // NOTE: Column must exist after DataSchemaSegmentPruner
+ assert dataSource != null;
+ DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+ List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate,
dataSourceMetadata.getDataType());
+ // Check bloom filter
+ BloomFilterReader bloomFilter = dataSource.getBloomFilter();
+ if (bloomFilter == null) {
+ return false;
+ }
+ for (ValueCache.CachedValue value : cachedValues) {
+ if (value.mightBeContained(bloomFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 6ee9ba5c05..a2b949da9e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -18,45 +18,32 @@
*/
package org.apache.pinot.core.query.pruner;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.request.context.predicate.RangePredicate;
-import org.apache.pinot.core.query.prefetch.FetchPlanner;
-import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import
org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
-import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
/**
* The {@code ColumnValueSegmentPruner} is the segment pruner that prunes
segments based on the value inside the filter.
+ * This pruner is supposed to use segment metadata like min/max or partition
id only. Pruners that need to access
+ * segment data like bloom filter is implemented separately and called after
this one to reduce required data access.
* <ul>
* <li>
* For EQUALITY filter, prune the segment based on:
* <ul>
* <li>Column min/max value</li>
* <li>Column partition</li>
- * <li>Column bloom filter</li>
* </ul>
* </li>
* <li>
@@ -68,126 +55,39 @@ import org.apache.pinot.spi.utils.CommonConstants.Server;
* </ul>
*/
@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
-public class ColumnValueSegmentPruner implements SegmentPruner {
-
- public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
-
- private int _inPredicateThreshold;
- private FetchPlanner _fetchPlanner;
-
- @Override
- public void init(PinotConfiguration config) {
- _inPredicateThreshold =
- config.getProperty(IN_PREDICATE_THRESHOLD,
Server.DEFAULT_VALUE_PRUNER_IN_PREDICATE_THRESHOLD);
- _fetchPlanner = FetchPlannerRegistry.getPlanner();
- }
-
- @Override
- public boolean isApplicableTo(QueryContext query) {
- return query.getFilter() != null;
- }
-
+public class ColumnValueSegmentPruner extends ValueBasedSegmentPruner {
@Override
- public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query) {
- if (segments.isEmpty()) {
- return segments;
+ protected boolean isApplicableToPredicate(Predicate predicate) {
+ // Only prune columns
+ if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) {
+ return false;
}
- FilterContext filter = Objects.requireNonNull(query.getFilter());
- ValueCache cachedValues = new ValueCache();
- int numSegments = segments.size();
- List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
- if (query.isEnablePrefetch()) {
- FetchContext[] fetchContexts = new FetchContext[numSegments];
- try {
- // Prefetch bloom filter for columns within the EQ/IN predicate if
exists
- for (int i = 0; i < numSegments; i++) {
- IndexSegment segment = segments.get(i);
- FetchContext fetchContext =
_fetchPlanner.planFetchForPruning(segment, query);
- if (!fetchContext.isEmpty()) {
- segment.prefetch(fetchContext);
- fetchContexts[i] = fetchContext;
- }
- }
- // Prune segments
- Map[] dataSourceCaches = new Map[numSegments];
- for (int i = 0; i < numSegments; i++) {
- dataSourceCaches[i] = new HashMap<>();
- IndexSegment segment = segments.get(i);
- FetchContext fetchContext = fetchContexts[i];
- if (fetchContext != null) {
- segment.acquire(fetchContext);
- try {
- if (!pruneSegment(segment, filter, dataSourceCaches[i],
cachedValues)) {
- selectedSegments.add(segment);
- }
- } finally {
- segment.release(fetchContext);
- }
- } else {
- if (!pruneSegment(segment, filter, dataSourceCaches[i],
cachedValues)) {
- selectedSegments.add(segment);
- }
- }
- }
- } finally {
- // Release the prefetched bloom filters
- for (int i = 0; i < numSegments; i++) {
- FetchContext fetchContext = fetchContexts[i];
- if (fetchContext != null) {
- segments.get(i).release(fetchContext);
- }
- }
- }
- } else {
- Map<String, DataSource> dataSourceCache = new HashMap<>();
- for (IndexSegment segment : segments) {
- dataSourceCache.clear();
- if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
- selectedSegments.add(segment);
- }
+ Predicate.Type predicateType = predicate.getType();
+ if (predicateType == Predicate.Type.EQ || predicateType ==
Predicate.Type.RANGE) {
+ return true;
+ }
+ if (predicateType == Predicate.Type.IN) {
+ List<String> values = ((InPredicate) predicate).getValues();
+ // Skip pruning when there are too many values in the IN predicate
+ if (values.size() <= _inPredicateThreshold) {
+ return true;
}
}
- return selectedSegments;
+ return false;
}
- private boolean pruneSegment(IndexSegment segment, FilterContext filter,
Map<String, DataSource> dataSourceCache,
+ @Override
+ boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate,
Map<String, DataSource> dataSourceCache,
ValueCache cachedValues) {
- switch (filter.getType()) {
- case AND:
- for (FilterContext child : filter.getChildren()) {
- if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
- return true;
- }
- }
- return false;
- case OR:
- for (FilterContext child : filter.getChildren()) {
- if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
- return false;
- }
- }
- return true;
- case NOT:
- // Do not prune NOT filter
- return false;
- case PREDICATE:
- Predicate predicate = filter.getPredicate();
- // Only prune columns
- if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER)
{
- return false;
- }
- Predicate.Type predicateType = predicate.getType();
- if (predicateType == Predicate.Type.EQ) {
- return pruneEqPredicate(segment, (EqPredicate) predicate,
dataSourceCache, cachedValues);
- } else if (predicateType == Predicate.Type.IN) {
- return pruneInPredicate(segment, (InPredicate) predicate,
dataSourceCache, cachedValues);
- } else if (predicateType == Predicate.Type.RANGE) {
- return pruneRangePredicate(segment, (RangePredicate) predicate,
dataSourceCache);
- } else {
- return false;
- }
- default:
- throw new IllegalStateException();
+ Predicate.Type predicateType = predicate.getType();
+ if (predicateType == Predicate.Type.EQ) {
+ return pruneEqPredicate(segment, (EqPredicate) predicate,
dataSourceCache, cachedValues);
+ } else if (predicateType == Predicate.Type.IN) {
+ return pruneInPredicate(segment, (InPredicate) predicate,
dataSourceCache, cachedValues);
+ } else if (predicateType == Predicate.Type.RANGE) {
+ return pruneRangePredicate(segment, (RangePredicate) predicate,
dataSourceCache);
+ } else {
+ return false;
}
}
@@ -196,27 +96,22 @@ public class ColumnValueSegmentPruner implements
SegmentPruner {
* <ul>
* <li>Column min/max value</li>
* <li>Column partition</li>
- * <li>Column bloom filter</li>
* </ul>
*/
private boolean pruneEqPredicate(IndexSegment segment, EqPredicate
eqPredicate,
Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
String column = eqPredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment
- ? segment.getDataSource(column)
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
: dataSourceCache.computeIfAbsent(column, segment::getDataSource);
// NOTE: Column must exist after DataSchemaSegmentPruner
assert dataSource != null;
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate,
dataSourceMetadata.getDataType());
-
Comparable value = cachedValue.getComparableValue();
-
// Check min/max value
if (!checkMinMaxRange(dataSourceMetadata, value)) {
return true;
}
-
// Check column partition
PartitionFunction partitionFunction =
dataSourceMetadata.getPartitionFunction();
if (partitionFunction != null) {
@@ -226,15 +121,6 @@ public class ColumnValueSegmentPruner implements
SegmentPruner {
return true;
}
}
-
- // Check bloom filter
- BloomFilterReader bloomFilter = dataSource.getBloomFilter();
- if (bloomFilter != null) {
- if (!cachedValue.mightBeContained(bloomFilter)) {
- return true;
- }
- }
-
return false;
}
@@ -242,66 +128,26 @@ public class ColumnValueSegmentPruner implements
SegmentPruner {
* For IN predicate, prune the segments based on:
* <ul>
* <li>Column min/max value</li>
- * <li>Column bloom filter</li>
* </ul>
* <p>NOTE: segments will not be pruned if the number of values is greater
than the threshold.
*/
private boolean pruneInPredicate(IndexSegment segment, InPredicate
inPredicate,
Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
- String column = inPredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment
- ? segment.getDataSource(column)
- : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
- // NOTE: Column must exist after DataSchemaSegmentPruner
- assert dataSource != null;
- DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
List<String> values = inPredicate.getValues();
-
// Skip pruning when there are too many values in the IN predicate
if (values.size() > _inPredicateThreshold) {
return false;
}
-
+ String column = inPredicate.getLhs().getIdentifier();
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
+ : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
+ // NOTE: Column must exist after DataSchemaSegmentPruner
+ assert dataSource != null;
+ DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate,
dataSourceMetadata.getDataType());
-
// Check min/max value
- boolean someInRange = false;
for (ValueCache.CachedValue value : cachedValues) {
if (checkMinMaxRange(dataSourceMetadata, value.getComparableValue())) {
- someInRange = true;
- break;
- }
- }
- if (!someInRange) {
- return true;
- }
-
- // Check bloom filter
- BloomFilterReader bloomFilter = dataSource.getBloomFilter();
- if (bloomFilter == null) {
- return false;
- }
- for (ValueCache.CachedValue value : cachedValues) {
- if (value.mightBeContained(bloomFilter)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Returns {@code true} if the value is within the column's min/max value
range, {@code false} otherwise.
- */
- private boolean checkMinMaxRange(DataSourceMetadata dataSourceMetadata,
Comparable value) {
- Comparable minValue = dataSourceMetadata.getMinValue();
- if (minValue != null) {
- if (value.compareTo(minValue) < 0) {
- return false;
- }
- }
- Comparable maxValue = dataSourceMetadata.getMaxValue();
- if (maxValue != null) {
- if (value.compareTo(maxValue) > 0) {
return false;
}
}
@@ -317,8 +163,7 @@ public class ColumnValueSegmentPruner implements
SegmentPruner {
private boolean pruneRangePredicate(IndexSegment segment, RangePredicate
rangePredicate,
Map<String, DataSource> dataSourceCache) {
String column = rangePredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment
- ? segment.getDataSource(column)
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
: dataSourceCache.computeIfAbsent(column, segment::getDataSource);
// NOTE: Column must exist after DataSchemaSegmentPruner
assert dataSource != null;
@@ -382,95 +227,25 @@ public class ColumnValueSegmentPruner implements
SegmentPruner {
}
}
}
-
return false;
}
- private static Comparable convertValue(String stringValue, DataType
dataType) {
- try {
- return dataType.convertInternal(stringValue);
- } catch (Exception e) {
- throw new BadQueryRequestException(e);
- }
- }
-
- private static class ValueCache {
- // As Predicates are recursive structures, their hashCode is quite
expensive.
- // By using an IdentityHashMap here we don't need to iterate over the
recursive
- // structure. This is specially useful in the IN expression.
- private final Map<Predicate, Object> _cache = new IdentityHashMap<>();
-
- private CachedValue add(EqPredicate pred) {
- CachedValue val = new CachedValue(pred.getValue());
- _cache.put(pred, val);
- return val;
- }
-
- private List<CachedValue> add(InPredicate pred) {
- List<CachedValue> vals = new ArrayList<>(pred.getValues().size());
- for (String value : pred.getValues()) {
- vals.add(new CachedValue(value));
- }
- _cache.put(pred, vals);
- return vals;
- }
-
- public CachedValue get(EqPredicate pred, DataType dt) {
- CachedValue cachedValue = (CachedValue) _cache.get(pred);
- if (cachedValue == null) {
- cachedValue = add(pred);
- }
- cachedValue.ensureDataType(dt);
- return cachedValue;
- }
-
- public List<CachedValue> get(InPredicate pred, DataType dt) {
- List<CachedValue> cachedValues = (List<CachedValue>) _cache.get(pred);
- if (cachedValues == null) {
- cachedValues = add(pred);
- }
- for (CachedValue cachedValue : cachedValues) {
- cachedValue.ensureDataType(dt);
+ /**
+ * Returns {@code true} if the value is within the column's min/max value
range, {@code false} otherwise.
+ */
+ private boolean checkMinMaxRange(DataSourceMetadata dataSourceMetadata,
Comparable value) {
+ Comparable minValue = dataSourceMetadata.getMinValue();
+ if (minValue != null) {
+ if (value.compareTo(minValue) < 0) {
+ return false;
}
- return cachedValues;
}
-
- public static class CachedValue {
- private final Object _value;
- private boolean _hashed = false;
- private long _hash1;
- private long _hash2;
- private DataType _dt;
- private Comparable _comparableValue;
-
- private CachedValue(Object value) {
- _value = value;
- }
-
- private Comparable getComparableValue() {
- assert _dt != null;
- return _comparableValue;
- }
-
- private void ensureDataType(DataType dt) {
- if (dt != _dt) {
- String strValue = _value.toString();
- _dt = dt;
- _comparableValue = convertValue(strValue, dt);
- _hashed = false;
- }
- }
-
- private boolean mightBeContained(BloomFilterReader bloomFilter) {
- if (!_hashed) {
- GuavaBloomFilterReaderUtils.Hash128AsLongs hash128AsLongs =
-
GuavaBloomFilterReaderUtils.hashAsLongs(_comparableValue.toString());
- _hash1 = hash128AsLongs.getHash1();
- _hash2 = hash128AsLongs.getHash2();
- _hashed = true;
- }
- return bloomFilter.mightContain(_hash1, _hash2);
+ Comparable maxValue = dataSourceMetadata.getMaxValue();
+ if (maxValue != null) {
+ if (value.compareTo(maxValue) > 0) {
+ return false;
}
}
+ return true;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
index 39d19c375d..db848fc915 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
@@ -19,6 +19,8 @@
package org.apache.pinot.core.query.pruner;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -45,4 +47,9 @@ public interface SegmentPruner {
* TODO: Revisit this because the caller doesn't require not
changing the input segments
*/
List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query);
+
+ default List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query,
+ @Nullable ExecutorService executorService) {
+ return prune(segments, query);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
index e09d54590d..e9acb210ab 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
@@ -26,8 +26,6 @@ import org.apache.pinot.spi.env.PinotConfiguration;
/**
* A static SegmentPrunerProvider will give SegmentPruner instance based on
prunerClassName and configuration.
- *
- *
*/
public class SegmentPrunerProvider {
private SegmentPrunerProvider() {
@@ -36,10 +34,12 @@ public class SegmentPrunerProvider {
private static final Map<String, Class<? extends SegmentPruner>> PRUNER_MAP
= new HashMap<>();
public static final String COLUMN_VALUE_SEGMENT_PRUNER_NAME =
"columnvaluesegmentpruner";
+ public static final String BLOOM_FILTER_SEGMENT_PRUNER_NAME =
"bloomfiltersegmentpruner";
public static final String SELECTION_QUERY_SEGMENT_PRUNER_NAME =
"selectionquerysegmentpruner";
static {
PRUNER_MAP.put(COLUMN_VALUE_SEGMENT_PRUNER_NAME,
ColumnValueSegmentPruner.class);
+ PRUNER_MAP.put(BLOOM_FILTER_SEGMENT_PRUNER_NAME,
BloomFilterSegmentPruner.class);
PRUNER_MAP.put(SELECTION_QUERY_SEGMENT_PRUNER_NAME,
SelectionQuerySegmentPruner.class);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index 5c79ca1c2d..4b775f67bd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
import org.apache.pinot.core.query.config.SegmentPrunerConfig;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -59,6 +61,7 @@ public class SegmentPrunerService {
_prunerStatsUpdaters.put(pruner,
SegmentPrunerStatistics::setLimitPruned);
break;
case SegmentPrunerProvider.COLUMN_VALUE_SEGMENT_PRUNER_NAME:
+ case SegmentPrunerProvider.BLOOM_FILTER_SEGMENT_PRUNER_NAME:
_prunerStatsUpdaters.put(pruner,
SegmentPrunerStatistics::setValuePruned);
break;
default:
@@ -96,6 +99,11 @@ public class SegmentPrunerService {
* undefined way. Therefore, this list should not be used
after calling this method.
*/
public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query, SegmentPrunerStatistics stats) {
+ return prune(segments, query, stats, null);
+ }
+
+ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query, SegmentPrunerStatistics stats,
+ @Nullable ExecutorService executorService) {
try (InvocationScope scope =
Tracing.getTracer().createScope(SegmentPrunerService.class)) {
segments = removeInvalidSegments(segments, query, stats);
int invokedPrunersCount = 0;
@@ -105,7 +113,7 @@ public class SegmentPrunerService {
try (InvocationScope prunerScope =
Tracing.getTracer().createScope(segmentPruner.getClass())) {
int originalSegmentsSize = segments.size();
prunerScope.setNumSegments(originalSegmentsSize);
- segments = segmentPruner.prune(segments, query);
+ segments = segmentPruner.prune(segments, query, executorService);
_prunerStatsUpdaters.get(segmentPruner).accept(stats,
originalSegmentsSize - segments.size());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
new file mode 100644
index 0000000000..76f1b2737b
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+
+
+/**
+ * The {@code ValueBasedSegmentPruner} prunes segments based on values inside
the filter and segment metadata and data.
+ */
+@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+abstract public class ValueBasedSegmentPruner implements SegmentPruner {
+ public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
+ protected int _inPredicateThreshold;
+
+ @Override
+ public void init(PinotConfiguration config) {
+ _inPredicateThreshold =
+ config.getProperty(IN_PREDICATE_THRESHOLD,
Server.DEFAULT_VALUE_PRUNER_IN_PREDICATE_THRESHOLD);
+ }
+
+ @Override
+ public boolean isApplicableTo(QueryContext query) {
+ if (query.getFilter() == null) {
+ return false;
+ }
+ return isApplicableToFilter(query.getFilter());
+ }
+
+ /**
+ * 1. NOT is not applicable for segment pruning;
+ * 2. For OR, if one of the child filter is not applicable for pruning, the
parent filter is not applicable;
+ * 3. For AND, if one of the child filter is applicable for pruning, the
parent filter is applicable, but it
+ * doesn't mean this child filter can prune the segment.
+ * 4. The specific pruners decide their own applicable predicate types.
+ */
+ private boolean isApplicableToFilter(FilterContext filter) {
+ switch (filter.getType()) {
+ case AND:
+ for (FilterContext child : filter.getChildren()) {
+ if (isApplicableToFilter(child)) {
+ return true;
+ }
+ }
+ return false;
+ case OR:
+ for (FilterContext child : filter.getChildren()) {
+ if (!isApplicableToFilter(child)) {
+ return false;
+ }
+ }
+ return true;
+ case NOT:
+ // Do not prune NOT filter
+ return false;
+ case PREDICATE:
+ return isApplicableToPredicate(filter.getPredicate());
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ abstract boolean isApplicableToPredicate(Predicate predicate);
+
+ @Override
+ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query) {
+ if (segments.isEmpty()) {
+ return segments;
+ }
+ FilterContext filter = Objects.requireNonNull(query.getFilter());
+ ValueCache cachedValues = new ValueCache();
+ Map<String, DataSource> dataSourceCache = new HashMap<>();
+ List<IndexSegment> selectedSegments = new ArrayList<>(segments.size());
+ for (IndexSegment segment : segments) {
+ dataSourceCache.clear();
+ if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
+ selectedSegments.add(segment);
+ }
+ }
+ return selectedSegments;
+ }
+
+ protected boolean pruneSegment(IndexSegment segment, FilterContext filter,
Map<String, DataSource> dataSourceCache,
+ ValueCache cachedValues) {
+ switch (filter.getType()) {
+ case AND:
+ for (FilterContext child : filter.getChildren()) {
+ if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+ return true;
+ }
+ }
+ return false;
+ case OR:
+ for (FilterContext child : filter.getChildren()) {
+ if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+ return false;
+ }
+ }
+ return true;
+ case NOT:
+ // Do not prune NOT filter
+ return false;
+ case PREDICATE:
+ Predicate predicate = filter.getPredicate();
+ // Only prune columns
+ if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER)
{
+ return false;
+ }
+ return pruneSegmentWithPredicate(segment, predicate, dataSourceCache,
cachedValues);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ abstract boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate
predicate,
+ Map<String, DataSource> dataSourceCache, ValueCache cachedValues);
+
+ protected static Comparable convertValue(String stringValue, DataType
dataType) {
+ try {
+ return dataType.convertInternal(stringValue);
+ } catch (Exception e) {
+ throw new BadQueryRequestException(e);
+ }
+ }
+
+ protected static class ValueCache {
+ // As Predicates are recursive structures, their hashCode is quite
expensive.
+ // By using an IdentityHashMap here we don't need to iterate over the
recursive
+ // structure. This is especially useful in the IN expression.
+ private final Map<Predicate, Object> _cache = new IdentityHashMap<>();
+
+ private CachedValue add(EqPredicate pred) {
+ CachedValue val = new CachedValue(pred.getValue());
+ _cache.put(pred, val);
+ return val;
+ }
+
+ private List<CachedValue> add(InPredicate pred) {
+ List<CachedValue> vals = new ArrayList<>(pred.getValues().size());
+ for (String value : pred.getValues()) {
+ vals.add(new CachedValue(value));
+ }
+ _cache.put(pred, vals);
+ return vals;
+ }
+
+ public CachedValue get(EqPredicate pred, DataType dt) {
+ CachedValue cachedValue = (CachedValue) _cache.get(pred);
+ if (cachedValue == null) {
+ cachedValue = add(pred);
+ }
+ cachedValue.ensureDataType(dt);
+ return cachedValue;
+ }
+
+ public List<CachedValue> get(InPredicate pred, DataType dt) {
+ List<CachedValue> cachedValues = (List<CachedValue>) _cache.get(pred);
+ if (cachedValues == null) {
+ cachedValues = add(pred);
+ }
+ for (CachedValue cachedValue : cachedValues) {
+ cachedValue.ensureDataType(dt);
+ }
+ return cachedValues;
+ }
+
+ public static class CachedValue {
+ private final Object _value;
+ private boolean _hashed = false;
+ private long _hash1;
+ private long _hash2;
+ private DataType _dt;
+ private Comparable _comparableValue;
+
+ private CachedValue(Object value) {
+ _value = value;
+ }
+
+ public Comparable getComparableValue() {
+ assert _dt != null;
+ return _comparableValue;
+ }
+
+ public void ensureDataType(DataType dt) {
+ if (dt != _dt) {
+ String strValue = _value.toString();
+ _dt = dt;
+ _comparableValue = convertValue(strValue, dt);
+ _hashed = false;
+ }
+ }
+
+ public boolean mightBeContained(BloomFilterReader bloomFilter) {
+ if (!_hashed) {
+ GuavaBloomFilterReaderUtils.Hash128AsLongs hash128AsLongs =
+
GuavaBloomFilterReaderUtils.hashAsLongs(_comparableValue.toString());
+ _hash1 = hash128AsLongs.getHash1();
+ _hash2 = hash128AsLongs.getHash2();
+ _hashed = true;
+ }
+ return bloomFilter.mightContain(_hash1, _hash2);
+ }
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryMultiThreadingUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryMultiThreadingUtils.java
new file mode 100644
index 0000000000..bea9d0110c
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryMultiThreadingUtils.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * The terms `task` and `thread` are used interchangeably in the logic to
parallelize CombinePlanNode and
+ * BaseCombineOperator. This class provides common methods used to set up the
parallel processing.
+ */
+public class QueryMultiThreadingUtils {
+ private QueryMultiThreadingUtils() {
+ }
+
+ /**
+ * Use at most 10 threads, or half of the available processors, for each query.
If there are fewer than 2 processors, use 1
+ * thread.
+ * <p>NOTE: Runtime.getRuntime().availableProcessors() may return value < 2
in container based environment, e.g.
+ * Kubernetes.
+ */
+ public static final int MAX_NUM_THREADS_PER_QUERY =
+ Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() /
2));
+
+ /**
+ * Returns the number of tasks for the query execution. The tasks can be
assigned to multiple execution threads so
+ * that they can run in parallel. The parallelism is bounded by the task
count.
+ */
+ public static int getNumTasksForQuery(int numOperators, int
maxExecutionThreads) {
+ return getNumTasks(numOperators, 1, maxExecutionThreads);
+ }
+
+ public static int getNumTasks(int numWorkUnits, int minUnitsPerThread, int
maxExecutionThreads) {
+ if (numWorkUnits <= minUnitsPerThread) {
+ return 1;
+ }
+ if (maxExecutionThreads <= 0) {
+ maxExecutionThreads = MAX_NUM_THREADS_PER_QUERY;
+ }
+ return Math.min((numWorkUnits + minUnitsPerThread - 1) /
minUnitsPerThread, maxExecutionThreads);
+ }
+
+ /**
+ * Use a Phaser to ensure all the Futures are done (not scheduled, finished
or interrupted) before the main thread
+ * returns. We need to ensure no execution left before the main thread
returning because the main thread holds the
+ * reference to the segments, and if the segments are deleted/refreshed, the
segments can be released after the main
+ * thread returns, which would lead to undefined behavior (even JVM crash)
when executing queries against them.
+ */
+ public static <T> void runTasksWithDeadline(int numTasks, Function<Integer,
T> taskFunc, Consumer<T> resCollector,
+ Consumer<Exception> errHandler, ExecutorService executorService, long
deadlineInMs) {
+ Phaser phaser = new Phaser(1);
+ List<Future<T>> futures = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ int index = i;
+ futures.add(executorService.submit(new TraceCallable<T>() {
+ @Override
+ public T callJob() {
+ try {
+ // Register the thread to the phaser for main thread to wait for
it to complete.
+ if (phaser.register() < 0) {
+ return null;
+ }
+ return taskFunc.apply(index);
+ } finally {
+ phaser.arriveAndDeregister();
+ }
+ }
+ }));
+ }
+ try {
+ // Check deadline while waiting for the task results.
+ for (Future<T> future : futures) {
+ T taskResult = future.get(deadlineInMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ resCollector.accept(taskResult);
+ }
+ } catch (Exception e) {
+ errHandler.accept(e);
+ } finally {
+ // Cancel all ongoing jobs
+ for (Future<T> future : futures) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ // Deregister the main thread and wait for all threads done
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
+ }
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index f7a73647fa..3a4bdd116a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
@@ -74,7 +75,7 @@ public class SelectionCombineOperatorTest {
private static final String SEGMENT_NAME_PREFIX = "testSegment_";
// Create (MAX_NUM_THREADS_PER_QUERY * 2) segments so that each thread needs
to process 2 segments
- private static final int NUM_SEGMENTS =
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 2;
+ private static final int NUM_SEGMENTS =
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 2;
private static final int NUM_CONSUMING_SEGMENTS = NUM_SEGMENTS / 2;
private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME +
"_REALTIME";
private static final int NUM_RECORDS_PER_SEGMENT = 100;
@@ -177,27 +178,27 @@ public class SelectionCombineOperatorTest {
// Should early-terminate after processing the result of the first
segment. Each thread should process at most 1
// segment.
long numDocsScanned = combineResult.getNumDocsScanned();
- assertTrue(numDocsScanned >= 10 && numDocsScanned <=
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10);
+ assertTrue(numDocsScanned >= 10 && numDocsScanned <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 10);
assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
assertEquals(combineResult.getNumConsumingSegmentsProcessed(),
NUM_CONSUMING_SEGMENTS);
int numSegmentsMatched = combineResult.getNumSegmentsMatched();
- assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
// The check below depends on the order of segment processing. When
segments# <= 10 (the value of
// CombinePlanNode.TARGET_NUM_PLANS_PER_THREAD to be specific), the
segments are processed in the order as they
// are prepared, which is OFFLINE segments followed by RT segments and
this case makes the value here equal to 0.
// But when segments# > 10, the segments are processed in a different
order and some RT segments can be processed
- // ahead of the other OFFLINE segments, but no more than
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY for sure
+ // ahead of the other OFFLINE segments, but no more than
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY for sure
// as each thread only processes one segment.
int numConsumingSegmentsMatched =
combineResult.getNumConsumingSegmentsMatched();
if (NUM_SEGMENTS <= 10) {
assertEquals(numConsumingSegmentsMatched, 0, "numSegments: " +
NUM_SEGMENTS);
} else {
assertTrue(numConsumingSegmentsMatched >= 0
- && numConsumingSegmentsMatched <=
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY, String
+ && numConsumingSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY, String
.format("numConsumingSegmentsMatched: %d, maxThreadsPerQuery: %d,
numSegments: %d",
- combineResult.getNumConsumingSegmentsMatched(),
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY,
+ combineResult.getNumConsumingSegmentsMatched(),
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY,
NUM_SEGMENTS));
}
assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
@@ -233,14 +234,14 @@ public class SelectionCombineOperatorTest {
// segment.
long numDocsScanned = combineResult.getNumDocsScanned();
// Need to scan 10 documents per segment because 'intColumn' is sorted
- assertTrue(numDocsScanned >= 10 && numDocsScanned <=
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10);
+ assertTrue(numDocsScanned >= 10 && numDocsScanned <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 10);
assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
assertEquals(combineResult.getNumConsumingSegmentsProcessed(),
NUM_CONSUMING_SEGMENTS);
assertEquals(combineResult.getNumConsumingSegmentsMatched(), 0);
int numSegmentsMatched = combineResult.getNumSegmentsMatched();
- assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
combineResult = getCombineResult("SELECT * FROM testTable ORDER BY
intColumn DESC");
@@ -257,14 +258,14 @@ public class SelectionCombineOperatorTest {
// segment.
numDocsScanned = combineResult.getNumDocsScanned();
assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
- && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY *
NUM_RECORDS_PER_SEGMENT);
+ && numDocsScanned <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
assertEquals(combineResult.getNumConsumingSegmentsProcessed(),
NUM_CONSUMING_SEGMENTS);
assertEquals(combineResult.getNumConsumingSegmentsMatched(),
NUM_CONSUMING_SEGMENTS);
numSegmentsMatched = combineResult.getNumSegmentsMatched();
- assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
combineResult = getCombineResult("SELECT * FROM testTable ORDER BY
intColumn DESC LIMIT 10000");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
new file mode 100644
index 0000000000..75b2beea0f
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import
org.apache.pinot.segment.local.segment.index.readers.bloom.OnHeapGuavaBloomFilterReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BloomFilterSegmentPrunerTest {
+ private static final BloomFilterSegmentPruner PRUNER = new
BloomFilterSegmentPruner();
+
+ @BeforeClass
+ public void setUp() {
+ Map<String, Object> properties = new HashMap<>();
+ // override default value
+ properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
+ PinotConfiguration configuration = new PinotConfiguration(properties);
+ PRUNER.init(configuration);
+ }
+
+ @Test
+ public void testBloomFilterPruning()
+ throws IOException {
+ IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0",
"3.0", "5.0", "7.0", "21.0"});
+
+ // all out the bloom filter
+ assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (0.0)"));
+ assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 0.0"));
+ assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 6.0"));
+ assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (6.0)"));
+ assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 0.0 OR column = 6.0"));
+
+ // all in the bloom filter
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (5.0)"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0 OR column = 7.0"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (5.0, 7.0)"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0 AND column = 7.0"));
+
+ // some in the bloom filter with IN/OR
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (0.0, 3.0, 4.0)"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 1.0"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (1.0)"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 21.0"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (21.0)"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (21.0, 30.0)"));
+ assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 21.0 OR column = 30.0"));
+ // 30 out the bloom filter with AND
+ assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 21.0 AND column = 30.0"));
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testQueryTimeoutOnPruning()
+ throws IOException {
+ IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0",
"3.0", "5.0", "7.0", "21.0"});
+ DataSource dataSource = mock(DataSource.class);
+ when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+ runPruner(Collections.singletonList(indexSegment),
+ "SELECT COUNT(*) FROM testTable WHERE column = 5.0 OR column = 0.0",
1);
+ }
+
+ @Test
+ public void testParallelPrune()
+ throws IOException {
+ List<IndexSegment> segments = new ArrayList<>();
+ for (int i = 0; i < 35; i++) {
+ IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0",
"3.0", "5.0", "7.0", "21.0"});
+ segments.add(indexSegment);
+ }
+ assertTrue(
+ runPruner(segments, "SELECT COUNT(*) FROM testTable WHERE column =
21.0 AND column = 30.0", 5000).isEmpty());
+
+ IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0",
"3.0", "5.0", "7.0", "21.0", "30.0"});
+ segments.add(indexSegment);
+ List<IndexSegment> selected =
+ runPruner(segments, "SELECT COUNT(*) FROM testTable WHERE column =
21.0 AND column = 30.0", 5000);
+ assertEquals(selected.size(), 1);
+ }
+
+ @Test
+ public void testIsApplicableTo() {
+ // EQ and IN (with small number of values) are applicable for bloom filter
based pruning.
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM
testTable WHERE column = 1");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*)
FROM testTable WHERE column IN (1, 2)");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+
+ // NOT is not applicable
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*)
FROM testTable WHERE NOT column = 1");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+ // Too many values for IN clause
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column IN (1, 2, 3, 4, 5, 6,
7)");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+ // Other predicate types are not applicable
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*)
FROM testTable WHERE column LIKE 5");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+
+ // AND with one applicable child filter is applicable
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column NOT IN (1, 2) AND column
= 3");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+
+ // OR with one child filter that's not applicable is not applicable
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column = 3 OR column NOT IN (1,
2)");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+
+ // Nested with AND/OR
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column = 3 OR (column NOT IN (1,
2) AND column = 4)");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+ }
+
+ private IndexSegment mockIndexSegment(String[] values)
+ throws IOException {
+ IndexSegment indexSegment = mock(IndexSegment.class);
+ when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("column"));
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(20);
+ when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+
+ DataSource dataSource = mock(DataSource.class);
+ when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+ // Add support for bloom filter
+ DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
+ BloomFilterReaderBuilder builder = new BloomFilterReaderBuilder();
+ for (String v : values) {
+ builder.put(v);
+ }
+ when(dataSourceMetadata.getDataType()).thenReturn(DataType.DOUBLE);
+ when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
+ when(dataSource.getBloomFilter()).thenReturn(builder.build());
+
+ return indexSegment;
+ }
+
+ private boolean runPruner(IndexSegment segment, String query) {
+ return runPruner(Collections.singletonList(segment), query,
5000).isEmpty();
+ }
+
+ private List<IndexSegment> runPruner(List<IndexSegment> segments, String
query, long queryTimeout) {
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
+ queryContext.setEndTimeMs(System.currentTimeMillis() + queryTimeout);
+ return PRUNER.prune(segments, queryContext,
Executors.newCachedThreadPool());
+ }
+
+ private static class BloomFilterReaderBuilder {
+ private BloomFilter<String> _bloomfilter =
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 100, 0.01);
+ public BloomFilterReaderBuilder put(String value) {
+ _bloomfilter.put(value);
+ return this;
+ }
+
+ public BloomFilterReader build() throws IOException {
+ File file = Files.createTempFile("test", ".bloom").toFile();
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ _bloomfilter.writeTo(fos);
+ try (PinotDataBuffer pinotDataBuffer =
PinotDataBuffer.loadBigEndianFile(file)) {
+ // on heap filter should never use the buffer, so we can close it
and delete the file
+ return new OnHeapGuavaBloomFilterReader(pinotDataBuffer);
+ }
+ } finally {
+ file.delete();
+ }
+ }
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
index 5edbdb32a3..5a5d7bd505 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -18,30 +18,21 @@
*/
package org.apache.pinot.core.query.pruner;
-import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnels;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import
org.apache.pinot.segment.local.segment.index.readers.bloom.OnHeapGuavaBloomFilterReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -53,14 +44,17 @@ import static org.testng.Assert.assertTrue;
public class ColumnValueSegmentPrunerTest {
private static final ColumnValueSegmentPruner PRUNER = new
ColumnValueSegmentPruner();
- @Test
- public void testMinMaxValuePruning() {
+ @BeforeClass
+ public void setUp() {
Map<String, Object> properties = new HashMap<>();
- //override default value
- properties.put(PRUNER.IN_PREDICATE_THRESHOLD, 5);
+ // override default value
+ properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
PinotConfiguration configuration = new PinotConfiguration(properties);
PRUNER.init(configuration);
+ }
+ @Test
+ public void testMinMaxValuePruning() {
IndexSegment indexSegment = mockIndexSegment();
DataSource dataSource = mock(DataSource.class);
@@ -147,66 +141,42 @@ public class ColumnValueSegmentPrunerTest {
}
@Test
- public void testBloomFilterPredicatePruning()
- throws IOException {
- Map<String, Object> properties = new HashMap<>();
- // override default value
- properties.put(ColumnValueSegmentPruner.IN_PREDICATE_THRESHOLD, 5);
- PinotConfiguration configuration = new PinotConfiguration(properties);
- PRUNER.init(configuration);
-
- IndexSegment indexSegment = mockIndexSegment();
- DataSource dataSource = mock(DataSource.class);
- when(indexSegment.getDataSource("column")).thenReturn(dataSource);
- // Add support for bloom filter
- DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
- BloomFilterReader bloomFilterReader = new BloomFilterReaderBuilder()
- .put("1.0")
- .put("2.0")
- .put("3.0")
- .put("5.0")
- .put("7.0")
- .put("21.0")
- .build();
-
- when(dataSourceMetadata.getDataType()).thenReturn(DataType.DOUBLE);
- when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
- when(dataSource.getBloomFilter()).thenReturn(bloomFilterReader);
- when(dataSourceMetadata.getMinValue()).thenReturn(5.0);
- when(dataSourceMetadata.getMaxValue()).thenReturn(10.0);
-
- // all out the bloom filter and out of range
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (0.0)"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 0.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (0.0, 3.0, 4.0)"));
-
- // some in the bloom filter but all out of range
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 1.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (1.0)"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 21.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (21.0)"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (21.0, 30.0)"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 21.0 AND column = 30.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 21.0 OR column = 30.0"));
-
- // all out the bloom filter but some in range
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 6.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (6.0)"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 0.0 OR column = 6.0"));
-
- // all in the bloom filter and in range
- assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0"));
- assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (5.0)"));
- assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0 OR column = 7.0"));
- assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (5.0, 7.0)"));
- assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0 AND column = 7.0"));
-
- // some in the bloom filter and in range
- assertFalse(
- runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE column
IN (0.0, 5.0)"));
- assertFalse(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0 OR column = 0.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column = 5.0 AND column = 0.0"));
- assertTrue(runPruner(indexSegment, "SELECT COUNT(*) FROM testTable WHERE
column IN (8.0, 10.0)"));
+ public void testIsApplicableTo() {
+ // EQ, RANGE and IN (with small number of values) are applicable for
min/max/partitionId based pruning.
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM
testTable WHERE column = 1");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*)
FROM testTable WHERE column IN (1, 2)");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+ queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM
testTable WHERE column BETWEEN 1 AND 2");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+
+ // NOT is not applicable
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*)
FROM testTable WHERE NOT column = 1");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+ // Too many values for IN clause
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column IN (1, 2, 3, 4, 5, 6,
7)");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+ // Other predicate types are not applicable
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*)
FROM testTable WHERE column LIKE 5");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+
+ // AND with one applicable child filter is applicable
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column NOT IN (1, 2) AND column
= 3");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
+
+ // OR with one child filter that's not applicable is not applicable
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column = 3 OR column NOT IN (1,
2)");
+ assertFalse(PRUNER.isApplicableTo(queryContext));
+
+ // Nested with AND/OR
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT COUNT(*) FROM testTable WHERE column = 3 OR (column NOT IN (1,
2) AND column BETWEEN 4 AND 5)");
+ assertTrue(PRUNER.isApplicableTo(queryContext));
}
private IndexSegment mockIndexSegment() {
@@ -222,25 +192,4 @@ public class ColumnValueSegmentPrunerTest {
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
return PRUNER.prune(Arrays.asList(indexSegment), queryContext).isEmpty();
}
-
- private static class BloomFilterReaderBuilder {
- private BloomFilter<String> _bloomfilter =
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 100, 0.01);
- public BloomFilterReaderBuilder put(String value) {
- _bloomfilter.put(value);
- return this;
- }
-
- public BloomFilterReader build() throws IOException {
- File file = Files.createTempFile("test", ".bloom").toFile();
- try (FileOutputStream fos = new FileOutputStream(file)) {
- _bloomfilter.writeTo(fos);
- try (PinotDataBuffer pinotDataBuffer =
PinotDataBuffer.loadBigEndianFile(file)) {
- // on heap filter should never use the buffer, so we can close it
and delete the file
- return new OnHeapGuavaBloomFilterReader(pinotDataBuffer);
- }
- } finally {
- file.delete();
- }
- }
- }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/util/QueryMultiThreadingUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/util/QueryMultiThreadingUtilsTest.java
new file mode 100644
index 0000000000..92ac06f967
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/util/QueryMultiThreadingUtilsTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class QueryMultiThreadingUtilsTest {
+ @Test
+ public void testGetNumTasksForQuery() {
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(1, 2), 1);
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(3, 2), 2);
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(0, -1),
1);
int numOps = QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY;
+ // QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY at max
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(numOps +
10, -1),
+ QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+ // But 1 at min
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasksForQuery(numOps -
1, -1), Math.max(1, numOps - 1));
+ }
+
+ @Test
+ public void testGetNumTasks() {
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(2, 3, 4), 1);
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(7, 3, 4), 3);
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(9, 3, 4), 3);
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(10, 3, 4), 4);
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(100, 3, 4), 4);
+ int targetPerThread = 5;
+ int numWorkUnits = QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY *
targetPerThread;
+ Assert.assertEquals(QueryMultiThreadingUtils.getNumTasks(numWorkUnits +
10, targetPerThread, -1),
+ QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+ }
+
+ @Test
+ public void testRunTasksWithDeadline() {
+ ExecutorService exec = Executors.newCachedThreadPool();
+ AtomicInteger sum = new AtomicInteger(0);
+ QueryMultiThreadingUtils.runTasksWithDeadline(5, index -> index,
sum::addAndGet, e -> {
+ }, exec, System.currentTimeMillis() + 500);
+ // sum of the indices 0 through 4, i.e. 0 + 1 + 2 + 3 + 4 = 10.
+ Assert.assertEquals(sum.get(), 10);
+
+ // Task throws exception before timeout.
+ Exception[] err = new Exception[1];
+ QueryMultiThreadingUtils.runTasksWithDeadline(5, index -> {
+ throw new RuntimeException("oops: " + index);
+ }, sum::addAndGet, e -> err[0] = e, exec, System.currentTimeMillis() +
500);
+ Assert.assertTrue(err[0].getMessage().contains("oops"));
+
+ // Task timed out.
+ QueryMultiThreadingUtils.runTasksWithDeadline(5, index -> {
+ try {
+ Thread.sleep(10_000);
+ return index;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }, sum::addAndGet, e -> err[0] = e, exec, System.currentTimeMillis() +
500);
+ Assert.assertTrue(err[0] instanceof TimeoutException);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0be6c57267..123a789bcb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -559,8 +559,10 @@ public class CommonConstants {
"org.apache.pinot.server.starter.helix.HelixInstanceDataManager";
public static final String DEFAULT_QUERY_EXECUTOR_CLASS =
"org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl";
+ // The order of the pruners matters: pruners that only need segment metadata
run first, ahead of pruners that
+ // access segment data (e.g. bloom filters), to reduce the required data
access.
public static final List<String> DEFAULT_QUERY_EXECUTOR_PRUNER_CLASS =
- ImmutableList.of("ColumnValueSegmentPruner",
"SelectionQuerySegmentPruner");
+ ImmutableList.of("ColumnValueSegmentPruner",
"BloomFilterSegmentPruner", "SelectionQuerySegmentPruner");
public static final String DEFAULT_QUERY_EXECUTOR_PLAN_MAKER_CLASS =
"org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2";
public static final long DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS = 15_000L;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]