This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f345da102 Handle big In clause (#10254)
2f345da102 is described below

commit 2f345da102b4b099fa47d67fcc793dc3266d072a
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Feb 15 17:20:48 2023 -0800

    Handle big In clause (#10254)
---
 .../predicate/InPredicateEvaluatorFactory.java     | 18 ++++++-----
 .../predicate/NotInPredicateEvaluatorFactory.java  | 16 ++++++----
 .../predicate/PredicateEvaluatorProvider.java      | 21 ++++++++++---
 .../operator/filter/predicate/PredicateUtils.java  | 28 +++++++++++++----
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  5 ++--
 .../index/readers/BaseImmutableDictionary.java     | 35 ++++++++++++++++++++++
 .../pinot/segment/spi/index/reader/Dictionary.java | 15 ++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  4 +++
 8 files changed, 117 insertions(+), 25 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
index 6559ba502f..9bbebcec57 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
@@ -32,8 +32,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.predicate.InPredicate;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -49,14 +51,15 @@ public class InPredicateEvaluatorFactory {
   /**
    * Create a new instance of dictionary based IN predicate evaluator.
    *
-   * @param inPredicate IN predicate to evaluate
-   * @param dictionary Dictionary for the column
-   * @param dataType Data type for the column
+   * @param inPredicate  IN predicate to evaluate
+   * @param dictionary   Dictionary for the column
+   * @param dataType     Data type for the column
+   * @param queryContext Query context
    * @return Dictionary based IN predicate evaluator
    */
   public static BaseDictionaryBasedPredicateEvaluator 
newDictionaryBasedEvaluator(InPredicate inPredicate,
-      Dictionary dictionary, DataType dataType) {
-    return new DictionaryBasedInPredicateEvaluator(inPredicate, dictionary, 
dataType);
+      Dictionary dictionary, DataType dataType, @Nullable QueryContext 
queryContext) {
+    return new DictionaryBasedInPredicateEvaluator(inPredicate, dictionary, 
dataType, queryContext);
   }
 
   /**
@@ -155,9 +158,10 @@ public class InPredicateEvaluatorFactory {
     final int _numMatchingDictIds;
     int[] _matchingDictIds;
 
-    DictionaryBasedInPredicateEvaluator(InPredicate inPredicate, Dictionary 
dictionary, DataType dataType) {
+    DictionaryBasedInPredicateEvaluator(InPredicate inPredicate, Dictionary 
dictionary, DataType dataType,
+        @Nullable QueryContext queryContext) {
       super(inPredicate);
-      _matchingDictIdSet = PredicateUtils.getDictIdSet(inPredicate, 
dictionary, dataType);
+      _matchingDictIdSet = PredicateUtils.getDictIdSet(inPredicate, 
dictionary, dataType, queryContext);
       _numMatchingDictIds = _matchingDictIdSet.size();
       if (_numMatchingDictIds == 0) {
         _alwaysFalse = true;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
index b3666b932f..a0e2bf6a8d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
@@ -32,8 +32,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.predicate.NotInPredicate;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -50,13 +52,14 @@ public class NotInPredicateEvaluatorFactory {
    * Create a new instance of dictionary based NOT_IN predicate evaluator.
    *
    * @param notInPredicate NOT_IN predicate to evaluate
-   * @param dictionary Dictionary for the column
-   * @param dataType Data type for the column
+   * @param dictionary     Dictionary for the column
+   * @param dataType       Data type for the column
+   * @param queryContext   Query context
    * @return Dictionary based NOT_IN predicate evaluator
    */
   public static BaseDictionaryBasedPredicateEvaluator 
newDictionaryBasedEvaluator(NotInPredicate notInPredicate,
-      Dictionary dictionary, DataType dataType) {
-    return new DictionaryBasedNotInPredicateEvaluator(notInPredicate, 
dictionary, dataType);
+      Dictionary dictionary, DataType dataType, @Nullable QueryContext 
queryContext) {
+    return new DictionaryBasedNotInPredicateEvaluator(notInPredicate, 
dictionary, dataType, queryContext);
   }
 
   /**
@@ -157,9 +160,10 @@ public class NotInPredicateEvaluatorFactory {
     int[] _matchingDictIds;
     int[] _nonMatchingDictIds;
 
-    DictionaryBasedNotInPredicateEvaluator(NotInPredicate notInPredicate, 
Dictionary dictionary, DataType dataType) {
+    DictionaryBasedNotInPredicateEvaluator(NotInPredicate notInPredicate, 
Dictionary dictionary, DataType dataType,
+        @Nullable QueryContext queryContext) {
       super(notInPredicate);
-      _nonMatchingDictIdSet = PredicateUtils.getDictIdSet(notInPredicate, 
dictionary, dataType);
+      _nonMatchingDictIdSet = PredicateUtils.getDictIdSet(notInPredicate, 
dictionary, dataType, queryContext);
       _numNonMatchingDictIds = _nonMatchingDictIdSet.size();
       if (_numNonMatchingDictIds == 0) {
         _alwaysTrue = true;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
index 0a3bc7f952..519168818b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
@@ -26,6 +26,8 @@ import 
org.apache.pinot.common.request.context.predicate.NotInPredicate;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.common.request.context.predicate.RegexpLikePredicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -37,6 +39,11 @@ public class PredicateEvaluatorProvider {
 
   public static PredicateEvaluator getPredicateEvaluator(Predicate predicate, 
@Nullable Dictionary dictionary,
       DataType dataType) {
+    return getPredicateEvaluator(predicate, dictionary, dataType, null);
+  }
+
+  public static PredicateEvaluator getPredicateEvaluator(Predicate predicate, 
@Nullable Dictionary dictionary,
+      DataType dataType, @Nullable QueryContext queryContext) {
     try {
       if (dictionary != null) {
         // dictionary based predicate evaluators
@@ -48,11 +55,11 @@ public class PredicateEvaluatorProvider {
             return NotEqualsPredicateEvaluatorFactory
                 .newDictionaryBasedEvaluator((NotEqPredicate) predicate, 
dictionary, dataType);
           case IN:
-            return InPredicateEvaluatorFactory
-                .newDictionaryBasedEvaluator((InPredicate) predicate, 
dictionary, dataType);
+            return 
InPredicateEvaluatorFactory.newDictionaryBasedEvaluator((InPredicate) 
predicate, dictionary,
+                dataType, queryContext);
           case NOT_IN:
-            return NotInPredicateEvaluatorFactory
-                .newDictionaryBasedEvaluator((NotInPredicate) predicate, 
dictionary, dataType);
+            return 
NotInPredicateEvaluatorFactory.newDictionaryBasedEvaluator((NotInPredicate) 
predicate, dictionary,
+                dataType, queryContext);
           case RANGE:
             return RangePredicateEvaluatorFactory
                 .newDictionaryBasedEvaluator((RangePredicate) predicate, 
dictionary, dataType);
@@ -87,4 +94,10 @@ public class PredicateEvaluatorProvider {
       throw new BadQueryRequestException(e);
     }
   }
+
+  public static PredicateEvaluator getPredicateEvaluator(Predicate predicate, 
DataSource dataSource,
+      QueryContext queryContext) {
+    return getPredicateEvaluator(predicate, dataSource.getDictionary(),
+        dataSource.getDataSourceMetadata().getDataType(), queryContext);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
index a487d36654..52dffb5f04 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
@@ -18,16 +18,21 @@
  */
 package org.apache.pinot.core.operator.filter.predicate;
 
+import com.google.common.base.Equivalence;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.predicate.BaseInPredicate;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.TimestampUtils;
 
 
@@ -70,7 +75,8 @@ public class PredicateUtils {
   /**
    * Returns a dictionary id set of the values in the given IN/NOT_IN 
predicate.
    */
-  public static IntSet getDictIdSet(BaseInPredicate inPredicate, Dictionary 
dictionary, DataType dataType) {
+  public static IntSet getDictIdSet(BaseInPredicate inPredicate, Dictionary 
dictionary, DataType dataType,
+      @Nullable QueryContext queryContext) {
     List<String> values = inPredicate.getValues();
     int hashSetSize = Integer.min(HashUtil.getMinHashSetSize(values.size()), 
MAX_INITIAL_DICT_ID_SET_SIZE);
     IntSet dictIdSet = new IntOpenHashSet(hashSetSize);
@@ -139,11 +145,23 @@ public class PredicateUtils {
         }
         break;
       case STRING:
-        for (String value : values) {
-          int dictId = dictionary.indexOf(value);
-          if (dictId >= 0) {
-            dictIdSet.add(dictId);
+        if (queryContext == null || values.size() <= 
Integer.parseInt(queryContext.getQueryOptions()
+            
.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.IN_PREDICATE_SORT_THRESHOLD,
+                
CommonConstants.Broker.Request.QueryOptionValue.DEFAULT_IN_PREDICATE_SORT_THRESHOLD)))
 {
+          for (String value : values) {
+            int dictId = dictionary.indexOf(value);
+            if (dictId >= 0) {
+              dictIdSet.add(dictId);
+            }
           }
+        } else {
+          List<String> sortedValues =
+              queryContext.getOrComputeSharedValue(List.class, 
Equivalence.identity().wrap(inPredicate), k -> {
+                List<String> copyValues = new ArrayList<>(values);
+                copyValues.sort(null);
+                return copyValues;
+              });
+          dictionary.getDictIds(sortedValues, dictIdSet);
         }
         break;
       case BYTES:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 97a6a420de..75bd69f7ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -300,9 +300,8 @@ public class FilterPlanNode implements PlanNode {
                 return new MatchAllFilterOperator(numDocs);
               }
             default:
-              predicateEvaluator =
-                  PredicateEvaluatorProvider.getPredicateEvaluator(predicate, 
dataSource.getDictionary(),
-                      dataSource.getDataSourceMetadata().getDataType());
+              predicateEvaluator = 
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource,
+                  _queryContext);
               _predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
               return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
numDocs,
                   _queryContext.isNullHandlingEnabled());
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
index 2a3e9d4658..c5f90a0000 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter;
 import org.apache.pinot.segment.local.io.util.ValueReader;
 import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
@@ -279,4 +281,37 @@ public abstract class BaseImmutableDictionary implements 
Dictionary {
   protected byte[] getBuffer() {
     return new byte[_numBytesPerValue];
   }
+
+  /**
+   * Adds to {@code dictIds} the id of every value in {@code sortedValues} found in this dictionary, in one merge pass.
+   * @param sortedValues lookup values; must follow the {@code compareUtf8Bytes} order (NOTE(review): confirm vs String sort)
+   * @param dictIds output set that the matching dictionary ids are added to
+   */
+  @Override
+  public void getDictIds(List<String> sortedValues, IntSet dictIds) {
+    int valueIdx = 0;
+    int dictIdx = 0;
+    byte[] utf8 = null;
+    boolean needNewUtf8 = true;
+    int sortedValuesSize = sortedValues.size();
+    int dictLength = length();
+    while (valueIdx < sortedValuesSize && dictIdx < dictLength) {
+      if (needNewUtf8) {
+        utf8 = sortedValues.get(valueIdx).getBytes(StandardCharsets.UTF_8);
+      }
+      int comparison = _valueReader.compareUtf8Bytes(dictIdx, 
_numBytesPerValue, utf8);
+      if (comparison == 0) {
+        dictIds.add(dictIdx);
+        dictIdx++;
+        valueIdx++;
+        needNewUtf8 = true;
+      } else if (comparison > 0) {
+        valueIdx++;
+        needNewUtf8 = true;
+      } else {
+        dictIdx++;
+        needNewUtf8 = false;
+      }
+    }
+  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
index a5ec723d22..75ab13fa65 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
 import java.io.Closeable;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.List;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
 
@@ -242,4 +243,18 @@ public interface Dictionary extends Closeable {
       outValues[i] = getBytesValue(dictIds[i]);
     }
   }
+
+  /**
+   * Adds to {@code dictIds} the id of every value in {@code sortedValues} found in this dictionary (IN/NOT IN evaluation).
+   * @param sortedValues values to look up; this default does one {@code indexOf} per value, overrides may need sorting
+   * @param dictIds output set that the matching dictionary ids are added to
+   */
+  default void getDictIds(List<String> sortedValues, IntSet dictIds) {
+    for (String value : sortedValues) {
+      int dictId = indexOf(value);
+      if (dictId >= 0) {
+        dictIds.add(dictId);
+      }
+    }
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fe63a4960a..3a821c3b2b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -310,6 +310,9 @@ public class CommonConstants {
         public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
         public static final String STAGE_PARALLELISM = "stageParallelism";
 
+        // STRING dict-based IN/NOT IN: above this many values, values are sorted and merged with the dictionary
+        public static final String IN_PREDICATE_SORT_THRESHOLD = 
"inPredicateSortThreshold";
+
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated
         public static final String PRESERVE_TYPE = "preserveType";
@@ -321,6 +324,7 @@ public class CommonConstants {
 
       public static class QueryOptionValue {
         public static final String ROUTING_FORCE_HLC = "FORCE_HLC";
+        public static final String DEFAULT_IN_PREDICATE_SORT_THRESHOLD = 
"1000";
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to