This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e7c005  Fix upsert inconsistency by snapshotting the validDocIds before reading the numDocs (#8392)
5e7c005 is described below

commit 5e7c0050f684ac15b3d8a249ad596c2aee0911f5
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Mar 23 15:03:32 2022 -0700

    Fix upsert inconsistency by snapshotting the validDocIds before reading the numDocs (#8392)
---
 .../org/apache/pinot/core/plan/FilterPlanNode.java | 80 +++++++++++-----------
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  3 +
 .../core/query/request/context/QueryContext.java   | 10 +++
 3 files changed, 52 insertions(+), 41 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 047dbba..c43c5b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.FunctionContext;
@@ -45,57 +46,55 @@ import org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEv
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import 
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.QueryOptionsUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 public class FilterPlanNode implements PlanNode {
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
-  private final int _numDocs;
-  private FilterContext _filterContext;
+  private final FilterContext _filter;
 
   // Cache the predicate evaluators
   private final Map<Predicate, PredicateEvaluator> _predicateEvaluatorMap = 
new HashMap<>();
 
   public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
-    _indexSegment = indexSegment;
-    _queryContext = queryContext;
-    // NOTE: Fetch number of documents in the segment when creating the plan 
node so that it is consistent among all
-    //       filter operators. Number of documents will keep increasing for 
MutableSegment (CONSUMING segment).
-    _numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    this(indexSegment, queryContext, null);
   }
 
-  public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext, 
FilterContext filterContext) {
-    this(indexSegment, queryContext);
-
-    _filterContext = filterContext;
+  public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext, 
@Nullable FilterContext filter) {
+    _indexSegment = indexSegment;
+    _queryContext = queryContext;
+    _filter = filter;
   }
 
   @Override
   public BaseFilterOperator run() {
-    FilterContext filter = _filterContext == null ? _queryContext.getFilter() 
: _filterContext;
+    // NOTE: Snapshot the validDocIds before reading the numDocs to prevent 
the latest updates getting lost
     ThreadSafeMutableRoaringBitmap validDocIds = 
_indexSegment.getValidDocIds();
-    boolean applyValidDocIds = validDocIds != null && 
!QueryOptionsUtils.isSkipUpsert(_queryContext.getQueryOptions());
+    MutableRoaringBitmap validDocIdsSnapshot =
+        validDocIds != null && !_queryContext.isSkipUpsert() ? 
validDocIds.getMutableRoaringBitmap() : null;
+    int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+
+    FilterContext filter = _filter != null ? _filter : 
_queryContext.getFilter();
     if (filter != null) {
-      BaseFilterOperator filterOperator = constructPhysicalOperator(filter);
-      if (applyValidDocIds) {
-        BaseFilterOperator validDocFilter =
-            new 
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, 
_numDocs);
-        return 
FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, 
validDocFilter), _numDocs,
+      BaseFilterOperator filterOperator = constructPhysicalOperator(filter, 
numDocs);
+      if (validDocIdsSnapshot != null) {
+        BaseFilterOperator validDocFilter = new 
BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
+        return 
FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, 
validDocFilter), numDocs,
             _queryContext.getDebugOptions());
       } else {
         return filterOperator;
       }
-    } else if (applyValidDocIds) {
-      return new 
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, 
_numDocs);
+    } else if (validDocIdsSnapshot != null) {
+      return new BitmapBasedFilterOperator(validDocIdsSnapshot, false, 
numDocs);
     } else {
-      return new MatchAllFilterOperator(_numDocs);
+      return new MatchAllFilterOperator(numDocs);
     }
   }
 
@@ -143,13 +142,13 @@ public class FilterPlanNode implements PlanNode {
   /**
    * Helper method to build the operator tree from the filter.
    */
-  private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
+  private BaseFilterOperator constructPhysicalOperator(FilterContext filter, 
int numDocs) {
     switch (filter.getType()) {
       case AND:
         List<FilterContext> childFilters = filter.getChildren();
         List<BaseFilterOperator> childFilterOperators = new 
ArrayList<>(childFilters.size());
         for (FilterContext childFilter : childFilters) {
-          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter);
+          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter, numDocs);
           if (childFilterOperator.isResultEmpty()) {
             // Return empty filter operator if any of the child filter 
operator's result is empty
             return EmptyFilterOperator.getInstance();
@@ -158,47 +157,46 @@ public class FilterPlanNode implements PlanNode {
             childFilterOperators.add(childFilterOperator);
           }
         }
-        return FilterOperatorUtils.getAndFilterOperator(childFilterOperators, 
_numDocs,
-            _queryContext.getDebugOptions());
+        return FilterOperatorUtils.getAndFilterOperator(childFilterOperators, 
numDocs, _queryContext.getDebugOptions());
       case OR:
         childFilters = filter.getChildren();
         childFilterOperators = new ArrayList<>(childFilters.size());
         for (FilterContext childFilter : childFilters) {
-          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter);
+          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter, numDocs);
           if (childFilterOperator.isResultMatchingAll()) {
             // Return match all filter operator if any of the child filter 
operator matches all records
-            return new MatchAllFilterOperator(_numDocs);
+            return new MatchAllFilterOperator(numDocs);
           } else if (!childFilterOperator.isResultEmpty()) {
             // Remove child filter operators whose result is empty
             childFilterOperators.add(childFilterOperator);
           }
         }
-        return FilterOperatorUtils.getOrFilterOperator(childFilterOperators, 
_numDocs, _queryContext.getDebugOptions());
+        return FilterOperatorUtils.getOrFilterOperator(childFilterOperators, 
numDocs, _queryContext.getDebugOptions());
       case NOT:
         childFilters = filter.getChildren();
         assert childFilters.size() == 1;
-        BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilters.get(0));
-        return FilterOperatorUtils.getNotFilterOperator(childFilterOperator, 
_numDocs, null);
+        BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilters.get(0), numDocs);
+        return FilterOperatorUtils.getNotFilterOperator(childFilterOperator, 
numDocs, null);
       case PREDICATE:
         Predicate predicate = filter.getPredicate();
         ExpressionContext lhs = predicate.getLhs();
         if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
           if (canApplyH3Index(predicate, lhs.getFunction())) {
-            return new H3IndexFilterOperator(_indexSegment, predicate, 
_numDocs);
+            return new H3IndexFilterOperator(_indexSegment, predicate, 
numDocs);
           }
           // TODO: ExpressionFilterOperator does not support predicate types 
without PredicateEvaluator (IS_NULL,
           //       IS_NOT_NULL, TEXT_MATCH)
-          return new ExpressionFilterOperator(_indexSegment, predicate, 
_numDocs);
+          return new ExpressionFilterOperator(_indexSegment, predicate, 
numDocs);
         } else {
           String column = lhs.getIdentifier();
           DataSource dataSource = _indexSegment.getDataSource(column);
           PredicateEvaluator predicateEvaluator = 
_predicateEvaluatorMap.get(predicate);
           if (predicateEvaluator != null) {
-            return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
_numDocs);
+            return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
numDocs);
           }
           switch (predicate.getType()) {
             case TEXT_MATCH:
-              return new TextMatchFilterOperator(dataSource.getTextIndex(), 
(TextMatchPredicate) predicate, _numDocs);
+              return new TextMatchFilterOperator(dataSource.getTextIndex(), 
(TextMatchPredicate) predicate, numDocs);
             case REGEXP_LIKE:
               // FST Index is available only for rolled out segments. So, we 
use different evaluator for rolled out and
               // consuming segments.
@@ -218,32 +216,32 @@ public class FilterPlanNode implements PlanNode {
                         dataSource.getDataSourceMetadata().getDataType());
               }
               _predicateEvaluatorMap.put(predicate, predicateEvaluator);
-              return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
_numDocs);
+              return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
numDocs);
             case JSON_MATCH:
               JsonIndexReader jsonIndex = dataSource.getJsonIndex();
               Preconditions.checkState(jsonIndex != null, "Cannot apply 
JSON_MATCH on column: %s without json index",
                   column);
-              return new JsonMatchFilterOperator(jsonIndex, 
(JsonMatchPredicate) predicate, _numDocs);
+              return new JsonMatchFilterOperator(jsonIndex, 
(JsonMatchPredicate) predicate, numDocs);
             case IS_NULL:
               NullValueVectorReader nullValueVector = 
dataSource.getNullValueVector();
               if (nullValueVector != null) {
-                return new 
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, _numDocs);
+                return new 
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, numDocs);
               } else {
                 return EmptyFilterOperator.getInstance();
               }
             case IS_NOT_NULL:
               nullValueVector = dataSource.getNullValueVector();
               if (nullValueVector != null) {
-                return new 
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, _numDocs);
+                return new 
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, numDocs);
               } else {
-                return new MatchAllFilterOperator(_numDocs);
+                return new MatchAllFilterOperator(numDocs);
               }
             default:
               predicateEvaluator =
                   PredicateEvaluatorProvider.getPredicateEvaluator(predicate, 
dataSource.getDictionary(),
                       dataSource.getDataSourceMetadata().getDataType());
               _predicateEvaluatorMap.put(predicate, predicateEvaluator);
-              return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
_numDocs);
+              return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
numDocs);
           }
         }
       default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index ce4df76..b9b2543 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -182,6 +182,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   private void applyQueryOptions(QueryContext queryContext) {
     Map<String, String> queryOptions = queryContext.getQueryOptions();
 
+    // Set skipUpsert
+    queryContext.setSkipUpsert(QueryOptionsUtils.isSkipUpsert(queryOptions));
+
     // Set maxExecutionThreads
     int maxExecutionThreads;
     Integer maxExecutionThreadsFromQuery = 
QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 58d2891..6e6752d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -103,6 +103,8 @@ public class QueryContext {
   private long _endTimeMs;
   // Whether to enable prefetch for the query
   private boolean _enablePrefetch;
+  // Whether to skip upsert for the query
+  private boolean _skipUpsert;
   // Maximum number of threads used to execute the query
   private int _maxExecutionThreads = 
InstancePlanMakerImplV2.DEFAULT_MAX_EXECUTION_THREADS;
   // The following properties apply to group-by queries
@@ -294,6 +296,14 @@ public class QueryContext {
     _enablePrefetch = enablePrefetch;
   }
 
+  public boolean isSkipUpsert() {
+    return _skipUpsert;
+  }
+
+  public void setSkipUpsert(boolean skipUpsert) {
+    _skipUpsert = skipUpsert;
+  }
+
   public int getMaxExecutionThreads() {
     return _maxExecutionThreads;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to