clintropolis commented on code in PR #16039:
URL: https://github.com/apache/druid/pull/16039#discussion_r1534874307


##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be 
reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the 
collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible 
null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], 
matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or 
sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {

Review Comment:
   added javadocs to hopefully better explain inner workings



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be 
reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the 
collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible 
null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], 
matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or 
sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {
+      this.unsortedValues = null;
+      this.lazyMatchValues = () -> sortedValues;
+    } else {
+      this.unsortedValues = values;
+      this.lazyMatchValues = Suppliers.memoize(() -> 
sortValues(unsortedValues, matchValueType));
+    }
+    if (matchValueType.is(ValueType.STRING)) {
+      this.lazyMatchValueBytes = Suppliers.memoize(() -> {
+        final SortedSet<ByteBuffer> matchValueBytes = new 
ObjectAVLTreeSet<>(ByteBufferUtils.utf8Comparator());
+        for (Object s : lazyMatchValues.get()) {
+          matchValueBytes.add(StringUtils.toUtf8ByteBuffer(Evals.asString(s)));
+        }
+        return matchValueBytes;
+      });
+    } else {
+      this.lazyMatchValueBytes = null;
+    }
+
+    this.predicateFactorySupplier = Suppliers.memoize(
+        () -> new InFilterDruidPredicateFactory(lazyMatchValues.get(), 
matchValueType)
+    );
+    this.cacheKeySupplier = Suppliers.memoize(this::computeCacheKey);
+  }
+
+  @JsonProperty
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty
+  public List<?> getSortedValues()
+  {
+    return lazyMatchValues.get();
+  }
+
+  @JsonProperty
+  public ColumnType getMatchValueType()
+  {
+    return matchValueType;
+  }
+
+  @Nullable
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @JsonProperty
+  public FilterTuning getFilterTuning()
+  {
+    return filterTuning;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return cacheKeySupplier.get();
+  }
+
+  @Override
+  public DimFilter optimize(final boolean mayIncludeUnknown)
+  {
+    final List<?> matchValues = lazyMatchValues.get();
+    if (matchValues.isEmpty()) {
+      return FalseDimFilter.instance();
+    } else if (matchValues.size() == 1) {
+      if (matchValues.get(0) == null) {
+        return NullFilter.forColumn(column);
+      }
+      return new EqualityFilter(
+          column,
+          matchValueType,
+          matchValues.iterator().next(),
+          filterTuning
+      );
+    }
+    return this;
+  }
+
+  @Override
+  public Filter toFilter()
+  {
+    return this;
+  }
+
+  @Nullable
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    if (!Objects.equals(getColumn(), dimension)) {
+      return null;
+    }
+    RangeSet<String> retSet = TreeRangeSet.create();
+    for (Object value : lazyMatchValues.get()) {
+      String valueEquivalent = 
NullHandling.nullToEmptyIfNeeded(Evals.asString(value));
+      if (valueEquivalent == null) {
+        // Case when SQL compatible null handling is enabled
+        // Range.singleton(null) is invalid, so use the fact that
+        // only null values are less than empty string.
+        retSet.add(Range.lessThan(""));
+      } else {
+        retSet.add(Range.singleton(valueEquivalent));
+      }
+    }
+    return retSet;
+  }
+
+  @Override
+  public Set<String> getRequiredColumns()
+  {
+    return ImmutableSet.of(column);
+  }
+
+  @Override
+  @Nullable
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) {
+      return null;
+    }
+    final ColumnIndexSupplier indexSupplier = 
selector.getIndexSupplier(column);
+
+    if (indexSupplier == null) {
+      // column doesn't exist, match against null
+      DruidPredicateMatch match = 
predicateFactorySupplier.get().makeStringPredicate().apply(null);
+      return Filters.makeMissingColumnNullIndex(match, selector);
+    }
+
+    if (lazyMatchValueBytes != null) {
+      final Utf8ValueSetIndexes utf8ValueSetIndexes = 
indexSupplier.as(Utf8ValueSetIndexes.class);
+      if (utf8ValueSetIndexes != null) {
+        return 
utf8ValueSetIndexes.forSortedValuesUtf8(lazyMatchValueBytes.get());
+      }
+    }
+
+    final ValueSetIndexes valueSetIndexes = 
indexSupplier.as(ValueSetIndexes.class);
+    if (valueSetIndexes != null) {
+      return valueSetIndexes.forSortedValues(lazyMatchValues.get(), 
matchValueType);
+    }
+
+    return Filters.makePredicateIndex(
+        column,
+        selector,
+        predicateFactorySupplier.get()
+    );
+  }
+
+  @Override
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+  {
+    return Filters.makeValueMatcher(factory, column, 
predicateFactorySupplier.get());
+  }
+
+  @Override
+  public VectorValueMatcher makeVectorMatcher(final 
VectorColumnSelectorFactory factory)
+  {
+    return ColumnProcessors.makeVectorProcessor(
+        column,
+        VectorValueMatcherColumnProcessorFactory.instance(),
+        factory
+    ).makeMatcher(predicateFactorySupplier.get());
+  }
+
+  @Override
+  public boolean canVectorizeMatcher(ColumnInspector inspector)
+  {
+    return true;
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(column);
+    if (rewriteDimensionTo == null) {
+      throw new IAE("Received a non-applicable rewrite: %s, filter's 
dimension: %s", columnRewrites, column);
+    }
+
+    if (rewriteDimensionTo.equals(column)) {
+      return this;
+    } else {
+      return new TypedInFilter(
+          rewriteDimensionTo,
+          matchValueType,
+          null,
+          lazyMatchValues.get(),
+          filterTuning
+      );
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    final DimFilter.DimFilterToStringBuilder builder = new 
DimFilter.DimFilterToStringBuilder();
+    return builder.appendDimension(column, null)
+                  .append(" IN (")
+                  .append(Joiner.on(", 
").join(Iterables.transform(lazyMatchValues.get(), String::valueOf)))
+                  .append(")")
+                  .append(" (" + matchValueType + ")")
+                  .appendFilterTuning(filterTuning)
+                  .build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TypedInFilter that = (TypedInFilter) o;
+    return column.equals(that.column) &&
+           Objects.equals(matchValueType, that.matchValueType) &&
+           compareValues(lazyMatchValues.get(), that.lazyMatchValues.get(), 
matchValueType) &&
+           Objects.equals(filterTuning, that.filterTuning);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(lazyMatchValues.get(), column, matchValueType, 
filterTuning);
+  }
+
+  private byte[] computeCacheKey()
+  {
+    // Hash all values, in sorted order, as their length followed by their 
content.
+    final Hasher hasher = Hashing.sha256().newHasher();
+    for (Object v : lazyMatchValues.get()) {
+      if (v == null) {
+        // Encode null as length -1, no content.
+        hasher.putInt(-1);
+      } else {
+        final String s = Evals.asString(v);
+        hasher.putInt(s.length());
+        hasher.putString(s, StandardCharsets.UTF_8);
+      }
+    }
+
+    return new CacheKeyBuilder(DimFilterUtils.NEW_IN_CACHE_ID)
+        .appendString(column)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(matchValueType.asTypeString())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(hasher.hash().asBytes())
+        .build();
+  }
+
+  private static List<?> sortValues(List<?> unsortedValues, ColumnType 
matchValueType)
+  {
+    final Stream<?> coerced;
+    if (matchValueType.is(ValueType.LONG)) {
+      coerced = 
unsortedValues.stream().map(DimensionHandlerUtils::convertObjectToLong).distinct();
+    } else if (matchValueType.is(ValueType.DOUBLE)) {
+      coerced = 
unsortedValues.stream().map(DimensionHandlerUtils::convertObjectToDouble).distinct();
+    } else if (matchValueType.is(ValueType.FLOAT)) {
+      coerced = 
unsortedValues.stream().map(DimensionHandlerUtils::convertObjectToFloat).distinct();
+    } else {
+      coerced = unsortedValues.stream().distinct();
+    }
+    return 
coerced.sorted(matchValueType.getNullableStrategy()).collect(Collectors.toList());
+  }
+
+  /**
+   * Since jackson might translate longs into ints and such, we use the type 
comparator to check lists
+   * for {@link #equals(Object)} instead of directly using {@link 
Objects#equals(Object, Object)}
+   */
+  private static boolean compareValues(List<?> o1, List<?> o2, ColumnType 
matchValueType)
+  {
+    final NullableTypeStrategy comparator = 
matchValueType.getNullableStrategy();
+    //noinspection ObjectEquality
+    if (o1 == o2) {
+      return true;
+    }
+    if (o1 == null) {
+      return false;
+    }
+    if (o2 == null) {
+      return false;
+    }
+    final int iter = Math.min(o1.size(), o2.size());
+    for (int i = 0; i < iter; i++) {
+      final int cmp = comparator.compare(o1.get(i), o2.get(i));
+      if (cmp == 0) {
+        continue;
+      }
+      return false;
+    }
+    return o1.size() == o2.size();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to