This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f345da102 Handle big In clause (#10254)
2f345da102 is described below
commit 2f345da102b4b099fa47d67fcc793dc3266d072a
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Feb 15 17:20:48 2023 -0800
Handle big In clause (#10254)
---
.../predicate/InPredicateEvaluatorFactory.java | 18 ++++++-----
.../predicate/NotInPredicateEvaluatorFactory.java | 16 ++++++----
.../predicate/PredicateEvaluatorProvider.java | 21 ++++++++++---
.../operator/filter/predicate/PredicateUtils.java | 28 +++++++++++++----
.../org/apache/pinot/core/plan/FilterPlanNode.java | 5 ++--
.../index/readers/BaseImmutableDictionary.java | 35 ++++++++++++++++++++++
.../pinot/segment/spi/index/reader/Dictionary.java | 15 ++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +++
8 files changed, 117 insertions(+), 25 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
index 6559ba502f..9bbebcec57 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
@@ -32,8 +32,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
@@ -49,14 +51,15 @@ public class InPredicateEvaluatorFactory {
/**
* Create a new instance of dictionary based IN predicate evaluator.
*
- * @param inPredicate IN predicate to evaluate
- * @param dictionary Dictionary for the column
- * @param dataType Data type for the column
+ * @param inPredicate IN predicate to evaluate
+ * @param dictionary Dictionary for the column
+ * @param dataType Data type for the column
+ * @param queryContext Query context
* @return Dictionary based IN predicate evaluator
*/
public static BaseDictionaryBasedPredicateEvaluator
newDictionaryBasedEvaluator(InPredicate inPredicate,
- Dictionary dictionary, DataType dataType) {
- return new DictionaryBasedInPredicateEvaluator(inPredicate, dictionary,
dataType);
+ Dictionary dictionary, DataType dataType, @Nullable QueryContext
queryContext) {
+ return new DictionaryBasedInPredicateEvaluator(inPredicate, dictionary,
dataType, queryContext);
}
/**
@@ -155,9 +158,10 @@ public class InPredicateEvaluatorFactory {
final int _numMatchingDictIds;
int[] _matchingDictIds;
- DictionaryBasedInPredicateEvaluator(InPredicate inPredicate, Dictionary
dictionary, DataType dataType) {
+ DictionaryBasedInPredicateEvaluator(InPredicate inPredicate, Dictionary
dictionary, DataType dataType,
+ @Nullable QueryContext queryContext) {
super(inPredicate);
- _matchingDictIdSet = PredicateUtils.getDictIdSet(inPredicate,
dictionary, dataType);
+ _matchingDictIdSet = PredicateUtils.getDictIdSet(inPredicate,
dictionary, dataType, queryContext);
_numMatchingDictIds = _matchingDictIdSet.size();
if (_numMatchingDictIds == 0) {
_alwaysFalse = true;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
index b3666b932f..a0e2bf6a8d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
@@ -32,8 +32,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.predicate.NotInPredicate;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
@@ -50,13 +52,14 @@ public class NotInPredicateEvaluatorFactory {
* Create a new instance of dictionary based NOT_IN predicate evaluator.
*
* @param notInPredicate NOT_IN predicate to evaluate
- * @param dictionary Dictionary for the column
- * @param dataType Data type for the column
+ * @param dictionary Dictionary for the column
+ * @param dataType Data type for the column
+ * @param queryContext Query context
* @return Dictionary based NOT_IN predicate evaluator
*/
public static BaseDictionaryBasedPredicateEvaluator
newDictionaryBasedEvaluator(NotInPredicate notInPredicate,
- Dictionary dictionary, DataType dataType) {
- return new DictionaryBasedNotInPredicateEvaluator(notInPredicate,
dictionary, dataType);
+ Dictionary dictionary, DataType dataType, @Nullable QueryContext
queryContext) {
+ return new DictionaryBasedNotInPredicateEvaluator(notInPredicate,
dictionary, dataType, queryContext);
}
/**
@@ -157,9 +160,10 @@ public class NotInPredicateEvaluatorFactory {
int[] _matchingDictIds;
int[] _nonMatchingDictIds;
- DictionaryBasedNotInPredicateEvaluator(NotInPredicate notInPredicate,
Dictionary dictionary, DataType dataType) {
+ DictionaryBasedNotInPredicateEvaluator(NotInPredicate notInPredicate,
Dictionary dictionary, DataType dataType,
+ @Nullable QueryContext queryContext) {
super(notInPredicate);
- _nonMatchingDictIdSet = PredicateUtils.getDictIdSet(notInPredicate,
dictionary, dataType);
+ _nonMatchingDictIdSet = PredicateUtils.getDictIdSet(notInPredicate,
dictionary, dataType, queryContext);
_numNonMatchingDictIds = _nonMatchingDictIdSet.size();
if (_numNonMatchingDictIds == 0) {
_alwaysTrue = true;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
index 0a3bc7f952..519168818b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java
@@ -26,6 +26,8 @@ import
org.apache.pinot.common.request.context.predicate.NotInPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.request.context.predicate.RangePredicate;
import org.apache.pinot.common.request.context.predicate.RegexpLikePredicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -37,6 +39,11 @@ public class PredicateEvaluatorProvider {
public static PredicateEvaluator getPredicateEvaluator(Predicate predicate,
@Nullable Dictionary dictionary,
DataType dataType) {
+ return getPredicateEvaluator(predicate, dictionary, dataType, null);
+ }
+
+ public static PredicateEvaluator getPredicateEvaluator(Predicate predicate,
@Nullable Dictionary dictionary,
+ DataType dataType, @Nullable QueryContext queryContext) {
try {
if (dictionary != null) {
// dictionary based predicate evaluators
@@ -48,11 +55,11 @@ public class PredicateEvaluatorProvider {
return NotEqualsPredicateEvaluatorFactory
.newDictionaryBasedEvaluator((NotEqPredicate) predicate,
dictionary, dataType);
case IN:
- return InPredicateEvaluatorFactory
- .newDictionaryBasedEvaluator((InPredicate) predicate,
dictionary, dataType);
+ return
InPredicateEvaluatorFactory.newDictionaryBasedEvaluator((InPredicate)
predicate, dictionary,
+ dataType, queryContext);
case NOT_IN:
- return NotInPredicateEvaluatorFactory
- .newDictionaryBasedEvaluator((NotInPredicate) predicate,
dictionary, dataType);
+ return
NotInPredicateEvaluatorFactory.newDictionaryBasedEvaluator((NotInPredicate)
predicate, dictionary,
+ dataType, queryContext);
case RANGE:
return RangePredicateEvaluatorFactory
.newDictionaryBasedEvaluator((RangePredicate) predicate,
dictionary, dataType);
@@ -87,4 +94,10 @@ public class PredicateEvaluatorProvider {
throw new BadQueryRequestException(e);
}
}
+
+ /**
+ * Convenience overload that builds a predicate evaluator directly from a data source.
+ * <p>Reads the dictionary via {@code dataSource.getDictionary()} and the data type via
+ * {@code dataSource.getDataSourceMetadata().getDataType()}, then delegates to the
+ * (predicate, dictionary, dataType, queryContext) overload, whose dictionary and query context
+ * parameters are both declared {@code @Nullable}.
+ *
+ * @param predicate Predicate to evaluate
+ * @param dataSource Data source supplying the dictionary and the data type of the column
+ * @param queryContext Query context, forwarded unchanged to the delegate
+ * @return Predicate evaluator produced by the delegate overload
+ */
+ public static PredicateEvaluator getPredicateEvaluator(Predicate predicate,
DataSource dataSource,
+ QueryContext queryContext) {
+ return getPredicateEvaluator(predicate, dataSource.getDictionary(),
+ dataSource.getDataSourceMetadata().getDataType(), queryContext);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
index a487d36654..52dffb5f04 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
@@ -18,16 +18,21 @@
*/
package org.apache.pinot.core.operator.filter.predicate;
+import com.google.common.base.Equivalence;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.predicate.BaseInPredicate;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimestampUtils;
@@ -70,7 +75,8 @@ public class PredicateUtils {
/**
* Returns a dictionary id set of the values in the given IN/NOT_IN
predicate.
*/
- public static IntSet getDictIdSet(BaseInPredicate inPredicate, Dictionary
dictionary, DataType dataType) {
+ public static IntSet getDictIdSet(BaseInPredicate inPredicate, Dictionary
dictionary, DataType dataType,
+ @Nullable QueryContext queryContext) {
List<String> values = inPredicate.getValues();
int hashSetSize = Integer.min(HashUtil.getMinHashSetSize(values.size()),
MAX_INITIAL_DICT_ID_SET_SIZE);
IntSet dictIdSet = new IntOpenHashSet(hashSetSize);
@@ -139,11 +145,23 @@ public class PredicateUtils {
}
break;
case STRING:
- for (String value : values) {
- int dictId = dictionary.indexOf(value);
- if (dictId >= 0) {
- dictIdSet.add(dictId);
+ if (queryContext == null || values.size() <=
Integer.parseInt(queryContext.getQueryOptions()
+
.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.IN_PREDICATE_SORT_THRESHOLD,
+
CommonConstants.Broker.Request.QueryOptionValue.DEFAULT_IN_PREDICATE_SORT_THRESHOLD)))
{
+ for (String value : values) {
+ int dictId = dictionary.indexOf(value);
+ if (dictId >= 0) {
+ dictIdSet.add(dictId);
+ }
}
+ } else {
+ List<String> sortedValues =
+ queryContext.getOrComputeSharedValue(List.class,
Equivalence.identity().wrap(inPredicate), k -> {
+ List<String> copyValues = new ArrayList<>(values);
+ copyValues.sort(null);
+ return copyValues;
+ });
+ dictionary.getDictIds(sortedValues, dictIdSet);
}
break;
case BYTES:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 97a6a420de..75bd69f7ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -300,9 +300,8 @@ public class FilterPlanNode implements PlanNode {
return new MatchAllFilterOperator(numDocs);
}
default:
- predicateEvaluator =
- PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
dataSource.getDictionary(),
- dataSource.getDataSourceMetadata().getDataType());
+ predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource,
+ _queryContext);
_predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
numDocs,
_queryContext.isNullHandlingEnabled());
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
index 2a3e9d4658..c5f90a0000 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter;
import org.apache.pinot.segment.local.io.util.ValueReader;
import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
@@ -279,4 +281,37 @@ public abstract class BaseImmutableDictionary implements
Dictionary {
protected byte[] getBuffer() {
return new byte[_numBytesPerValue];
}
+
+ /**
+ * Adds to {@code dictIds} the dictionary id of every given value found in this dictionary.
+ * <p>Walks the values and the dictionary entries with two forward-only cursors; each loop
+ * iteration advances at least one cursor, so the loop runs at most
+ * {@code sortedValues.size() + length()} times. Nothing is returned: matches are reported only
+ * by adding to {@code dictIds}.
+ * <p>NOTE(review): this is only correct if {@code sortedValues} is ordered the same way as the
+ * dictionary entries under {@code compareUtf8Bytes} - TODO confirm that the caller's sort order
+ * (String natural ordering in PredicateUtils) agrees with that comparison for all inputs.
+ *
+ * @param sortedValues Values to look up; expected to be sorted (see the note above)
+ * @param dictIds Output set that receives the dictionary id of each matching value
+ */
+ @Override
+ public void getDictIds(List<String> sortedValues, IntSet dictIds) {
+ // Cursor into sortedValues
+ int valueIdx = 0;
+ // Cursor into the dictionary; doubles as the dictionary id added on a match
+ int dictIdx = 0;
+ // UTF-8 encoding of the value at valueIdx, re-encoded only after valueIdx has moved
+ byte[] utf8 = null;
+ boolean needNewUtf8 = true;
+ int sortedValuesSize = sortedValues.size();
+ int dictLength = length();
+ while (valueIdx < sortedValuesSize && dictIdx < dictLength) {
+ if (needNewUtf8) {
+ utf8 = sortedValues.get(valueIdx).getBytes(StandardCharsets.UTF_8);
+ }
+ int comparison = _valueReader.compareUtf8Bytes(dictIdx,
_numBytesPerValue, utf8);
+ if (comparison == 0) {
+ // Match: record the dictionary id and advance both cursors
+ dictIds.add(dictIdx);
+ dictIdx++;
+ valueIdx++;
+ needNewUtf8 = true;
+ } else if (comparison > 0) {
+ // Presumably the dictionary entry sorts after the value, i.e. the value is absent; skip it
+ valueIdx++;
+ needNewUtf8 = true;
+ } else {
+ // Presumably the dictionary entry sorts before the value; try the next entry and keep the
+ // already encoded bytes since the value has not changed
+ dictIdx++;
+ needNewUtf8 = false;
+ }
+ }
+ }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
index a5ec723d22..75ab13fa65 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.Closeable;
import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.List;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
@@ -242,4 +243,18 @@ public interface Dictionary extends Closeable {
outValues[i] = getBytesValue(dictIds[i]);
}
}
+
+ /**
+ * Adds to {@code dictIds} the dictionary id of every given value that exists in this dictionary.
+ * Intended for IN/NOT IN predicate evaluation.
+ * <p>This default implementation performs one {@link #indexOf(String)} lookup per value and
+ * ignores values for which the lookup returns a negative id; it does not itself rely on the
+ * values being sorted. Implementations may override it to exploit the sort order.
+ *
+ * @param sortedValues Values to look up, sorted by the caller
+ * @param dictIds Output set that receives the dictionary id of each value that is found
+ */
+ default void getDictIds(List<String> sortedValues, IntSet dictIds) {
+ for (String value : sortedValues) {
+ int dictId = indexOf(value);
+ // A negative id means the value is not in the dictionary
+ if (dictId >= 0) {
+ dictIds.add(dictId);
+ }
+ }
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fe63a4960a..3a821c3b2b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -310,6 +310,9 @@ public class CommonConstants {
public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
public static final String STAGE_PARALLELISM = "stageParallelism";
+ // Handle IN predicate evaluation for big IN lists
+ public static final String IN_PREDICATE_SORT_THRESHOLD =
"inPredicateSortThreshold";
+
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
public static final String PRESERVE_TYPE = "preserveType";
@@ -321,6 +324,7 @@ public class CommonConstants {
public static class QueryOptionValue {
public static final String ROUTING_FORCE_HLC = "FORCE_HLC";
+ public static final String DEFAULT_IN_PREDICATE_SORT_THRESHOLD =
"1000";
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]