clintropolis commented on code in PR #16039: URL: https://github.com/apache/druid/pull/16039#discussion_r1534871090
########## processing/src/test/java/org/apache/druid/segment/filter/TypedInFilterTests.java: ########## @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.filter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.RangeSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.NotDimFilter; +import org.apache.druid.query.filter.TypedInFilter; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.StorageAdapter; +import 
org.apache.druid.segment.column.ColumnType; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.Closeable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@RunWith(Enclosed.class) +public class TypedInFilterTests Review Comment: ok, consolidated these tests. It's kind of ugly, but I think it's ok ########## processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java: ########## @@ -0,0 +1,713 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeRangeSet; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; +import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet; +import it.unimi.dsi.fastutil.objects.ObjectArrays; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ByteBufferUtils; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.math.expr.Evals; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.query.filter.vector.VectorValueMatcher; +import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnIndexSupplier; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.ValueType; 
+import org.apache.druid.segment.filter.Filters; +import org.apache.druid.segment.index.BitmapColumnIndex; +import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes; +import org.apache.druid.segment.index.semantic.ValueSetIndexes; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; + +public class TypedInFilter extends AbstractOptimizableDimFilter implements Filter +{ + private final String column; + private final ColumnType matchValueType; + @Nullable + private final List<?> unsortedValues; + private final Supplier<List<?>> lazyMatchValues; + @Nullable + private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes; + @Nullable + private final FilterTuning filterTuning; + private final Supplier<DruidPredicateFactory> predicateFactorySupplier; + + @JsonIgnore + private final Supplier<byte[]> cacheKeySupplier; + + /** + * Creates a new filter. + * + * @param column column to search + * @param values set of values to match. This collection may be reused to avoid copying a big collection. + * Therefore, callers should <b>not</b> modify the collection after it is passed to this + * constructor. 
+ * @param matchValueType type of values contained in set + * @param filterTuning optional tuning + */ + @JsonCreator + public TypedInFilter( + @JsonProperty("column") String column, + @JsonProperty("matchValueType") ColumnType matchValueType, + @JsonProperty("values") @Nullable List<?> values, + @JsonProperty("sortedValues") @Nullable List<?> sortedValues, + @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning + ) + { + if (NullHandling.replaceWithDefault()) { + throw InvalidInput.exception( + "Invalid IN filter, typed in filter only supports SQL compatible null handling mode, set druid.generic.useDefaultValue=false to use this filter" + ); + } + this.column = column; + if (column == null) { + throw InvalidInput.exception("Invalid IN filter, column cannot be null"); + } + this.filterTuning = filterTuning; + this.matchValueType = matchValueType; + if (matchValueType == null) { + throw InvalidInput.exception("Invalid IN filter on column [%s], matchValueType cannot be null", column); + } + // one of sorted or not sorted + if (values == null && sortedValues == null) { + throw InvalidInput.exception( + "Invalid IN filter on column [%s], exactly one of values or sortedValues must be non-null", + column + ); + } + if (sortedValues != null) { + this.unsortedValues = null; + this.lazyMatchValues = () -> sortedValues; + } else { + if (checkSorted(values, matchValueType)) { + this.unsortedValues = null; + this.lazyMatchValues = () -> values; + } else { + this.unsortedValues = values; + this.lazyMatchValues = Suppliers.memoize(() -> sortValues(unsortedValues, matchValueType)); + } + } + if (matchValueType.is(ValueType.STRING)) { + this.lazyMatchValueBytes = Suppliers.memoize(() -> { + final SortedSet<ByteBuffer> matchValueBytes = new ObjectAVLTreeSet<>(ByteBufferUtils.utf8Comparator()); + for (Object s : lazyMatchValues.get()) { + matchValueBytes.add(StringUtils.toUtf8ByteBuffer(Evals.asString(s))); + } + return matchValueBytes; + }); + } else { + 
this.lazyMatchValueBytes = null; + } + + this.predicateFactorySupplier = Suppliers.memoize( + () -> new PredicateFactory(lazyMatchValues.get(), matchValueType) + ); + this.cacheKeySupplier = Suppliers.memoize(this::computeCacheKey); + } + + @JsonProperty + public String getColumn() + { + return column; + } + + @JsonProperty + public List<?> getSortedValues() + { + return lazyMatchValues.get(); + } + + @JsonProperty + public ColumnType getMatchValueType() + { + return matchValueType; + } + + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty + public FilterTuning getFilterTuning() + { + return filterTuning; + } + + @Override + public byte[] getCacheKey() + { + return cacheKeySupplier.get(); + } + + @Override + public DimFilter optimize(final boolean mayIncludeUnknown) + { + final List<?> matchValues = lazyMatchValues.get(); + if (matchValues.isEmpty()) { + return FalseDimFilter.instance(); + } else if (matchValues.size() == 1) { + if (matchValues.get(0) == null) { + return NullFilter.forColumn(column); + } + return new EqualityFilter( + column, + matchValueType, + matchValues.iterator().next(), + filterTuning + ); + } + return this; + } + + @Override + public Filter toFilter() + { + return this; + } + + @Nullable + @Override + public RangeSet<String> getDimensionRangeSet(String dimension) + { + if (!Objects.equals(getColumn(), dimension)) { + return null; + } + RangeSet<String> retSet = TreeRangeSet.create(); + for (Object value : lazyMatchValues.get()) { + String valueEquivalent = NullHandling.nullToEmptyIfNeeded(Evals.asString(value)); + if (valueEquivalent == null) { + // Case when SQL compatible null handling is enabled + // Range.singleton(null) is invalid, so use the fact that + // only null values are less than empty string. 
+ retSet.add(Range.lessThan("")); + } else { + retSet.add(Range.singleton(valueEquivalent)); + } + } + return retSet; + } + + @Override + public Set<String> getRequiredColumns() + { + return ImmutableSet.of(column); + } + + @Override + @Nullable + public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector) + { + if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) { + return null; + } + final ColumnIndexSupplier indexSupplier = selector.getIndexSupplier(column); + + if (indexSupplier == null) { + // column doesn't exist, match against null + DruidPredicateMatch match = predicateFactorySupplier.get().makeStringPredicate().apply(null); + return Filters.makeMissingColumnNullIndex(match, selector); + } + + if (lazyMatchValueBytes != null) { + final Utf8ValueSetIndexes utf8ValueSetIndexes = indexSupplier.as(Utf8ValueSetIndexes.class); + if (utf8ValueSetIndexes != null) { + return utf8ValueSetIndexes.forSortedValuesUtf8(lazyMatchValueBytes.get()); + } + } + + final ValueSetIndexes valueSetIndexes = indexSupplier.as(ValueSetIndexes.class); + if (valueSetIndexes != null) { + return valueSetIndexes.forSortedValues(lazyMatchValues.get(), matchValueType); + } + + return Filters.makePredicateIndex( + column, + selector, + predicateFactorySupplier.get() + ); + } + + @Override + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) + { + return Filters.makeValueMatcher(factory, column, predicateFactorySupplier.get()); + } + + @Override + public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) + { + return ColumnProcessors.makeVectorProcessor( + column, + VectorValueMatcherColumnProcessorFactory.instance(), + factory + ).makeMatcher(predicateFactorySupplier.get()); + } + + @Override + public boolean canVectorizeMatcher(ColumnInspector inspector) + { + return true; + } + + @Override + public boolean supportsRequiredColumnRewrite() + { + return true; + } + + @Override + public Filter 
rewriteRequiredColumns(Map<String, String> columnRewrites) + { + String rewriteDimensionTo = columnRewrites.get(column); + if (rewriteDimensionTo == null) { + throw new IAE("Received a non-applicable rewrite: %s, filter's dimension: %s", columnRewrites, column); + } + + if (rewriteDimensionTo.equals(column)) { + return this; + } else { + return new TypedInFilter( + rewriteDimensionTo, + matchValueType, + null, + lazyMatchValues.get(), + filterTuning + ); + } + } + + @Override + public String toString() + { + final DimFilter.DimFilterToStringBuilder builder = new DimFilter.DimFilterToStringBuilder(); + return builder.appendDimension(column, null) + .append(" IN (") + .append(Joiner.on(", ").join(Iterables.transform(lazyMatchValues.get(), String::valueOf))) + .append(")") + .append(" (" + matchValueType + ")") + .appendFilterTuning(filterTuning) + .build(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TypedInFilter that = (TypedInFilter) o; + return column.equals(that.column) && + Objects.equals(matchValueType, that.matchValueType) && + compareValues(lazyMatchValues.get(), that.lazyMatchValues.get(), matchValueType) && + Objects.equals(filterTuning, that.filterTuning); + } + + @Override + public int hashCode() + { + return Objects.hash(lazyMatchValues.get(), column, matchValueType, filterTuning); + } + + private byte[] computeCacheKey() + { + // Hash all values, in sorted order, as their length followed by their content. + final Hasher hasher = Hashing.sha256().newHasher(); + for (Object v : lazyMatchValues.get()) { + if (v == null) { + // Encode null as length -1, no content. 
+ hasher.putInt(-1); + } else { + final String s = Evals.asString(v); + hasher.putInt(s.length()); + hasher.putString(s, StandardCharsets.UTF_8); + } + } + + return new CacheKeyBuilder(DimFilterUtils.NEW_IN_CACHE_ID) + .appendString(column) + .appendByte(DimFilterUtils.STRING_SEPARATOR) + .appendString(matchValueType.asTypeString()) + .appendByte(DimFilterUtils.STRING_SEPARATOR) + .appendByteArray(hasher.hash().asBytes()) + .build(); + } + + private static boolean checkSorted(List<?> unsortedValues, ColumnType matchValueType) + { + final Comparator<Object> comparator = matchValueType.getNullableStrategy(); + Object prev = null; + boolean needsCoerceCheck = true; + for (Object o : unsortedValues) { + if (needsCoerceCheck && o != null) { + Object coerced = coerceValue(o, matchValueType); + //noinspection ObjectEquality + if (coerced != o) { + return false; + } + needsCoerceCheck = false; + } + if (prev != null && comparator.compare(prev, o) >= 0) { + return false; + } + prev = o; + } + return true; + } + + @Nullable + private static Object coerceValue(@Nullable Object o, ColumnType matchValueType) + { + if (o == null) { + return o; + } + switch (matchValueType.getType()) { + case STRING: + return DimensionHandlerUtils.convertObjectToString(o); + case LONG: + return DimensionHandlerUtils.convertObjectToLong(o); + case FLOAT: + return DimensionHandlerUtils.convertObjectToFloat(o); + case DOUBLE: + return DimensionHandlerUtils.convertObjectToDouble(o); + default: + throw InvalidInput.exception("Unsupported matchValueType[%s]", matchValueType); + } + } + + private static List<?> sortValues(List<?> unsortedValues, ColumnType matchValueType) + { + final Object[] array = unsortedValues.toArray(new Object[0]); + // check if values need coerced + for (int i = 0; i < array.length; i++) { + Object coerced = coerceValue(array[i], matchValueType); + //noinspection ObjectEquality + if (coerced != null && array[i] == coerced) { + // assume list is all same type objects... 
+ break; + } + array[i] = coerced; + } + ObjectArrays.quickSort(array, matchValueType.getNullableStrategy()); + return Arrays.asList(array); + } + + /** + * Since jackson might translate longs into ints and such, we use the type comparator to check lists + * for {@link #equals(Object)} instead of directly using {@link Objects#equals(Object, Object)} + */ + private static boolean compareValues(List<?> o1, List<?> o2, ColumnType matchValueType) + { + final NullableTypeStrategy comparator = matchValueType.getNullableStrategy(); + //noinspection ObjectEquality + if (o1 == o2) { + return true; + } + if (o1 == null) { + return false; + } + if (o2 == null) { + return false; + } + final int iter = Math.min(o1.size(), o2.size()); + for (int i = 0; i < iter; i++) { + final int cmp = comparator.compare(o1.get(i), o2.get(i)); + if (cmp == 0) { + continue; + } + return false; + } + return o1.size() == o2.size(); + } + + private static DruidObjectPredicate<String> createStringPredicate( + final List sortedValues, Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
