This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5e7c005 Fix upsert inconsistency by snapshotting the validDocIds before reading the numDocs (#8392)
5e7c005 is described below
commit 5e7c0050f684ac15b3d8a249ad596c2aee0911f5
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Mar 23 15:03:32 2022 -0700
Fix upsert inconsistency by snapshotting the validDocIds before reading the numDocs (#8392)
---
.../org/apache/pinot/core/plan/FilterPlanNode.java | 80 +++++++++++-----------
.../core/plan/maker/InstancePlanMakerImplV2.java | 3 +
.../core/query/request/context/QueryContext.java | 10 +++
3 files changed, 52 insertions(+), 41 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 047dbba..c43c5b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
@@ -45,57 +46,55 @@ import
org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEv
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
public class FilterPlanNode implements PlanNode {
private final IndexSegment _indexSegment;
private final QueryContext _queryContext;
- private final int _numDocs;
- private FilterContext _filterContext;
+ private final FilterContext _filter;
// Cache the predicate evaluators
private final Map<Predicate, PredicateEvaluator> _predicateEvaluatorMap =
new HashMap<>();
public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
- _indexSegment = indexSegment;
- _queryContext = queryContext;
- // NOTE: Fetch number of documents in the segment when creating the plan
node so that it is consistent among all
- // filter operators. Number of documents will keep increasing for
MutableSegment (CONSUMING segment).
- _numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+ this(indexSegment, queryContext, null);
}
- public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext,
FilterContext filterContext) {
- this(indexSegment, queryContext);
-
- _filterContext = filterContext;
+ public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext,
@Nullable FilterContext filter) {
+ _indexSegment = indexSegment;
+ _queryContext = queryContext;
+ _filter = filter;
}
@Override
public BaseFilterOperator run() {
- FilterContext filter = _filterContext == null ? _queryContext.getFilter()
: _filterContext;
+ // NOTE: Snapshot the validDocIds before reading the numDocs to prevent
the latest updates getting lost
ThreadSafeMutableRoaringBitmap validDocIds =
_indexSegment.getValidDocIds();
- boolean applyValidDocIds = validDocIds != null &&
!QueryOptionsUtils.isSkipUpsert(_queryContext.getQueryOptions());
+ MutableRoaringBitmap validDocIdsSnapshot =
+ validDocIds != null && !_queryContext.isSkipUpsert() ?
validDocIds.getMutableRoaringBitmap() : null;
+ int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+
+ FilterContext filter = _filter != null ? _filter :
_queryContext.getFilter();
if (filter != null) {
- BaseFilterOperator filterOperator = constructPhysicalOperator(filter);
- if (applyValidDocIds) {
- BaseFilterOperator validDocFilter =
- new
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false,
_numDocs);
- return
FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator,
validDocFilter), _numDocs,
+ BaseFilterOperator filterOperator = constructPhysicalOperator(filter,
numDocs);
+ if (validDocIdsSnapshot != null) {
+ BaseFilterOperator validDocFilter = new
BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
+ return
FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator,
validDocFilter), numDocs,
_queryContext.getDebugOptions());
} else {
return filterOperator;
}
- } else if (applyValidDocIds) {
- return new
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false,
_numDocs);
+ } else if (validDocIdsSnapshot != null) {
+ return new BitmapBasedFilterOperator(validDocIdsSnapshot, false,
numDocs);
} else {
- return new MatchAllFilterOperator(_numDocs);
+ return new MatchAllFilterOperator(numDocs);
}
}
@@ -143,13 +142,13 @@ public class FilterPlanNode implements PlanNode {
/**
* Helper method to build the operator tree from the filter.
*/
- private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
+ private BaseFilterOperator constructPhysicalOperator(FilterContext filter,
int numDocs) {
switch (filter.getType()) {
case AND:
List<FilterContext> childFilters = filter.getChildren();
List<BaseFilterOperator> childFilterOperators = new
ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
- BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter);
+ BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultEmpty()) {
// Return empty filter operator if any of the child filter
operator's result is empty
return EmptyFilterOperator.getInstance();
@@ -158,47 +157,46 @@ public class FilterPlanNode implements PlanNode {
childFilterOperators.add(childFilterOperator);
}
}
- return FilterOperatorUtils.getAndFilterOperator(childFilterOperators,
_numDocs,
- _queryContext.getDebugOptions());
+ return FilterOperatorUtils.getAndFilterOperator(childFilterOperators,
numDocs, _queryContext.getDebugOptions());
case OR:
childFilters = filter.getChildren();
childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
- BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter);
+ BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultMatchingAll()) {
// Return match all filter operator if any of the child filter
operator matches all records
- return new MatchAllFilterOperator(_numDocs);
+ return new MatchAllFilterOperator(numDocs);
} else if (!childFilterOperator.isResultEmpty()) {
// Remove child filter operators whose result is empty
childFilterOperators.add(childFilterOperator);
}
}
- return FilterOperatorUtils.getOrFilterOperator(childFilterOperators,
_numDocs, _queryContext.getDebugOptions());
+ return FilterOperatorUtils.getOrFilterOperator(childFilterOperators,
numDocs, _queryContext.getDebugOptions());
case NOT:
childFilters = filter.getChildren();
assert childFilters.size() == 1;
- BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilters.get(0));
- return FilterOperatorUtils.getNotFilterOperator(childFilterOperator,
_numDocs, null);
+ BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilters.get(0), numDocs);
+ return FilterOperatorUtils.getNotFilterOperator(childFilterOperator,
numDocs, null);
case PREDICATE:
Predicate predicate = filter.getPredicate();
ExpressionContext lhs = predicate.getLhs();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
if (canApplyH3Index(predicate, lhs.getFunction())) {
- return new H3IndexFilterOperator(_indexSegment, predicate,
_numDocs);
+ return new H3IndexFilterOperator(_indexSegment, predicate,
numDocs);
}
// TODO: ExpressionFilterOperator does not support predicate types
without PredicateEvaluator (IS_NULL,
// IS_NOT_NULL, TEXT_MATCH)
- return new ExpressionFilterOperator(_indexSegment, predicate,
_numDocs);
+ return new ExpressionFilterOperator(_indexSegment, predicate,
numDocs);
} else {
String column = lhs.getIdentifier();
DataSource dataSource = _indexSegment.getDataSource(column);
PredicateEvaluator predicateEvaluator =
_predicateEvaluatorMap.get(predicate);
if (predicateEvaluator != null) {
- return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
_numDocs);
+ return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
numDocs);
}
switch (predicate.getType()) {
case TEXT_MATCH:
- return new TextMatchFilterOperator(dataSource.getTextIndex(),
(TextMatchPredicate) predicate, _numDocs);
+ return new TextMatchFilterOperator(dataSource.getTextIndex(),
(TextMatchPredicate) predicate, numDocs);
case REGEXP_LIKE:
// FST Index is available only for rolled out segments. So, we
use different evaluator for rolled out and
// consuming segments.
@@ -218,32 +216,32 @@ public class FilterPlanNode implements PlanNode {
dataSource.getDataSourceMetadata().getDataType());
}
_predicateEvaluatorMap.put(predicate, predicateEvaluator);
- return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
_numDocs);
+ return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
numDocs);
case JSON_MATCH:
JsonIndexReader jsonIndex = dataSource.getJsonIndex();
Preconditions.checkState(jsonIndex != null, "Cannot apply
JSON_MATCH on column: %s without json index",
column);
- return new JsonMatchFilterOperator(jsonIndex,
(JsonMatchPredicate) predicate, _numDocs);
+ return new JsonMatchFilterOperator(jsonIndex,
(JsonMatchPredicate) predicate, numDocs);
case IS_NULL:
NullValueVectorReader nullValueVector =
dataSource.getNullValueVector();
if (nullValueVector != null) {
- return new
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, _numDocs);
+ return new
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, numDocs);
} else {
return EmptyFilterOperator.getInstance();
}
case IS_NOT_NULL:
nullValueVector = dataSource.getNullValueVector();
if (nullValueVector != null) {
- return new
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, _numDocs);
+ return new
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, numDocs);
} else {
- return new MatchAllFilterOperator(_numDocs);
+ return new MatchAllFilterOperator(numDocs);
}
default:
predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
dataSource.getDictionary(),
dataSource.getDataSourceMetadata().getDataType());
_predicateEvaluatorMap.put(predicate, predicateEvaluator);
- return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
_numDocs);
+ return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
numDocs);
}
}
default:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index ce4df76..b9b2543 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -182,6 +182,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
private void applyQueryOptions(QueryContext queryContext) {
Map<String, String> queryOptions = queryContext.getQueryOptions();
+ // Set skipUpsert
+ queryContext.setSkipUpsert(QueryOptionsUtils.isSkipUpsert(queryOptions));
+
// Set maxExecutionThreads
int maxExecutionThreads;
Integer maxExecutionThreadsFromQuery =
QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 58d2891..6e6752d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -103,6 +103,8 @@ public class QueryContext {
private long _endTimeMs;
// Whether to enable prefetch for the query
private boolean _enablePrefetch;
+ // Whether to skip upsert for the query
+ private boolean _skipUpsert;
// Maximum number of threads used to execute the query
private int _maxExecutionThreads =
InstancePlanMakerImplV2.DEFAULT_MAX_EXECUTION_THREADS;
// The following properties apply to group-by queries
@@ -294,6 +296,14 @@ public class QueryContext {
_enablePrefetch = enablePrefetch;
}
+ public boolean isSkipUpsert() {
+ return _skipUpsert;
+ }
+
+ public void setSkipUpsert(boolean skipUpsert) {
+ _skipUpsert = skipUpsert;
+ }
+
public int getMaxExecutionThreads() {
return _maxExecutionThreads;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]